Code cleanup and typings

This commit is contained in:
Christoph Stahl 2022-11-27 14:53:25 +01:00
parent b234b7ea81
commit 13d13908cc
11 changed files with 334 additions and 299 deletions

View file

@ -31,3 +31,13 @@ build-backend = "poetry.core.masonry.api"
exclude = [ ".venv" ] exclude = [ ".venv" ]
venvPath = "." venvPath = "."
venv = ".venv" venv = ".venv"
[[tool.mypy.overrides]]
module = [
"pytube",
"minio",
"aiocmd",
"pyqrcode",
"socketio"
]
ignore_missing_imports = true

View file

@ -5,6 +5,8 @@ from traceback import print_exc
from json import load from json import load
import logging import logging
from argparse import ArgumentParser from argparse import ArgumentParser
from dataclasses import dataclass, field
from typing import Optional, Any
import socketio import socketio
import pyqrcode import pyqrcode
@ -13,25 +15,41 @@ from .sources import Source, configure_sources
from .entry import Entry from .entry import Entry
sio = socketio.AsyncClient() sio: socketio.AsyncClient = socketio.AsyncClient()
logger = logging.getLogger(__name__) logger: logging.Logger = logging.getLogger(__name__)
sources: dict[str, Source] = {} sources: dict[str, Source] = {}
currentLock = asyncio.Semaphore(0) currentLock: asyncio.Semaphore = asyncio.Semaphore(0)
state = {"current": None, "queue": [], "recent": [], "room": None, "server": "", "secret": ""}
@dataclass
class State:
current_source: Optional[Source] = None
queue: list[Entry] = field(default_factory=list)
recent: list[Entry] = field(default_factory=list)
room: str = ""
server: str = ""
secret: str = ""
state: State = State()
@sio.on("skip") @sio.on("skip")
async def handle_skip(): async def handle_skip():
logger.info("Skipping current") logger.info("Skipping current")
await state["current"].skip_current(state["queue"][0]) await state.current_source.skip_current(state.queue[0])
@sio.on("state") @sio.on("state")
async def handle_state(data): async def handle_state(data: dict[str, Any]):
state["queue"] = [Entry(**entry) for entry in data["queue"]] state.queue = [Entry(**entry) for entry in data["queue"]]
state["recent"] = [Entry(**entry) for entry in data["recent"]] state.recent = [Entry(**entry) for entry in data["recent"]]
for entry in state.queue[:2]:
logger.warning(f"Buffering: %s", entry.title)
await sources[entry.source].buffer(entry)
@sio.on("connect") @sio.on("connect")
@ -40,35 +58,29 @@ async def handle_connect():
await sio.emit( await sio.emit(
"register-client", "register-client",
{ {
"secret": state["secret"], "secret": state.secret,
"queue": [entry.to_dict() for entry in state["queue"]], "queue": [entry.to_dict() for entry in state.queue],
"recent": [entry.to_dict() for entry in state["recent"]], "recent": [entry.to_dict() for entry in state.recent],
"room": state["room"], "room": state.room,
}, },
) )
@sio.on("buffer") @sio.on("buffer")
async def handle_buffer(data): async def handle_buffer(data: dict[str, Any]):
source = sources[data["source"]] source: Source = sources[data["source"]]
meta_info = await source.buffer(Entry(**data)) meta_info: dict[str, Any] = await source.get_missing_metadata(Entry(**data))
await sio.emit("meta-info", {"uuid": data["uuid"], "meta": meta_info}) await sio.emit("meta-info", {"uuid": data["uuid"], "meta": meta_info})
async def buffer_and_report(entry):
meta_info = await sources[entry.source].buffer(entry)
await sio.emit("meta-info", {"uuid": entry.uuid, "meta": meta_info})
@sio.on("play") @sio.on("play")
async def handle_play(data): async def handle_play(data: dict[str, Any]):
entry = Entry(**data) entry: Entry = Entry(**data)
print( print(
f"Playing: {entry.artist} - {entry.title} [{entry.album}] ({entry.source}) for {entry.performer}" f"Playing: {entry.artist} - {entry.title} [{entry.album}] ({entry.source}) for {entry.performer}"
) )
try: try:
state["current"] = sources[entry.source] state.current_source = sources[entry.source]
asyncio.create_task(buffer_and_report(entry))
await sources[entry.source].play(entry) await sources[entry.source].play(entry)
except Exception: except Exception:
print_exc() print_exc()
@ -77,16 +89,14 @@ async def handle_play(data):
@sio.on("client-registered") @sio.on("client-registered")
async def handle_register(data): async def handle_register(data: dict[str, Any]):
if data["success"]: if data["success"]:
logging.info("Registered") logging.info("Registered")
print(f"Join here: {state['server']}/{data['room']}") print(f"Join here: {state.server}/{data['room']}")
print( print(pyqrcode.create(f"{state.server}/{data['room']}").terminal(quiet_zone=1))
pyqrcode.create(f"{state['server']}/{data['room']}").terminal(quiet_zone=1) state.room = data["room"]
)
state["room"] = data["room"]
await sio.emit("sources", {"sources": list(sources.keys())}) await sio.emit("sources", {"sources": list(sources.keys())})
if state["current"] is None: if state.current_source is None:
await sio.emit("get-first") await sio.emit("get-first")
else: else:
logging.warning("Registration failed") logging.warning("Registration failed")
@ -94,11 +104,13 @@ async def handle_register(data):
@sio.on("request-config") @sio.on("request-config")
async def handle_request_config(data): async def handle_request_config(data: dict[str, Any]):
if data["source"] in sources: if data["source"] in sources:
config = await sources[data["source"]].get_config() config: dict[str, Any] | list[dict[str, Any]] = await sources[
data["source"]
].get_config()
if isinstance(config, list): if isinstance(config, list):
num_chunks = len(config) num_chunks: int = len(config)
for current, chunk in enumerate(config): for current, chunk in enumerate(config):
await sio.emit( await sio.emit(
"config-chunk", "config-chunk",
@ -114,7 +126,7 @@ async def handle_request_config(data):
async def aiomain(): async def aiomain():
parser = ArgumentParser() parser: ArgumentParser = ArgumentParser()
parser.add_argument("--room", "-r") parser.add_argument("--room", "-r")
parser.add_argument("--secret", "-s") parser.add_argument("--secret", "-s")
@ -127,15 +139,17 @@ async def aiomain():
source_config = load(file) source_config = load(file)
sources.update(configure_sources(source_config)) sources.update(configure_sources(source_config))
if args.room: if args.room:
state["room"] = args.room state.room = args.room
if args.secret: if args.secret:
state["secret"] = args.secret state.secret = args.secret
else: else:
state["secret"] = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(8)) state.secret = "".join(
print(f"Generated secret: {state['secret']}") secrets.choice(string.ascii_letters + string.digits) for _ in range(8)
)
print(f"Generated secret: {state.secret}")
state["server"] = args.server state.server = args.server
await sio.connect(args.server) await sio.connect(args.server)
await sio.wait() await sio.wait()

View file

@ -1,6 +1,10 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass, field from dataclasses import dataclass, field
from uuid import uuid4, UUID from uuid import uuid4, UUID
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from .sources import Source
@dataclass @dataclass

View file

@ -1,5 +1,6 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional
import os.path import os.path
@ -21,7 +22,7 @@ class Result:
} }
@staticmethod @staticmethod
def from_filename(filename, source) -> Result | None: def from_filename(filename, source) -> Optional[Result]:
try: try:
splitfile = os.path.basename(filename[:-4]).split(" - ") splitfile = os.path.basename(filename[:-4]).split(" - ")
ident = filename ident = filename

