diff --git a/pyproject.toml b/pyproject.toml index 2420894..50054fe 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,8 +13,16 @@ aiohttp = "^3.8.3" python-mpv = "^1.0.1" python-socketio = "^5.7.2" minio = "^7.1.12" +colored = "^1.4.4" +mutagen = "^1.46.0" +aiocmd = "^0.1.5" [build-system] requires = ["poetry-core"] build-backend = "poetry.core.masonry.api" + +[tool.pyright] +exclude = [ ".venv" ] +venvPath = "." +venv = ".venv" diff --git a/syng/client.py b/syng/client.py index b723d79..c5fcf06 100644 --- a/syng/client.py +++ b/syng/client.py @@ -1,17 +1,18 @@ import asyncio -import socketio from traceback import print_exc from json import load +import socketio + from .sources import Source, configure_sources from .entry import Entry sio = socketio.AsyncClient() -with open("./syng-client.json") as f: +with open("./syng-client.json", encoding="utf8") as f: source_config = load(f) -sources = configure_sources(source_config, client=True) +sources: dict[str, Source] = configure_sources(source_config) currentLock = asyncio.Semaphore(0) state = { @@ -20,55 +21,10 @@ state = { } -async def playerTask(): - """ - This task loops forever, and plays the first item in the queue in the appropriate player. Then it removes the first item in the queue and starts over. If no element is in the queue, it waits - """ - - while True: - await sio.emit("get-next", {}) - print("Waiting for current") - await currentLock.acquire() - try: - await sources[state["current"].source].play(state["current"].id) - except Exception: - print_exc() - print("Finished playing") - - -async def bufferTask(): - pass - - -# class BufferThread(Thread): -# """ -# This thread tries to buffer the first not-yet buffered entry in the queue in a loop. -# """ -# def run(self): -# while (True): -# for entry in self.queue: -# if entry.ready.is_set(): -# continue -# try: -# entry.source.buffer(entry.id) -# except Exception: -# print_exc() -# entry.failed = True -# entry.ready.set() -# - - @sio.on("skip") async def handle_skip(): print("Skipping current") - await sources[state["current"].source].skip_current() - - -@sio.on("next") -async def handle_next(data): - state["current"] = Entry(**data) - currentLock.release() - print("released lock") + await state["current"].skip_current() @sio.on("state") @@ -82,18 +38,58 @@ async def handle_connect(): await sio.emit("register-client", {"secret": "test"}) +@sio.on("buffer") +async def handle_buffer(data): + source = sources[data["source"]] + meta_info = await source.buffer(Entry(**data)) + await sio.emit("meta-info", {"uuid": data["uuid"], "meta": meta_info}) + + +@sio.on("play") +async def handle_play(data): + entry = Entry(**data) + print(f"Playing {entry}") + try: + meta_info = await sources[entry.source].buffer(entry) + await sio.emit("meta-info", {"uuid": data["uuid"], "meta": meta_info}) + state["current"] = sources[entry.source] + await sources[entry.source].play(entry) + except Exception: + print_exc() + await sio.emit("pop-then-get-next") + + @sio.on("client-registered") async def handle_register(data): if data["success"]: print("Registered") - await sio.emit("config", {"sources": source_config}) - asyncio.create_task(playerTask()) - asyncio.create_task(bufferTask()) + await sio.emit("sources", {"sources": list(source_config.keys())}) + await sio.emit("get-first") else: print("Registration failed") await sio.disconnect() +@sio.on("request-config") +async def handle_request_config(data): + if data["source"] in sources: + config = await sources[data["source"]].get_config() + if isinstance(config, list): + num_chunks = len(config) + for current, chunk in enumerate(config): + await sio.emit( + "config-chunk", + { + "source": data["source"], + "config": chunk, + "number": current + 1, + "total": num_chunks, + }, + ) + else: + await sio.emit("config", {"source": data["source"], "config": config}) + + async def main(): await sio.connect("http://127.0.0.1:8080") await sio.wait() diff --git a/syng/entry.py b/syng/entry.py index 2ec0fa2..0b2c061 100644 --- a/syng/entry.py +++ b/syng/entry.py @@ -1,18 +1,18 @@ from __future__ import annotations from dataclasses import dataclass, field -import uuid +from uuid import uuid4, UUID @dataclass class Entry: - id: str | int + id: str source: str duration: int title: str artist: str performer: str failed: bool = False - uuid: UUID = field(default_factory=uuid.uuid4) + uuid: UUID = field(default_factory=uuid4) @staticmethod async def from_source(performer: str, ident: str, source: Source) -> Entry: @@ -32,3 +32,6 @@ class Entry: @staticmethod def from_dict(entry_dict): return Entry(**entry_dict) + + def update(self, **kwargs): + self.__dict__.update(kwargs) diff --git a/syng/result.py b/syng/result.py index 70bc24f..84f2e79 100644 --- a/syng/result.py +++ b/syng/result.py @@ -1,4 +1,6 @@ +from __future__ import annotations from dataclasses import dataclass +import os.path @dataclass @@ -7,6 +9,7 @@ class Result: source: str title: str artist: str + album: str def to_dict(self) -> dict: return { @@ -14,4 +17,17 @@ class Result: "source": self.source, "title": self.title, "artist": self.artist, + "album": self.album, } + + @staticmethod + def from_filename(filename, source) -> Result | None: + try: + splitfile = os.path.basename(filename[:-4]).split(" - ") + ident = filename + artist = splitfile[0].strip() + title = splitfile[1].strip() + album = splitfile[2].strip() + return Result(ident, source, title, artist, album) + except IndexError: + return None diff --git a/syng/server.py b/syng/server.py index e8053e5..186ca7a 100644 --- a/syng/server.py +++ b/syng/server.py @@ -2,25 +2,27 @@ from __future__ import annotations from collections import deque from typing import Any import asyncio +from dataclasses import dataclass -# from flask import Flask -# from flask_socketio import SocketIO, emit # type: ignore from aiohttp import web import socketio from .entry import Entry -from .sources import configure_sources - -# socketio = SocketIO(app, cors_allowed_origins='*') -# sio = socketio.AsyncServer() +from .sources import Source, available_sources sio = socketio.AsyncServer(cors_allowed_origins="*", logger=True, engineio_logger=True) app = web.Application() sio.attach(app) -admin_secrets = ["admin"] -client_secrets = ["test"] -sources = {} + +@dataclass +class State: + admin_secret: str | None + sources: dict[str, Source] + sources_prio: list[str] + + +global_state = State(None, {}, []) class Queue(deque): @@ -34,6 +36,14 @@ class Queue(deque): await sio.emit("state", self.to_dict()) self.num_of_entries_sem.release() + async def peek(self) -> Entry: + async with self.readlock: + await self.num_of_entries_sem.acquire() + item = super().popleft() + super().appendleft(item) + self.num_of_entries_sem.release() + return item + async def popleft(self) -> Entry: async with self.readlock: await self.num_of_entries_sem.acquire() @@ -49,53 +59,110 @@ queue = Queue() @sio.on("get-state") -async def handle_state(sid, data: dict[str, Any]): +async def handle_state(sid, data: dict[str, Any] = {}): await sio.emit("state", queue.to_dict(), room=sid) @sio.on("append") async def handle_append(sid, data: dict[str, Any]): - print(f"append: {data}") - source_obj = sources[data["source"]] + source_obj = global_state.sources[data["source"]] entry = await Entry.from_source(data["performer"], data["id"], source_obj) await queue.append(entry) print(f"new state: {queue.to_dict()}") + await sio.emit( + "buffer", + entry.to_dict(), + room="clients", + ) -@sio.on("get-next") -async def handle_next(sid, data: dict[str, Any]): + +@sio.on("meta-info") +async def handle_meta_info(sid, data): async with sio.session(sid) as session: if "client" in session and session["client"]: - print(f"get-next request from client {sid}") - current = await queue.popleft() + for item in queue: + if str(item.uuid) == data["uuid"]: + item.update(**data["meta"]) + + await sio.emit("state", queue.to_dict()) + + +@sio.on("get-first") +async def handle_get_first(sid, data={}): + async with sio.session(sid) as session: + if "client" in session and session["client"]: + current = await queue.peek() print(f"Sending {current} to client {sid}") - print(f"new state: {queue.to_dict()}") - await sio.emit("next", current.to_dict(), room=sid) + await sio.emit("play", current.to_dict(), room=sid) + + +@sio.on("pop-then-get-next") +async def handle_pop_then_get_next(sid, data={}): + async with sio.session(sid) as session: + if "client" in session and session["client"]: + await queue.popleft() + current = await queue.peek() + print(f"Sending {current} to client {sid}") + await sio.emit("play", current.to_dict(), room=sid) @sio.on("register-client") async def handle_register_client(sid, data: dict[str, Any]): - if data["secret"] in client_secrets: - print(f"Registerd new client {sid}") - await sio.save_session(sid, {"client": True}) - sio.enter_room(sid, "clients") - await sio.emit("client-registered", {"success": True}, room=sid) - else: - await sio.emit("client-registered", {"success": False}, room=sid) + print(f"Registerd new client {sid}") + global_state.admin_secret = data["secret"] + await sio.save_session(sid, {"client": True}) + sio.enter_room(sid, "clients") + await sio.emit("client-registered", {"success": True}, room=sid) + + +@sio.on("sources") +async def handle_sources(sid, data): + """ + Get the list of sources the client wants to use. + Update internal list of sources, remove unused + sources and query for a config for all uninitialized sources + """ + async with sio.session(sid) as session: + if "client" in session and session["client"]: + unused_sources = global_state.sources.keys() - data["sources"] + new_sources = data["sources"] - global_state.sources.keys() + + for source in unused_sources: + del global_state.sources[source] + + global_state.sources_prio = data["sources"] + + for name in new_sources: + await sio.emit("request-config", {"source": name}, room=sid) + + +@sio.on("config-chunk") +async def handle_config_chung(sid, data): + async with sio.session(sid) as session: + if "client" in session and session["client"]: + if not data["source"] in global_state.sources: + global_state.sources[data["source"]] = available_sources[ + data["source"] + ](data["config"]) + else: + global_state.sources[data["source"]].add_to_config(data["config"]) @sio.on("config") async def handle_config(sid, data): async with sio.session(sid) as session: if "client" in session and session["client"]: - sources.update(configure_sources(data["sources"], client=False)) - print(f"Updated Config: {sources}") + global_state.sources[data["source"]] = available_sources[data["source"]]( + data["config"] + ) + print(f"Added source {data['source']}") @sio.on("register-admin") async def handle_register_admin(sid, data: dict[str, str]): - if data["secret"] in admin_secrets: + if global_state.admin_secret and data["secret"] in global_state.admin_secret: print(f"Registerd new admin {sid}") await sio.save_session(sid, {"admin": True}) await sio.emit("register-admin", {"success": True}, room=sid) @@ -104,17 +171,23 @@ async def handle_register_admin(sid, data: dict[str, str]): @sio.on("get-config") -async def handle_config(sid, data): +async def handle_get_config(sid, data): async with sio.session(sid) as session: if "admin" in session and session["admin"]: - await sio.emit("config", list(sources.keys())) + await sio.emit( + "config", + { + name: source.get_config() + for name, source in global_state.sources.items() + }, + ) @sio.on("skip") async def handle_skip(sid, data={}): async with sio.session(sid) as session: if "admin" in session and session["admin"]: - await sio.emit("skip", room="client") + await sio.emit("skip", room="clients") @sio.on("disconnect") @@ -128,9 +201,18 @@ async def handle_disconnect(sid, data={}): async def handle_search(sid, data: dict[str, str]): print(f"Got search request from {sid}: {data}") query = data["query"] - results = [] - for source in sources.values(): - results += await source.search(query) + result_futures = [] + for source in global_state.sources_prio: + loop = asyncio.get_running_loop() + search_future = loop.create_future() + loop.create_task(global_state.sources[source].search(search_future, query)) + result_futures.append(search_future) + + results = [ + search_result + for result_future in result_futures + for search_result in await result_future + ] print(f"Found {len(results)} results") await sio.emit("search-results", [result.to_dict() for result in results], room=sid) diff --git a/syng/sources/__init__.py b/syng/sources/__init__.py index 4ffaa6e..9ca7326 100644 --- a/syng/sources/__init__.py +++ b/syng/sources/__init__.py @@ -3,14 +3,9 @@ from .youtube import YoutubeSource from .s3 import S3Source -def configure_sources(configs: dict, client) -> dict[str, Source]: - print(available_sources) +def configure_sources(configs: dict) -> dict[str, Source]: configured_sources = {} for source, config in configs.items(): if source in available_sources: configured_sources[source] = available_sources[source](config) - if client: - configured_sources[source].init_client() - else: - configured_sources[source].init_server() return configured_sources diff --git a/syng/sources/s3.py b/syng/sources/s3.py index 0a55709..8b9ecc6 100644 --- a/syng/sources/s3.py +++ b/syng/sources/s3.py @@ -1,33 +1,144 @@ +from json import load +from time import sleep, perf_counter +from itertools import zip_longest +from threading import Event, Lock +from asyncio import Future +import os + from minio import Minio -from time import perf_counter +from mpv import MPV +import mutagen from .source import Source, async_in_thread, available_sources from ..result import Result +from ..entry import Entry class S3Source(Source): def __init__(self, config): super().__init__() - self.minio = Minio( - config["s3_endpoint"], - access_key=config["access_key"], - secret_key=config["secret_key"], - ) - self.bucket = config["bucket"] - def init_server(self): - print("Start indexing") - start = perf_counter() - self.index = list(self.minio.list_objects("bucket")) - stop = perf_counter() - print(f"Took {stop - start:0.4f} seconds") + if "endpoint" in config and "access_key" in config and "secret_key" in config: + self.minio = Minio( + config["endpoint"], + access_key=config["access_key"], + secret_key=config["secret_key"], + ) + self.bucket = config["bucket"] + self.tmp_dir = config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng" + + self.index = [] if "index" not in config else config["index"] + self.downloaded_files = {} + self.player = None + self.masterlock = Lock() + + async def get_entry(self, performer: str, filename: str) -> Entry: + res = Result.from_filename(filename, "s3") + if res is not None: + return Entry( + id=filename, + source="s3", + duration=180, + title=res.title, + artist=res.artist, + performer=performer, + ) + raise RuntimeError(f"Could not parse {filename}") @async_in_thread - def search(self, query: str) -> list[Result]: - pass + def play(self, entry) -> None: + while not entry.uuid in self.downloaded_files: + sleep(0.1) - async def build_index(): - pass + self.downloaded_files[entry.uuid]["lock"].wait() + + self.player = MPV( + input_default_bindings=True, + input_vo_keyboard=True, + osc=True, + fullscreen=True, + ) + + cdg_file = self.downloaded_files[entry.uuid]["cdg"] + mp3_file = self.downloaded_files[entry.uuid]["mp3"] + + self.player.loadfile( + cdg_file, + mode="replace", + audio_file=mp3_file, + scale="oversample", + ) + self.player.wait_for_playback() + self.player.terminate() + + @async_in_thread + def get_config(self): + if not self.index: + print("Start indexing") + start = perf_counter() + # self.index = [ + # obj.object_name + # for obj in self.minio.list_objects(self.bucket, recursive=True) + # ] + with open("s3_files", "r") as f: + self.index = [item for item in load(f) if item.endswith(".cdg")] + print(len(self.index)) + stop = perf_counter() + print(f"Took {stop - start:0.4f} seconds") + + chunked = zip_longest(*[iter(self.index)] * 1000, fillvalue="") + return [{"index": list(filter(lambda x: x != "", chunk))} for chunk in chunked] + + def add_to_config(self, config): + self.index += config["index"] + + @async_in_thread + def search(self, result_future: Future, query: str) -> None: + print("searching s3") + filtered = self.filter_data_by_query(query, self.index) + results = [] + for filename in filtered: + print(filename) + result = Result.from_filename(filename, "s3") + print(result) + if result is None: + continue + results.append(result) + print(results) + result_future.set_result(results) + + @async_in_thread + def buffer(self, entry: Entry) -> dict: + with self.masterlock: + if entry.uuid in self.downloaded_files: + return {} + self.downloaded_files[entry.uuid] = {"lock": Event()} + + cdg_filename = os.path.basename(entry.id) + path_to_file = os.path.dirname(entry.id) + + cdg_path_with_uuid = os.path.join(path_to_file, f"{entry.uuid}-{cdg_filename}") + target_file_cdg = os.path.join(self.tmp_dir, cdg_path_with_uuid) + + ident_mp3 = entry.id[:-3] + "mp3" + target_file_mp3 = target_file_cdg[:-3] + "mp3" + os.makedirs(os.path.dirname(target_file_cdg), exist_ok=True) + + print( + f'self.minio.fget_object("{self.bucket}", "{entry.id}", "{target_file_cdg}")' + ) + self.minio.fget_object(self.bucket, entry.id, target_file_cdg) + self.minio.fget_object(self.bucket, ident_mp3, target_file_mp3) + + self.downloaded_files[entry.uuid]["cdg"] = target_file_cdg + self.downloaded_files[entry.uuid]["mp3"] = target_file_mp3 + self.downloaded_files[entry.uuid]["lock"].set() + + meta_infos = mutagen.File(target_file_mp3).info + + print(f"duration is {meta_infos.length}") + + return {"duration": int(meta_infos.length)} available_sources["s3"] = S3Source diff --git a/syng/sources/source.py b/syng/sources/source.py index 95f0c37..6840e97 100644 --- a/syng/sources/source.py +++ b/syng/sources/source.py @@ -1,6 +1,8 @@ from __future__ import annotations +import shlex import asyncio from typing import Callable, Awaitable +import os.path from ..entry import Entry from ..result import Result @@ -15,25 +17,41 @@ def async_in_thread(func: Callable) -> Awaitable: class Source: - async def get_entry(self, performer: str, ident: int | str) -> Entry: - pass + async def get_entry(self, performer: str, ident: str) -> Entry: + raise NotImplementedError - async def search(self, query: str) -> list[Result]: - pass + async def search(self, result_future: asyncio.Future, query: str) -> None: + raise NotImplementedError - async def buffer(self, ident: int | str) -> None: - pass + async def buffer(self, entry: Entry) -> dict: + return {} - async def play(self, ident: str) -> None: - pass + async def play(self, entry: Entry) -> None: + raise NotImplementedError async def skip_current(self) -> None: pass - def init_server(self) -> None: + async def init_server(self) -> None: pass - def init_client(self) -> None: + async def init_client(self) -> None: + pass + + def filter_data_by_query(self, query: str, data: list[str]) -> list[str]: + def contains_all_words(words: list[str], element: str) -> bool: + for word in words: + if not word.lower() in os.path.basename(element).lower(): + return False + return True + + splitquery = shlex.split(query) + return [element for element in data if contains_all_words(splitquery, element)] + + async def get_config(self) -> dict: + raise NotImplementedError + + def add_to_config(self, config) -> None: pass diff --git a/syng/sources/youtube.py b/syng/sources/youtube.py index a6b176f..225f24d 100644 --- a/syng/sources/youtube.py +++ b/syng/sources/youtube.py @@ -15,19 +15,24 @@ class YoutubeSource(Source): super().__init__() self.innertube_client = innertube.InnerTube(client="WEB") self.channels = config["channels"] if "channels" in config else [] + self.player = None + + async def get_config(self): + return {"channels": self.channels} @async_in_thread - def play(self, ident: str) -> None: - player = MPV( + def play(self, entry: Entry) -> None: + self.player = MPV( input_default_bindings=True, input_vo_keyboard=True, osc=True, ytdl=True, script_opts="ytdl_hook-ytdl_path=yt-dlp", + fullscreen=True, ) - player.play(ident) - player.wait_for_playback() - del player + self.player.play(entry.id) + self.player.wait_for_playback() + self.player.terminate() async def skip_current(self) -> None: loop = asyncio.get_event_loop() @@ -56,24 +61,28 @@ class YoutubeSource(Source): return 1 - (hits / len(queries)) @async_in_thread - def search(self, query: str) -> list[Result]: + def search(self, result_future: asyncio.Future, query: str) -> None: + results = [] for channel in self.channels: - results = self._channel_search(query, channel) + results += self._channel_search(query, channel) results += Search(query + " karaoke").results results.sort(key=partial(self._contains_index, query)) - return [ - Result( - id=result.watch_url, - source="youtube", - title=result.title, - artist=result.author, - ) - for result in results - ] + result_future.set_result( + [ + Result( + id=result.watch_url, + source="youtube", + title=result.title, + artist=result.author, + album="YouTube", + ) + for result in results + ] + ) - def _channel_search(self, query, channel): + def _channel_search(self, query, channel) -> list: browseID = Channel(f"https://www.youtube.com{channel}").channel_id endpoint = f"{self.innertube_client.base_url}/browse" diff --git a/syng/webclientmockup.py b/syng/webclientmockup.py new file mode 100644 index 0000000..2917106 --- /dev/null +++ b/syng/webclientmockup.py @@ -0,0 +1,112 @@ +import socketio +import asyncio +from .result import Result +from .entry import Entry +from aiocmd import aiocmd + +sio = socketio.AsyncClient() + + +@sio.on("search-results") +async def handle_search_results(data): + for raw_item in data: + item = Result(**raw_item) + print(f"{item.artist} - {item.title} [{item.album}]") + print(f"{item.source}: {item.id}") + + +@sio.on("state") +async def handle_state(data): + print("New Queue") + for raw_item in data: + item = Entry(**raw_item) + print(f"\t{item.performer}: {item.artist} - {item.title} ({item.duration})") + + +@sio.on("connect") +async def handle_connect(): + print("Connected") + # await sio.emit("search", {"query": "Linkin Park"}) + # await sio.emit( + # "append", + # { + # "performer": "Hammy", + # "source": "youtube", + # "id": "https://www.youtube.com/watch?v=rqZqHXJm-UA", # https://youtube.com/watch?v=x5bM5Bdizi4", + # }, + # ) + # await sio.emit( + # "append", + # { + # "performer": "Hammy", + # "source": "s3", + # "id": "Sunfly Gold/SFGD034 - Linkin Park & Limp Bizkit/Linkin Park - Pushing Me Away - Sunfly Gold 34.cdg", + # }, + # ) + # await sio.emit( + # "append", + # { + # "performer": "Hammy", + # "source": "s3", + # "id": "Sunfly Gold/SFGD034 - Linkin Park & Limp Bizkit/Linkin Park - Pushing Me Away - Sunfly Gold 34.cdg", + # }, + # ) + # await sio.emit( + # "append", + # { + # "performer": "Hammy", + # "source": "s3", + # "id": "Sunfly Gold/SFGD034 - Linkin Park & Limp Bizkit/Linkin Park - Pushing Me Away - Sunfly Gold 34.cdg", + # }, + # ) + + +@sio.on("register-admin") +async def handle_register_admin(data): + if data["success"]: + print("Logged in") + else: + print("Log in failed") + + +class SyngShell(aiocmd.PromptToolkitCmd): + prompt = "syng> " + + def do_exit(self): + return True + + async def do_stuff(self): + await sio.emit( + "append", + { + "performer": "Hammy", + "source": "youtube", + "id": "https://www.youtube.com/watch?v=rqZqHXJm-UA", # https://youtube.com/watch?v=x5bM5Bdizi4", + }, + ) + + async def do_search(self, query): + await sio.emit("search", {"query": query}) + + async def do_append(self, source, ident): + await sio.emit("append", {"performer": "Hammy", "source": source, "id": ident}) + + async def do_admin(self, data): + await sio.emit("register-admin", {"secret": data}) + + async def do_connect(self): + await sio.connect("http://127.0.0.1:8080") + + async def do_skip(self): + await sio.emit("skip") + + async def do_queue(self): + await sio.emit("get-state") + + +async def main(): + await sio.connect("http://127.0.0.1:8080") + + +if __name__ == "__main__": + asyncio.run(SyngShell().run())