diff --git a/pyproject.toml b/pyproject.toml index 4af5f5f..966dfca 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -31,3 +31,13 @@ build-backend = "poetry.core.masonry.api" exclude = [ ".venv" ] venvPath = "." venv = ".venv" + +[[tool.mypy.overrides]] +module = [ + "pytube", + "minio", + "aiocmd", + "pyqrcode", + "socketio" +] +ignore_missing_imports = true diff --git a/syng/client.py b/syng/client.py index 5aae08e..3174b0b 100644 --- a/syng/client.py +++ b/syng/client.py @@ -5,6 +5,8 @@ from traceback import print_exc from json import load import logging from argparse import ArgumentParser +from dataclasses import dataclass, field +from typing import Optional, Any import socketio import pyqrcode @@ -13,25 +15,41 @@ from .sources import Source, configure_sources from .entry import Entry -sio = socketio.AsyncClient() -logger = logging.getLogger(__name__) +sio: socketio.AsyncClient = socketio.AsyncClient() +logger: logging.Logger = logging.getLogger(__name__) sources: dict[str, Source] = {} -currentLock = asyncio.Semaphore(0) -state = {"current": None, "queue": [], "recent": [], "room": None, "server": "", "secret": ""} +currentLock: asyncio.Semaphore = asyncio.Semaphore(0) + + +@dataclass +class State: + current_source: Optional[Source] = None + queue: list[Entry] = field(default_factory=list) + recent: list[Entry] = field(default_factory=list) + room: str = "" + server: str = "" + secret: str = "" + + +state: State = State() @sio.on("skip") async def handle_skip(): logger.info("Skipping current") - await state["current"].skip_current(state["queue"][0]) + await state.current_source.skip_current(state.queue[0]) @sio.on("state") -async def handle_state(data): - state["queue"] = [Entry(**entry) for entry in data["queue"]] - state["recent"] = [Entry(**entry) for entry in data["recent"]] +async def handle_state(data: dict[str, Any]): + state.queue = [Entry(**entry) for entry in data["queue"]] + state.recent = [Entry(**entry) for entry in data["recent"]] + + for entry in state.queue[:2]: + logger.warning(f"Buffering: %s", entry.title) + await sources[entry.source].buffer(entry) @sio.on("connect") @@ -40,35 +58,29 @@ async def handle_connect(): await sio.emit( "register-client", { - "secret": state["secret"], - "queue": [entry.to_dict() for entry in state["queue"]], - "recent": [entry.to_dict() for entry in state["recent"]], - "room": state["room"], + "secret": state.secret, + "queue": [entry.to_dict() for entry in state.queue], + "recent": [entry.to_dict() for entry in state.recent], + "room": state.room, }, ) @sio.on("buffer") -async def handle_buffer(data): - source = sources[data["source"]] - meta_info = await source.buffer(Entry(**data)) +async def handle_buffer(data: dict[str, Any]): + source: Source = sources[data["source"]] + meta_info: dict[str, Any] = await source.get_missing_metadata(Entry(**data)) await sio.emit("meta-info", {"uuid": data["uuid"], "meta": meta_info}) -async def buffer_and_report(entry): - meta_info = await sources[entry.source].buffer(entry) - await sio.emit("meta-info", {"uuid": entry.uuid, "meta": meta_info}) - - @sio.on("play") -async def handle_play(data): - entry = Entry(**data) +async def handle_play(data: dict[str, Any]): + entry: Entry = Entry(**data) print( f"Playing: {entry.artist} - {entry.title} [{entry.album}] ({entry.source}) for {entry.performer}" ) try: - state["current"] = sources[entry.source] - asyncio.create_task(buffer_and_report(entry)) + state.current_source = sources[entry.source] await sources[entry.source].play(entry) except Exception: print_exc() @@ -77,16 +89,14 @@ async def handle_play(data): @sio.on("client-registered") -async def handle_register(data): +async def handle_register(data: dict[str, Any]): if data["success"]: logging.info("Registered") - print(f"Join here: {state['server']}/{data['room']}") - print( - pyqrcode.create(f"{state['server']}/{data['room']}").terminal(quiet_zone=1) - ) - state["room"] = data["room"] + print(f"Join here: {state.server}/{data['room']}") + print(pyqrcode.create(f"{state.server}/{data['room']}").terminal(quiet_zone=1)) + state.room = data["room"] await sio.emit("sources", {"sources": list(sources.keys())}) - if state["current"] is None: + if state.current_source is None: await sio.emit("get-first") else: logging.warning("Registration failed") @@ -94,11 +104,13 @@ async def handle_register(data): @sio.on("request-config") -async def handle_request_config(data): +async def handle_request_config(data: dict[str, Any]): if data["source"] in sources: - config = await sources[data["source"]].get_config() + config: dict[str, Any] | list[dict[str, Any]] = await sources[ + data["source"] + ].get_config() if isinstance(config, list): - num_chunks = len(config) + num_chunks: int = len(config) for current, chunk in enumerate(config): await sio.emit( "config-chunk", @@ -114,7 +126,7 @@ async def handle_request_config(data): async def aiomain(): - parser = ArgumentParser() + parser: ArgumentParser = ArgumentParser() parser.add_argument("--room", "-r") parser.add_argument("--secret", "-s") @@ -127,15 +139,17 @@ async def aiomain(): source_config = load(file) sources.update(configure_sources(source_config)) if args.room: - state["room"] = args.room + state.room = args.room if args.secret: - state["secret"] = args.secret + state.secret = args.secret else: - state["secret"] = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(8)) - print(f"Generated secret: {state['secret']}") + state.secret = "".join( + secrets.choice(string.ascii_letters + string.digits) for _ in range(8) + ) + print(f"Generated secret: {state.secret}") - state["server"] = args.server + state.server = args.server await sio.connect(args.server) await sio.wait() diff --git a/syng/entry.py b/syng/entry.py index e06bc20..b713a18 100644 --- a/syng/entry.py +++ b/syng/entry.py @@ -1,6 +1,10 @@ from __future__ import annotations from dataclasses import dataclass, field from uuid import uuid4, UUID +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from .sources import Source @dataclass diff --git a/syng/result.py b/syng/result.py index 84f2e79..bb6d639 100644 --- a/syng/result.py +++ b/syng/result.py @@ -1,5 +1,6 @@ from __future__ import annotations from dataclasses import dataclass +from typing import Optional import os.path @@ -21,7 +22,7 @@ class Result: } @staticmethod - def from_filename(filename, source) -> Result | None: + def from_filename(filename, source) -> Optional[Result]: try: splitfile = os.path.basename(filename[:-4]).split(" - ") ident = filename diff --git a/syng/server.py b/syng/server.py index d38e967..136a18b 100644 --- a/syng/server.py +++ b/syng/server.py @@ -33,9 +33,6 @@ logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) -clients = {} - - class Queue: def __init__(self, *args, **kwargs): self._queue = deque(*args, **kwargs) @@ -79,6 +76,9 @@ class State: sid: str +clients: dict[str, State] = {} + + @sio.on("get-state") async def handle_state(sid, data: dict[str, Any] = {}): async with sio.session(sid) as session: @@ -204,8 +204,18 @@ async def handle_register_client(sid, data: dict[str, Any]): logger.info("Registerd new client %s", room) initial_entries = [Entry(**entry) for entry in data["queue"]] initial_recent = [Entry(**entry) for entry in data["recent"]] - clients[room] = State(data["secret"], {}, [], Queue(initial_entries), initial_recent, sid) + clients[room] = State( + data["secret"], {}, [], Queue(initial_entries), initial_recent, sid + ) sio.enter_room(sid, room) + await sio.emit( + "state", + { + "queue": clients[room].queue.to_dict(), + "recent": [entry.to_dict() for entry in clients[room].recent], + }, + room=sid, + ) await sio.emit("client-registered", {"success": True, "room": room}, room=sid) diff --git a/syng/sources/common.py b/syng/sources/common.py deleted file mode 100644 index 8b917dc..0000000 --- a/syng/sources/common.py +++ /dev/null @@ -1,15 +0,0 @@ -from __future__ import annotations -import asyncio - - -async def play_mpv( - video: str, audio: str | None, options: list[str] = list() -) -> asyncio.subprocess.Process: - args = ["--fullscreen", *options, video] + ([f"--audio-file={audio}"] if audio else []) - - mpv_process = asyncio.create_subprocess_exec("mpv", *args) - return await mpv_process - - -def kill_mpv(mpv: asyncio.subprocess.Process): - mpv.terminate() diff --git a/syng/sources/s3.py b/syng/sources/s3.py index d9d07a1..525c592 100644 --- a/syng/sources/s3.py +++ b/syng/sources/s3.py @@ -1,43 +1,42 @@ from json import load, dump from time import sleep, perf_counter from itertools import zip_longest -from threading import Event, Lock -from asyncio import Future +import asyncio import os +from typing import Tuple, Optional from minio import Minio import mutagen -from .source import Source, async_in_thread, available_sources -from .common import play_mpv, kill_mpv +from .source import Source, available_sources from ..result import Result from ..entry import Entry class S3Source(Source): def __init__(self, config): - super().__init__() + super().__init__(config) if "endpoint" in config and "access_key" in config and "secret_key" in config: - self.minio = Minio( + self.minio: Minio = Minio( config["endpoint"], access_key=config["access_key"], secret_key=config["secret_key"], ) - self.bucket = config["bucket"] - self.tmp_dir = config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng" + self.bucket: str = config["bucket"] + self.tmp_dir: str = ( + config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng" + ) - self.index = [] if "index" not in config else config["index"] - self.downloaded_files = {} - self.player = None - self.masterlock = Lock() + self.index: list[str] = [] if "index" not in config else config["index"] + self.extra_mpv_arguments = ["--scale=oversample"] - async def get_entry(self, performer: str, filename: str) -> Entry: - res = Result.from_filename(filename, "s3") + async def get_entry(self, performer: str, ident: str) -> Entry: + res: Optional[Result] = Result.from_filename(ident, "s3") if res is not None: return Entry( - id=filename, + id=ident, source="s3", duration=180, album=res.album, @@ -45,96 +44,83 @@ class S3Source(Source): artist=res.artist, performer=performer, ) - raise RuntimeError(f"Could not parse {filename}") + raise RuntimeError(f"Could not parse {ident}") - async def play(self, entry) -> None: - while not entry.uuid in self.downloaded_files: - sleep(0.1) + async def get_config(self) -> dict | list[dict]: + def _get_config() -> dict | list[dict]: + if not self.index: + print(f"Indexing {self.bucket}") + # self.index = [ + # obj.object_name + # for obj in self.minio.list_objects(self.bucket, recursive=True) + # ] + # with open("s3_files", "w") as f: + # dump(self.index, f) + with open("s3_files", "r") as f: + self.index = [item for item in load(f) if item.endswith(".cdg")] - self.downloaded_files[entry.uuid]["lock"].wait() + chunked = zip_longest(*[iter(self.index)] * 1000, fillvalue="") + return [ + {"index": list(filter(lambda x: x != "", chunk))} for chunk in chunked + ] - cdg_file = self.downloaded_files[entry.uuid]["cdg"] - mp3_file = self.downloaded_files[entry.uuid]["mp3"] + return await asyncio.to_thread(_get_config) - self.player = await play_mpv( - cdg_file, mp3_file, ["--scale=oversample"] - ) - - await self.player.wait() - - async def skip_current(self, entry) -> None: - await self.player.kill() - - @async_in_thread - def get_config(self): - if not self.index: - print("Start indexing") - start = perf_counter() - # self.index = [ - # obj.object_name - # for obj in self.minio.list_objects(self.bucket, recursive=True) - # ] - # with open("s3_files", "w") as f: - # dump(self.index, f) - with open("s3_files", "r") as f: - self.index = [item for item in load(f) if item.endswith(".cdg")] - print(len(self.index)) - stop = perf_counter() - print(f"Took {stop - start:0.4f} seconds") - - chunked = zip_longest(*[iter(self.index)] * 1000, fillvalue="") - return [{"index": list(filter(lambda x: x != "", chunk))} for chunk in chunked] - - def add_to_config(self, config): + def add_to_config(self, config: dict) -> None: self.index += config["index"] - @async_in_thread - def search(self, result_future: Future, query: str) -> None: + async def search(self, result_future: asyncio.Future, query: str) -> None: print("searching s3") - filtered = self.filter_data_by_query(query, self.index) - results = [] + filtered: list[str] = self.filter_data_by_query(query, self.index) + results: list[Result] = [] for filename in filtered: - print(filename) - result = Result.from_filename(filename, "s3") - print(result) + result: Optional[Result] = Result.from_filename(filename, "s3") if result is None: continue results.append(result) - print(results) result_future.set_result(results) - @async_in_thread - def buffer(self, entry: Entry) -> dict: - with self.masterlock: - if entry.uuid in self.downloaded_files: - return {} - self.downloaded_files[entry.uuid] = {"lock": Event()} + async def get_missing_metadata(self, entry: Entry) -> dict: + def mutagen_wrapped(file: str) -> int: + meta_infos = mutagen.File(file).info + return int(meta_infos.length) - cdg_filename = os.path.basename(entry.id) - path_to_file = os.path.dirname(entry.id) + await self.ensure_playable(entry) - cdg_path_with_uuid = os.path.join(path_to_file, f"{entry.uuid}-{cdg_filename}") - target_file_cdg = os.path.join(self.tmp_dir, cdg_path_with_uuid) + audio_file_name: Optional[str] = self.downloaded_files[entry.id].audio - ident_mp3 = entry.id[:-3] + "mp3" - target_file_mp3 = target_file_cdg[:-3] + "mp3" + if audio_file_name is None: + duration: int = 180 + else: + duration = await asyncio.to_thread(mutagen_wrapped, audio_file_name) + + return {"duration": int(duration)} + + async def doBuffer(self, entry: Entry) -> Tuple[str, Optional[str]]: + cdg_filename: str = os.path.basename(entry.id) + path_to_file: str = os.path.dirname(entry.id) + + cdg_path: str = os.path.join(path_to_file, cdg_filename) + target_file_cdg: str = os.path.join(self.tmp_dir, cdg_path) + + ident_mp3: str = entry.id[:-3] + "mp3" + target_file_mp3: str = target_file_cdg[:-3] + "mp3" os.makedirs(os.path.dirname(target_file_cdg), exist_ok=True) - print( - f'self.minio.fget_object("{self.bucket}", "{entry.id}", "{target_file_cdg}")' + video_task: asyncio.Task = asyncio.create_task( + asyncio.to_thread( + self.minio.fget_object, self.bucket, entry.id, target_file_cdg + ) + ) + audio_task: asyncio.Task = asyncio.create_task( + asyncio.to_thread( + self.minio.fget_object, self.bucket, ident_mp3, target_file_mp3 + ) ) - self.minio.fget_object(self.bucket, entry.id, target_file_cdg) - self.minio.fget_object(self.bucket, ident_mp3, target_file_mp3) - self.downloaded_files[entry.uuid]["cdg"] = target_file_cdg - self.downloaded_files[entry.uuid]["mp3"] = target_file_mp3 - self.downloaded_files[entry.uuid]["lock"].set() - - meta_infos = mutagen.File(target_file_mp3).info - - print(f"duration is {meta_infos.length}") - - return {"duration": int(meta_infos.length)} + await video_task + await audio_task + return target_file_cdg, target_file_mp3 available_sources["s3"] = S3Source diff --git a/syng/sources/source.py b/syng/sources/source.py index e7d8039..080f128 100644 --- a/syng/sources/source.py +++ b/syng/sources/source.py @@ -1,42 +1,86 @@ from __future__ import annotations import shlex import asyncio -from typing import Callable, Awaitable +from typing import Tuple, Optional, Type, Any import os.path +from collections import defaultdict +from dataclasses import dataclass, field from ..entry import Entry -from ..result import Result -def async_in_thread(func: Callable) -> Awaitable: - async def wrapper(*args, **kwargs): - loop = asyncio.get_running_loop() - return await loop.run_in_executor(None, func, *args, **kwargs) - - return wrapper +@dataclass +class DLFilesEntry: + ready: asyncio.Event = field(default_factory=asyncio.Event) + video: str = "" + audio: Optional[str] = None + buffering: bool = False + complete: bool = False + skip: bool = False class Source: + def __init__(self, config: dict[str, Any]): + self.downloaded_files: defaultdict[str, DLFilesEntry] = defaultdict( + DLFilesEntry + ) + self.masterlock: asyncio.Lock = asyncio.Lock() + self.player: Optional[asyncio.subprocess.Process] = None + self.extra_mpv_arguments: list[str] = [] + + @staticmethod + async def play_mpv( + video: str, audio: str | None, /, *options + ) -> asyncio.subprocess.Process: + args = ["--fullscreen", *options, video] + ( + [f"--audio-file={audio}"] if audio else [] + ) + + mpv_process = asyncio.create_subprocess_exec("mpv", *args) + return await mpv_process + async def get_entry(self, performer: str, ident: str) -> Entry: raise NotImplementedError async def search(self, result_future: asyncio.Future, query: str) -> None: raise NotImplementedError - async def buffer(self, entry: Entry) -> dict: - return {} - - async def play(self, entry: Entry) -> None: + async def doBuffer(self, entry: Entry) -> Tuple[str, Optional[str]]: raise NotImplementedError + async def buffer(self, entry: Entry): + async with self.masterlock: + if self.downloaded_files[entry.id].buffering: + print(f"already buffering {entry.title}") + return + self.downloaded_files[entry.id].buffering = True + + video, audio = await self.doBuffer(entry) + self.downloaded_files[entry.id].video = video + self.downloaded_files[entry.id].audio = audio + self.downloaded_files[entry.id].complete = True + self.downloaded_files[entry.id].ready.set() + print(f"Buffering done for {entry.title}") + + async def play(self, entry: Entry) -> None: + await self.ensure_playable(entry) + self.player = await self.play_mpv( + self.downloaded_files[entry.id].video, + self.downloaded_files[entry.id].audio, + *self.extra_mpv_arguments, + ) + await self.player.wait() + async def skip_current(self, entry: Entry) -> None: - pass + if self.player is not None: + self.player.kill() - async def init_server(self) -> None: - pass + async def ensure_playable(self, entry: Entry): + await self.buffer(entry) + await self.downloaded_files[entry.id].ready.wait() - async def init_client(self) -> None: - pass + async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]: + return {} def filter_data_by_query(self, query: str, data: list[str]) -> list[str]: def contains_all_words(words: list[str], element: str) -> bool: @@ -48,11 +92,11 @@ class Source: splitquery = shlex.split(query) return [element for element in data if contains_all_words(splitquery, element)] - async def get_config(self) -> dict: + async def get_config(self) -> dict[str, Any] | list[dict[str, Any]]: raise NotImplementedError - def add_to_config(self, config) -> None: + def add_to_config(self, config: dict[str, Any]) -> None: pass -available_sources = {} +available_sources: dict[str, Type[Source]] = {} diff --git a/syng/sources/youtube.py b/syng/sources/youtube.py index 22f1195..bd40234 100644 --- a/syng/sources/youtube.py +++ b/syng/sources/youtube.py @@ -1,131 +1,131 @@ import asyncio import shlex from functools import partial -from threading import Event, Lock +from typing import Optional, Tuple, Any -from pytube import YouTube, Search, Channel, innertube +from pytube import YouTube, Search, Channel, innertube, Stream, StreamQuery -from .common import play_mpv, kill_mpv -from .source import Source, async_in_thread, available_sources +from .source import Source, available_sources from ..entry import Entry from ..result import Result class YoutubeSource(Source): - def __init__(self, config): - super().__init__() - self.innertube_client = innertube.InnerTube(client="WEB") - self.channels = config["channels"] if "channels" in config else [] - self.tmp_dir = config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng" - self.player: None | asyncio.subprocess.Process = None - self.downloaded_files = {} - self.masterlock = Lock() + def __init__(self, config: dict[str, Any]): + super().__init__(config) + self.innertube_client: innertube.InnerTube = innertube.InnerTube(client="WEB") + self.channels: list[str] = config["channels"] if "channels" in config else [] + self.tmp_dir: str = config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng" + self.max_res: int = config["max_res"] if "max_res" in config else 720 + self.start_streaming: bool = ( + config["start_streaming"] if "start_streaming" in config else False + ) - async def get_config(self): + async def get_config(self) -> dict | list[dict]: return {"channels": self.channels} async def play(self, entry: Entry) -> None: - - if entry.uuid in self.downloaded_files and "video" in self.downloaded_files[entry.uuid]: - print("playing locally") - video_file = self.downloaded_files[entry.uuid]["video"] - audio_file = self.downloaded_files[entry.uuid]["audio"] - self.player = await play_mpv(video_file, audio_file) - else: + if self.start_streaming and not self.downloaded_files[entry.id].complete: print("streaming") - self.player = await play_mpv( + self.player = await self.play_mpv( entry.id, None, - [ - "--script-opts=ytdl_hook-ytdl_path=yt-dlp,ytdl_hook-exclude='%.pls$'", - "--ytdl-format=bestvideo[height<=720]+bestaudio/best[height<=720]", - "--fullscreen", - ], + "--script-opts=ytdl_hook-ytdl_path=yt-dlp,ytdl_hook-exclude='%.pls$'", + f"--ytdl-format=bestvideo[height<={self.max_res}]+bestaudio/best[height<={self.max_res}]", + "--fullscreen", + ) + await self.player.wait() + else: + await super().play(entry) + + async def get_entry(self, performer: str, ident: str) -> Entry: + def _get_entry(performer: str, url: str) -> Entry: + yt = YouTube(url) + return Entry( + id=url, + source="youtube", + album="YouTube", + duration=yt.length, + title=yt.title, + artist=yt.author, + performer=performer, ) - await self.player.wait() + return await asyncio.to_thread(_get_entry, performer, ident) - async def skip_current(self, entry) -> None: - await self.player.kill() - - @async_in_thread - def get_entry(self, performer: str, url: str) -> Entry: - yt = YouTube(url) - return Entry( - id=url, - source="youtube", - album="YouTube", - duration=yt.length, - title=yt.title, - artist=yt.author, - performer=performer, - ) - - def _contains_index(self, query, result): - compare_string = result.title.lower() + " " + result.author.lower() - hits = 0 - queries = shlex.split(query.lower()) + def _contains_index(self, query: str, result: YouTube) -> float: + compare_string: str = result.title.lower() + " " + result.author.lower() + hits: int = 0 + queries: list[str] = shlex.split(query.lower()) for word in queries: if word in compare_string: hits += 1 return 1 - (hits / len(queries)) - @async_in_thread - def search(self, result_future: asyncio.Future, query: str) -> None: - results = [] - for channel in self.channels: - results += self._channel_search(query, channel) - results += Search(query + " karaoke").results + async def search(self, result_future: asyncio.Future, query: str) -> None: + def _search(result_future: asyncio.Future, query: str) -> None: + results: list[YouTube] = [] + for channel in self.channels: + results += self._channel_search(query, channel) + search_results: Optional[list[YouTube]] = Search(query + " karaoke").results + if search_results is not None: + results += search_results - results.sort(key=partial(self._contains_index, query)) + results.sort(key=partial(self._contains_index, query)) - result_future.set_result( - [ - Result( - id=result.watch_url, - source="youtube", - title=result.title, - artist=result.author, - album="YouTube", - ) - for result in results - ] - ) + result_future.set_result( + [ + Result( + id=result.watch_url, + source="youtube", + title=result.title, + artist=result.author, + album="YouTube", + ) + for result in results + ] + ) - def _channel_search(self, query, channel) -> list: - browseID = Channel(f"https://www.youtube.com{channel}").channel_id - endpoint = f"{self.innertube_client.base_url}/browse" + await asyncio.to_thread(_search, result_future, query) - data = {"query": query, "browseId": browseID, "params": "EgZzZWFyY2g%3D"} + def _channel_search(self, query: str, channel: str) -> list[YouTube]: + browse_id: str = Channel(f"https://www.youtube.com{channel}").channel_id + endpoint: str = f"{self.innertube_client.base_url}/browse" + + data: dict[str, str] = { + "query": query, + "browseId": browse_id, + "params": "EgZzZWFyY2g%3D", + } data.update(self.innertube_client.base_data) - results = self.innertube_client._call_api( + results: dict = self.innertube_client._call_api( endpoint, self.innertube_client.base_params, data ) - items = results["contents"]["twoColumnBrowseResultsRenderer"]["tabs"][-1][ + items: list = results["contents"]["twoColumnBrowseResultsRenderer"]["tabs"][-1][ "expandableTabRenderer" ]["content"]["sectionListRenderer"]["contents"] - list_of_videos = [] + list_of_videos: list[YouTube] = [] for item in items: try: if ( "itemSectionRenderer" in item and "videoRenderer" in item["itemSectionRenderer"]["contents"][0] ): - yt_url = ( + yt_url: str = ( "https://youtube.com/watch?v=" + item["itemSectionRenderer"]["contents"][0]["videoRenderer"][ "videoId" ] ) - author = item["itemSectionRenderer"]["contents"][0][ + author: str = item["itemSectionRenderer"]["contents"][0][ "videoRenderer" ]["ownerText"]["runs"][0]["text"] - title = item["itemSectionRenderer"]["contents"][0]["videoRenderer"][ - "title" - ]["runs"][0]["text"] - yt = YouTube(yt_url) + title: str = item["itemSectionRenderer"]["contents"][0][ + "videoRenderer" + ]["title"]["runs"][0]["text"] + yt: YouTube = YouTube(yt_url) yt.author = author yt.title = title list_of_videos.append(yt) @@ -134,39 +134,41 @@ class YoutubeSource(Source): pass return list_of_videos - @async_in_thread - def buffer(self, entry: Entry) -> dict: - print(f"Buffering {entry}") - with self.masterlock: - if entry.uuid in self.downloaded_files: - print(f"Already buffering {entry}") - return {} - self.downloaded_files[entry.uuid] = {} + async def doBuffer(self, entry: Entry) -> Tuple[str, Optional[str]]: + yt: YouTube = YouTube(entry.id) - yt = YouTube(entry.id) + streams: StreamQuery = await asyncio.to_thread(lambda: yt.streams) - streams = yt.streams - - video_streams = streams.filter( - type="video", - custom_filter_functions=[lambda s: int(s.resolution[:-1]) <= 1080] - ) - audio_streams = streams.filter(only_audio=True) + video_streams: StreamQuery = streams.filter( + type="video", + custom_filter_functions=[lambda s: int(s.resolution[:-1]) <= self.max_res], + ) + audio_streams: StreamQuery = streams.filter(only_audio=True) - best_720_stream = sorted(video_streams, key=lambda s: int(s.resolution[:-1]) + (1 if s.is_progressive else 0))[-1] - best_audio_stream = sorted(audio_streams, key=lambda s: int(s.abr[:-4]))[-1] + best_video_stream: Stream = sorted( + video_streams, + key=lambda s: int(s.resolution[:-1]) + (1 if s.is_progressive else 0), + )[-1] + best_audio_stream: Stream = sorted( + audio_streams, key=lambda s: int(s.abr[:-4]) + )[-1] - print(best_720_stream) - print(best_audio_stream) + audio: Optional[str] = ( + await asyncio.to_thread( + best_audio_stream.download, + output_path=self.tmp_dir, + filename_prefix="audio-", + ) + if best_video_stream.is_adaptive + else None + ) - if not best_720_stream.is_progressive: - self.downloaded_files[entry.uuid]["audio"] = best_audio_stream.download(output_path=self.tmp_dir, filename_prefix=f"{entry.uuid}-audio") - else: - self.downloaded_files[entry.uuid]["audio"] = None + video: str = await asyncio.to_thread( + best_video_stream.download, + output_path=self.tmp_dir, + ) - self.downloaded_files[entry.uuid]["video"] = best_720_stream.download(output_path=self.tmp_dir, filename_prefix=entry.uuid) - - return {} + return video, audio available_sources["youtube"] = YoutubeSource diff --git a/syng/test.py b/syng/test.py deleted file mode 100644 index 58c3008..0000000 --- a/syng/test.py +++ /dev/null @@ -1,24 +0,0 @@ -import socketio - - -def append_yt(url): - sio = socketio.Client() - sio.connect("http://localhost:8080") - sio.emit("append", {"source": "youtube", "id": url, "performer": "test"}) - sio.disconnect() - - -def skip(): - sio = socketio.Client() - sio.connect("http://localhost:8080") - sio.emit("register-admin", {"secret": "admin"}) - sio.emit("skip") - sio.disconnect() - - -def search(query): - sio = socketio.Client() - sio.on("search-result", print) - sio.connect("http://localhost:8080") - sio.emit("search", {"query": query}) - sio.disconnect() diff --git a/syng/webclientmockup.py b/syng/webclientmockup.py index 0820865..ba23016 100644 --- a/syng/webclientmockup.py +++ b/syng/webclientmockup.py @@ -1,11 +1,14 @@ -import socketio import asyncio +from typing import Any + +from aiocmd import aiocmd +import socketio + from .result import Result from .entry import Entry -from aiocmd import aiocmd -sio = socketio.AsyncClient() -state = {} +sio: socketio.AsyncClient = socketio.AsyncClient() +state: dict[str, Any] = {} @sio.on("search-results")