View file

@ -33,9 +33,6 @@ logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
clients = {}
class Queue: class Queue:
def __init__(self, *args, **kwargs): def __init__(self, *args, **kwargs):
self._queue = deque(*args, **kwargs) self._queue = deque(*args, **kwargs)
@ -79,6 +76,9 @@ class State:
sid: str sid: str
clients: dict[str, State] = {}
@sio.on("get-state") @sio.on("get-state")
async def handle_state(sid, data: dict[str, Any] = {}): async def handle_state(sid, data: dict[str, Any] = {}):
async with sio.session(sid) as session: async with sio.session(sid) as session:
@ -204,8 +204,18 @@ async def handle_register_client(sid, data: dict[str, Any]):
logger.info("Registerd new client %s", room) logger.info("Registerd new client %s", room)
initial_entries = [Entry(**entry) for entry in data["queue"]] initial_entries = [Entry(**entry) for entry in data["queue"]]
initial_recent = [Entry(**entry) for entry in data["recent"]] initial_recent = [Entry(**entry) for entry in data["recent"]]
clients[room] = State(data["secret"], {}, [], Queue(initial_entries), initial_recent, sid) clients[room] = State(
data["secret"], {}, [], Queue(initial_entries), initial_recent, sid
)
sio.enter_room(sid, room) sio.enter_room(sid, room)
await sio.emit(
"state",
{
"queue": clients[room].queue.to_dict(),
"recent": [entry.to_dict() for entry in clients[room].recent],
},
room=sid,
)
await sio.emit("client-registered", {"success": True, "room": room}, room=sid) await sio.emit("client-registered", {"success": True, "room": room}, room=sid)

View file

@ -1,15 +0,0 @@
from __future__ import annotations
import asyncio
async def play_mpv(
video: str, audio: str | None, options: list[str] = list()
) -> asyncio.subprocess.Process:
args = ["--fullscreen", *options, video] + ([f"--audio-file={audio}"] if audio else [])
mpv_process = asyncio.create_subprocess_exec("mpv", *args)
return await mpv_process
def kill_mpv(mpv: asyncio.subprocess.Process):
mpv.terminate()

View file

@ -1,43 +1,42 @@
from json import load, dump from json import load, dump
from time import sleep, perf_counter from time import sleep, perf_counter
from itertools import zip_longest from itertools import zip_longest
from threading import Event, Lock import asyncio
from asyncio import Future
import os import os
from typing import Tuple, Optional
from minio import Minio from minio import Minio
import mutagen import mutagen
from .source import Source, async_in_thread, available_sources from .source import Source, available_sources
from .common import play_mpv, kill_mpv
from ..result import Result from ..result import Result
from ..entry import Entry from ..entry import Entry
class S3Source(Source): class S3Source(Source):
def __init__(self, config): def __init__(self, config):
super().__init__() super().__init__(config)
if "endpoint" in config and "access_key" in config and "secret_key" in config: if "endpoint" in config and "access_key" in config and "secret_key" in config:
self.minio = Minio( self.minio: Minio = Minio(
config["endpoint"], config["endpoint"],
access_key=config["access_key"], access_key=config["access_key"],
secret_key=config["secret_key"], secret_key=config["secret_key"],
) )
self.bucket = config["bucket"] self.bucket: str = config["bucket"]
self.tmp_dir = config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng" self.tmp_dir: str = (
config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng"
)
self.index = [] if "index" not in config else config["index"] self.index: list[str] = [] if "index" not in config else config["index"]
self.downloaded_files = {} self.extra_mpv_arguments = ["--scale=oversample"]
self.player = None
self.masterlock = Lock()
async def get_entry(self, performer: str, filename: str) -> Entry: async def get_entry(self, performer: str, ident: str) -> Entry:
res = Result.from_filename(filename, "s3") res: Optional[Result] = Result.from_filename(ident, "s3")
if res is not None: if res is not None:
return Entry( return Entry(
id=filename, id=ident,
source="s3", source="s3",
duration=180, duration=180,
album=res.album, album=res.album,
@ -45,96 +44,83 @@ class S3Source(Source):
artist=res.artist, artist=res.artist,
performer=performer, performer=performer,
) )
raise RuntimeError(f"Could not parse {filename}") raise RuntimeError(f"Could not parse {ident}")
async def play(self, entry) -> None: async def get_config(self) -> dict | list[dict]:
while not entry.uuid in self.downloaded_files: def _get_config() -> dict | list[dict]:
sleep(0.1) if not self.index:
print(f"Indexing {self.bucket}")
# self.index = [
# obj.object_name
# for obj in self.minio.list_objects(self.bucket, recursive=True)
# ]
# with open("s3_files", "w") as f:
# dump(self.index, f)
with open("s3_files", "r") as f:
self.index = [item for item in load(f) if item.endswith(".cdg")]
self.downloaded_files[entry.uuid]["lock"].wait() chunked = zip_longest(*[iter(self.index)] * 1000, fillvalue="")
return [
{"index": list(filter(lambda x: x != "", chunk))} for chunk in chunked
]
cdg_file = self.downloaded_files[entry.uuid]["cdg"] return await asyncio.to_thread(_get_config)
mp3_file = self.downloaded_files[entry.uuid]["mp3"]
self.player = await play_mpv( def add_to_config(self, config: dict) -> None:
cdg_file, mp3_file, ["--scale=oversample"]
)
await self.player.wait()
async def skip_current(self, entry) -> None:
await self.player.kill()
@async_in_thread
def get_config(self):
if not self.index:
print("Start indexing")
start = perf_counter()
# self.index = [
# obj.object_name
# for obj in self.minio.list_objects(self.bucket, recursive=True)
# ]
# with open("s3_files", "w") as f:
# dump(self.index, f)
with open("s3_files", "r") as f:
self.index = [item for item in load(f) if item.endswith(".cdg")]
print(len(self.index))
stop = perf_counter()
print(f"Took {stop - start:0.4f} seconds")
chunked = zip_longest(*[iter(self.index)] * 1000, fillvalue="")
return [{"index": list(filter(lambda x: x != "", chunk))} for chunk in chunked]
def add_to_config(self, config):
self.index += config["index"] self.index += config["index"]
@async_in_thread async def search(self, result_future: asyncio.Future, query: str) -> None:
def search(self, result_future: Future, query: str) -> None:
print("searching s3") print("searching s3")
filtered = self.filter_data_by_query(query, self.index) filtered: list[str] = self.filter_data_by_query(query, self.index)
results = [] results: list[Result] = []
for filename in filtered: for filename in filtered:
print(filename) result: Optional[Result] = Result.from_filename(filename, "s3")
result = Result.from_filename(filename, "s3")
print(result)
if result is None: if result is None:
continue continue
results.append(result) results.append(result)
print(results)
result_future.set_result(results) result_future.set_result(results)
@async_in_thread async def get_missing_metadata(self, entry: Entry) -> dict:
def buffer(self, entry: Entry) -> dict: def mutagen_wrapped(file: str) -> int:
with self.masterlock: meta_infos = mutagen.File(file).info
if entry.uuid in self.downloaded_files: return int(meta_infos.length)
return {}
self.downloaded_files[entry.uuid] = {"lock": Event()}
cdg_filename = os.path.basename(entry.id) await self.ensure_playable(entry)
path_to_file = os.path.dirname(entry.id)
cdg_path_with_uuid = os.path.join(path_to_file, f"{entry.uuid}-{cdg_filename}") audio_file_name: Optional[str] = self.downloaded_files[entry.id].audio
target_file_cdg = os.path.join(self.tmp_dir, cdg_path_with_uuid)
ident_mp3 = entry.id[:-3] + "mp3" if audio_file_name is None:
target_file_mp3 = target_file_cdg[:-3] + "mp3" duration: int = 180
else:
duration = await asyncio.to_thread(mutagen_wrapped, audio_file_name)
return {"duration": int(duration)}
async def doBuffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
cdg_filename: str = os.path.basename(entry.id)
path_to_file: str = os.path.dirname(entry.id)
cdg_path: str = os.path.join(path_to_file, cdg_filename)
target_file_cdg: str = os.path.join(self.tmp_dir, cdg_path)
ident_mp3: str = entry.id[:-3] + "mp3"
target_file_mp3: str = target_file_cdg[:-3] + "mp3"
os.makedirs(os.path.dirname(target_file_cdg), exist_ok=True) os.makedirs(os.path.dirname(target_file_cdg), exist_ok=True)
print( video_task: asyncio.Task = asyncio.create_task(
f'self.minio.fget_object("{self.bucket}", "{entry.id}", "{target_file_cdg}")' asyncio.to_thread(
self.minio.fget_object, self.bucket, entry.id, target_file_cdg
)
)
audio_task: asyncio.Task = asyncio.create_task(
asyncio.to_thread(
self.minio.fget_object, self.bucket, ident_mp3, target_file_mp3
)
) )
self.minio.fget_object(self.bucket, entry.id, target_file_cdg)
self.minio.fget_object(self.bucket, ident_mp3, target_file_mp3)
self.downloaded_files[entry.uuid]["cdg"] = target_file_cdg await video_task
self.downloaded_files[entry.uuid]["mp3"] = target_file_mp3 await audio_task
self.downloaded_files[entry.uuid]["lock"].set() return target_file_cdg, target_file_mp3
meta_infos = mutagen.File(target_file_mp3).info
print(f"duration is {meta_infos.length}")
return {"duration": int(meta_infos.length)}
available_sources["s3"] = S3Source available_sources["s3"] = S3Source

View file

@ -1,42 +1,86 @@
from __future__ import annotations from __future__ import annotations
import shlex import shlex
import asyncio import asyncio
from typing import Callable, Awaitable from typing import Tuple, Optional, Type, Any
import os.path import os.path
from collections import defaultdict
from dataclasses import dataclass, field
from ..entry import Entry from ..entry import Entry
from ..result import Result
def async_in_thread(func: Callable) -> Awaitable: @dataclass
async def wrapper(*args, **kwargs): class DLFilesEntry:
loop = asyncio.get_running_loop() ready: asyncio.Event = field(default_factory=asyncio.Event)
return await loop.run_in_executor(None, func, *args, **kwargs) video: str = ""
audio: Optional[str] = None
return wrapper buffering: bool = False
complete: bool = False
skip: bool = False
class Source: class Source:
def __init__(self, config: dict[str, Any]):
self.downloaded_files: defaultdict[str, DLFilesEntry] = defaultdict(
DLFilesEntry
)
self.masterlock: asyncio.Lock = asyncio.Lock()
self.player: Optional[asyncio.subprocess.Process] = None
self.extra_mpv_arguments: list[str] = []
@staticmethod
async def play_mpv(
video: str, audio: str | None, /, *options
) -> asyncio.subprocess.Process:
args = ["--fullscreen", *options, video] + (
[f"--audio-file={audio}"] if audio else []
)
mpv_process = asyncio.create_subprocess_exec("mpv", *args)
return await mpv_process
async def get_entry(self, performer: str, ident: str) -> Entry: async def get_entry(self, performer: str, ident: str) -> Entry:
raise NotImplementedError raise NotImplementedError
async def search(self, result_future: asyncio.Future, query: str) -> None: async def search(self, result_future: asyncio.Future, query: str) -> None:
raise NotImplementedError raise NotImplementedError
async def buffer(self, entry: Entry) -> dict: async def doBuffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
return {}
async def play(self, entry: Entry) -> None:
raise NotImplementedError raise NotImplementedError
async def buffer(self, entry: Entry):
async with self.masterlock:
if self.downloaded_files[entry.id].buffering:
print(f"already buffering {entry.title}")
return
self.downloaded_files[entry.id].buffering = True
video, audio = await self.doBuffer(entry)
self.downloaded_files[entry.id].video = video
self.downloaded_files[entry.id].audio = audio
self.downloaded_files[entry.id].complete = True
self.downloaded_files[entry.id].ready.set()
print(f"Buffering done for {entry.title}")
async def play(self, entry: Entry) -> None:
await self.ensure_playable(entry)
self.player = await self.play_mpv(
self.downloaded_files[entry.id].video,
self.downloaded_files[entry.id].audio,
*self.extra_mpv_arguments,
)
await self.player.wait()
async def skip_current(self, entry: Entry) -> None: async def skip_current(self, entry: Entry) -> None:
pass if self.player is not None:
self.player.kill()
async def init_server(self) -> None: async def ensure_playable(self, entry: Entry):
pass await self.buffer(entry)
await self.downloaded_files[entry.id].ready.wait()
async def init_client(self) -> None: async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]:
pass return {}
def filter_data_by_query(self, query: str, data: list[str]) -> list[str]: def filter_data_by_query(self, query: str, data: list[str]) -> list[str]:
def contains_all_words(words: list[str], element: str) -> bool: def contains_all_words(words: list[str], element: str) -> bool:
@ -48,11 +92,11 @@ class Source:
splitquery = shlex.split(query) splitquery = shlex.split(query)
return [element for element in data if contains_all_words(splitquery, element)] return [element for element in data if contains_all_words(splitquery, element)]
async def get_config(self) -> dict: async def get_config(self) -> dict[str, Any] | list[dict[str, Any]]:
raise NotImplementedError raise NotImplementedError
def add_to_config(self, config) -> None: def add_to_config(self, config: dict[str, Any]) -> None:
pass pass
available_sources = {} available_sources: dict[str, Type[Source]] = {}

View file

@ -1,131 +1,131 @@
import asyncio import asyncio
import shlex import shlex
from functools import partial from functools import partial
from threading import Event, Lock from typing import Optional, Tuple, Any
from pytube import YouTube, Search, Channel, innertube from pytube import YouTube, Search, Channel, innertube, Stream, StreamQuery
from .common import play_mpv, kill_mpv from .source import Source, available_sources
from .source import Source, async_in_thread, available_sources
from ..entry import Entry from ..entry import Entry
from ..result import Result from ..result import Result
class YoutubeSource(Source): class YoutubeSource(Source):
def __init__(self, config): def __init__(self, config: dict[str, Any]):
super().__init__() super().__init__(config)
self.innertube_client = innertube.InnerTube(client="WEB") self.innertube_client: innertube.InnerTube = innertube.InnerTube(client="WEB")
self.channels = config["channels"] if "channels" in config else [] self.channels: list[str] = config["channels"] if "channels" in config else []
self.tmp_dir = config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng" self.tmp_dir: str = config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng"
self.player: None | asyncio.subprocess.Process = None self.max_res: int = config["max_res"] if "max_res" in config else 720
self.downloaded_files = {} self.start_streaming: bool = (
self.masterlock = Lock() config["start_streaming"] if "start_streaming" in config else False
)
async def get_config(self): async def get_config(self) -> dict | list[dict]:
return {"channels": self.channels} return {"channels": self.channels}
async def play(self, entry: Entry) -> None: async def play(self, entry: Entry) -> None:
if self.start_streaming and not self.downloaded_files[entry.id].complete:
if entry.uuid in self.downloaded_files and "video" in self.downloaded_files[entry.uuid]:
print("playing locally")
video_file = self.downloaded_files[entry.uuid]["video"]
audio_file = self.downloaded_files[entry.uuid]["audio"]
self.player = await play_mpv(video_file, audio_file)
else:
print("streaming") print("streaming")
self.player = await play_mpv( self.player = await self.play_mpv(
entry.id, entry.id,
None, None,
[ "--script-opts=ytdl_hook-ytdl_path=yt-dlp,ytdl_hook-exclude='%.pls$'",
"--script-opts=ytdl_hook-ytdl_path=yt-dlp,ytdl_hook-exclude='%.pls$'", f"--ytdl-format=bestvideo[height<={self.max_res}]+bestaudio/best[height<={self.max_res}]",
"--ytdl-format=bestvideo[height<=720]+bestaudio/best[height<=720]", "--fullscreen",
"--fullscreen", )
], await self.player.wait()
else:
await super().play(entry)
async def get_entry(self, performer: str, ident: str) -> Entry:
def _get_entry(performer: str, url: str) -> Entry:
yt = YouTube(url)
return Entry(
id=url,
source="youtube",
album="YouTube",
duration=yt.length,
title=yt.title,
artist=yt.author,
performer=performer,
) )
await self.player.wait() return await asyncio.to_thread(_get_entry, performer, ident)
async def skip_current(self, entry) -> None: def _contains_index(self, query: str, result: YouTube) -> float:
await self.player.kill() compare_string: str = result.title.lower() + " " + result.author.lower()
hits: int = 0
@async_in_thread queries: list[str] = shlex.split(query.lower())
def get_entry(self, performer: str, url: str) -> Entry:
yt = YouTube(url)
return Entry(
id=url,
source="youtube",
album="YouTube",
duration=yt.length,
title=yt.title,
artist=yt.author,
performer=performer,
)
def _contains_index(self, query, result):
compare_string = result.title.lower() + " " + result.author.lower()
hits = 0
queries = shlex.split(query.lower())
for word in queries: for word in queries:
if word in compare_string: if word in compare_string:
hits += 1 hits += 1
return 1 - (hits / len(queries)) return 1 - (hits / len(queries))
@async_in_thread async def search(self, result_future: asyncio.Future, query: str) -> None:
def search(self, result_future: asyncio.Future, query: str) -> None: def _search(result_future: asyncio.Future, query: str) -> None:
results = [] results: list[YouTube] = []
for channel in self.channels: for channel in self.channels:
results += self._channel_search(query, channel) results += self._channel_search(query, channel)
results += Search(query + " karaoke").results search_results: Optional[list[YouTube]] = Search(query + " karaoke").results
if search_results is not None:
results += search_results
results.sort(key=partial(self._contains_index, query)) results.sort(key=partial(self._contains_index, query))
result_future.set_result( result_future.set_result(
[ [
Result( Result(
id=result.watch_url, id=result.watch_url,
source="youtube", source="youtube",
title=result.title, title=result.title,
artist=result.author, artist=result.author,
album="YouTube", album="YouTube",
) )
for result in results for result in results
] ]
) )
def _channel_search(self, query, channel) -> list: await asyncio.to_thread(_search, result_future, query)
browseID = Channel(f"https://www.youtube.com{channel}").channel_id
endpoint = f"{self.innertube_client.base_url}/browse"
data = {"query": query, "browseId": browseID, "params": "EgZzZWFyY2g%3D"} def _channel_search(self, query: str, channel: str) -> list[YouTube]:
browse_id: str = Channel(f"https://www.youtube.com{channel}").channel_id
endpoint: str = f"{self.innertube_client.base_url}/browse"
data: dict[str, str] = {
"query": query,
"browseId": browse_id,
"params": "EgZzZWFyY2g%3D",
}
data.update(self.innertube_client.base_data) data.update(self.innertube_client.base_data)
results = self.innertube_client._call_api( results: dict = self.innertube_client._call_api(
endpoint, self.innertube_client.base_params, data endpoint, self.innertube_client.base_params, data
) )
items = results["contents"]["twoColumnBrowseResultsRenderer"]["tabs"][-1][ items: list = results["contents"]["twoColumnBrowseResultsRenderer"]["tabs"][-1][
"expandableTabRenderer" "expandableTabRenderer"
]["content"]["sectionListRenderer"]["contents"] ]["content"]["sectionListRenderer"]["contents"]
list_of_videos = [] list_of_videos: list[YouTube] = []
for item in items: for item in items:
try: try:
if ( if (
"itemSectionRenderer" in item "itemSectionRenderer" in item
and "videoRenderer" in item["itemSectionRenderer"]["contents"][0] and "videoRenderer" in item["itemSectionRenderer"]["contents"][0]
): ):
yt_url = ( yt_url: str = (
"https://youtube.com/watch?v=" "https://youtube.com/watch?v="
+ item["itemSectionRenderer"]["contents"][0]["videoRenderer"][ + item["itemSectionRenderer"]["contents"][0]["videoRenderer"][
"videoId" "videoId"
] ]
) )
author = item["itemSectionRenderer"]["contents"][0][ author: str = item["itemSectionRenderer"]["contents"][0][
"videoRenderer" "videoRenderer"
]["ownerText"]["runs"][0]["text"] ]["ownerText"]["runs"][0]["text"]
title = item["itemSectionRenderer"]["contents"][0]["videoRenderer"][ title: str = item["itemSectionRenderer"]["contents"][0][
"title" "videoRenderer"
]["runs"][0]["text"] ]["title"]["runs"][0]["text"]
yt = YouTube(yt_url) yt: YouTube = YouTube(yt_url)
yt.author = author yt.author = author
yt.title = title yt.title = title
list_of_videos.append(yt) list_of_videos.append(yt)
@ -134,39 +134,41 @@ class YoutubeSource(Source):
pass pass
return list_of_videos return list_of_videos
@async_in_thread async def doBuffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
def buffer(self, entry: Entry) -> dict: yt: YouTube = YouTube(entry.id)
print(f"Buffering {entry}")
with self.masterlock:
if entry.uuid in self.downloaded_files:
print(f"Already buffering {entry}")
return {}
self.downloaded_files[entry.uuid] = {}
yt = YouTube(entry.id) streams: StreamQuery = await asyncio.to_thread(lambda: yt.streams)
streams = yt.streams video_streams: StreamQuery = streams.filter(
type="video",
video_streams = streams.filter( custom_filter_functions=[lambda s: int(s.resolution[:-1]) <= self.max_res],
type="video", )
custom_filter_functions=[lambda s: int(s.resolution[:-1]) <= 1080] audio_streams: StreamQuery = streams.filter(only_audio=True)
)
audio_streams = streams.filter(only_audio=True)
best_720_stream = sorted(video_streams, key=lambda s: int(s.resolution[:-1]) + (1 if s.is_progressive else 0))[-1] best_video_stream: Stream = sorted(
best_audio_stream = sorted(audio_streams, key=lambda s: int(s.abr[:-4]))[-1] video_streams,
key=lambda s: int(s.resolution[:-1]) + (1 if s.is_progressive else 0),
)[-1]
best_audio_stream: Stream = sorted(
audio_streams, key=lambda s: int(s.abr[:-4])
)[-1]
print(best_720_stream) audio: Optional[str] = (
print(best_audio_stream) await asyncio.to_thread(
best_audio_stream.download,
output_path=self.tmp_dir,
filename_prefix="audio-",
)
if best_video_stream.is_adaptive
else None
)
if not best_720_stream.is_progressive: video: str = await asyncio.to_thread(
self.downloaded_files[entry.uuid]["audio"] = best_audio_stream.download(output_path=self.tmp_dir, filename_prefix=f"{entry.uuid}-audio") best_video_stream.download,
else: output_path=self.tmp_dir,
self.downloaded_files[entry.uuid]["audio"] = None )
self.downloaded_files[entry.uuid]["video"] = best_720_stream.download(output_path=self.tmp_dir, filename_prefix=entry.uuid) return video, audio
return {}
available_sources["youtube"] = YoutubeSource available_sources["youtube"] = YoutubeSource

View file

@ -1,24 +0,0 @@
import socketio
def append_yt(url):
sio = socketio.Client()
sio.connect("http://localhost:8080")
sio.emit("append", {"source": "youtube", "id": url, "performer": "test"})
sio.disconnect()
def skip():
sio = socketio.Client()
sio.connect("http://localhost:8080")
sio.emit("register-admin", {"secret": "admin"})
sio.emit("skip")
sio.disconnect()
def search(query):
sio = socketio.Client()
sio.on("search-result", print)
sio.connect("http://localhost:8080")
sio.emit("search", {"query": query})
sio.disconnect()

View file

@ -1,11 +1,14 @@
import socketio
import asyncio import asyncio
from typing import Any
from aiocmd import aiocmd
import socketio
from .result import Result from .result import Result
from .entry import Entry from .entry import Entry
from aiocmd import aiocmd
sio = socketio.AsyncClient() sio: socketio.AsyncClient = socketio.AsyncClient()
state = {} state: dict[str, Any] = {}
@sio.on("search-results") @sio.on("search-results")