From 7d9dfe2e6270a8b5ce1d272902ef232b25e07ff6 Mon Sep 17 00:00:00 2001 From: Christoph Stahl Date: Thu, 10 Nov 2022 10:43:18 +0100 Subject: [PATCH] Complete Rewrite --- syng/client.py | 98 ++++++++++++++++++++++++++++++++ syng/entry.py | 34 +++++++++++ syng/result.py | 17 ++++++ syng/server.py | 120 +++++++++++++++++++++++++++++++++++++++ syng/sources/__init__.py | 2 + syng/sources/source.py | 31 ++++++++++ syng/sources/youtube.py | 114 +++++++++++++++++++++++++++++++++++++ syng/test.py | 24 ++++++++ 8 files changed, 440 insertions(+) create mode 100644 syng/client.py create mode 100644 syng/entry.py create mode 100644 syng/result.py create mode 100644 syng/server.py create mode 100644 syng/sources/__init__.py create mode 100644 syng/sources/source.py create mode 100644 syng/sources/youtube.py create mode 100644 syng/test.py diff --git a/syng/client.py b/syng/client.py new file mode 100644 index 0000000..112499f --- /dev/null +++ b/syng/client.py @@ -0,0 +1,98 @@ +import asyncio +import socketio +from traceback import print_exc + +from .sources import YoutubeSource +from .entry import Entry + +sio = socketio.AsyncClient() + +sources = {"youtube": YoutubeSource()} + +currentLock = asyncio.Semaphore(0) +state = { + "current": None, + "all_entries": {}, +} + + +async def playerTask(): + """ + This task loops forever, and plays the first item in the queue in the appropriate player. Then it removes the first item in the queue and starts over. If no element is in the queue, it waits + """ + + while True: + await sio.emit("get-next", {}) + print("Waiting for current") + await currentLock.acquire() + try: + await sources[state["current"].source].play(state["current"].id) + except Exception: + print_exc() + print("Finished playing") + + +async def bufferTask(): + pass + + +# class BufferThread(Thread): +# """ +# This thread tries to buffer the first not-yet buffered entry in the queue in a loop. +# """ +# def run(self): +# while (True): +# for entry in self.queue: +# if entry.ready.is_set(): +# continue +# try: +# entry.source.buffer(entry.id) +# except Exception: +# print_exc() +# entry.failed = True +# entry.ready.set() +# + + +@sio.on("skip") +async def handle_skip(): + print("Skipping current") + await sources[state["current"].source].skip_current() + + +@sio.on("next") +async def handle_next(data): + state["current"] = Entry(**data) + currentLock.release() + print("released lock") + + +@sio.on("state") +async def handle_state(data): + state["all_entries"] = {entry["uuid"]: Entry(**entry) for entry in data} + + +@sio.on("connect") +async def handle_connect(): + print("Connected to server") + await sio.emit("register-client", {"secret": "test"}) + + +@sio.on("register-client") +async def handle_register(data): + if data["success"]: + print("Registered") + asyncio.create_task(playerTask()) + asyncio.create_task(bufferTask()) + else: + print("Registration failed") + await sio.disconnect() + + +async def main(): + await sio.connect("http://127.0.0.1:8080") + await sio.wait() + + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/syng/entry.py b/syng/entry.py new file mode 100644 index 0000000..2ec0fa2 --- /dev/null +++ b/syng/entry.py @@ -0,0 +1,34 @@ +from __future__ import annotations +from dataclasses import dataclass, field +import uuid + + +@dataclass +class Entry: + id: str | int + source: str + duration: int + title: str + artist: str + performer: str + failed: bool = False + uuid: UUID = field(default_factory=uuid.uuid4) + + @staticmethod + async def from_source(performer: str, ident: str, source: Source) -> Entry: + return await source.get_entry(performer, ident) + + def to_dict(self) -> dict: + return { + "uuid": str(self.uuid), + "id": self.id, + "source": self.source, + "duration": self.duration, + "title": self.title, + "artist": self.artist, + "performer": self.performer, + } + + @staticmethod + def from_dict(entry_dict): + return Entry(**entry_dict) diff --git a/syng/result.py b/syng/result.py new file mode 100644 index 0000000..70bc24f --- /dev/null +++ b/syng/result.py @@ -0,0 +1,17 @@ +from dataclasses import dataclass + + +@dataclass +class Result: + id: str | int + source: str + title: str + artist: str + + def to_dict(self) -> dict: + return { + "id": self.id, + "source": self.source, + "title": self.title, + "artist": self.artist, + } diff --git a/syng/server.py b/syng/server.py new file mode 100644 index 0000000..afe1b45 --- /dev/null +++ b/syng/server.py @@ -0,0 +1,120 @@ +from __future__ import annotations +from collections import deque +from typing import Any +import asyncio + +# from flask import Flask +# from flask_socketio import SocketIO, emit # type: ignore +from aiohttp import web +import socketio + +from .entry import Entry +from .sources import YoutubeSource + +# socketio = SocketIO(app, cors_allowed_origins='*') +# sio = socketio.AsyncServer() + +sio = socketio.AsyncServer(cors_allowed_origins="*", logger=True, engineio_logger=True) +app = web.Application() +sio.attach(app) +sources = {"youtube": YoutubeSource()} + +admins = set() +admin_secrets = ["admin"] +client_secrets = ["test"] +clients = set() + + +class Queue(deque): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.num_of_entries_sem = asyncio.Semaphore(0) + self.readlock = asyncio.Lock() + + async def append(self, item: Entry) -> None: + super().append(item) + await sio.emit("state", self.to_dict()) + self.num_of_entries_sem.release() + + async def popleft(self) -> Entry: + async with self.readlock: + await self.num_of_entries_sem.acquire() + item = super().popleft() + await sio.emit("state", self.to_dict()) + return item + + def to_dict(self) -> list[dict[str, Any]]: + return [item.to_dict() for item in self] + + +queue = Queue() + + +@sio.on("get-state") +async def handle_state(sid, data: dict[str, Any]): + await sio.emit("state", queue.to_dict(), room=sid) + + +@sio.on("append") +async def handle_append(sid, data: dict[str, Any]): + print(f"append: {data}") + source_obj = sources[data["source"]] + entry = await Entry.from_source(data["performer"], data["id"], source_obj) + await queue.append(entry) + print(f"new state: {queue.to_dict()}") + + +@sio.on("get-next") +async def handle_next(sid, data: dict[str, Any]): + if sid in clients: + print(f"get-next request from client {sid}") + current = await queue.popleft() + print(f"Sending {current} to client {sid}") + print(f"new state: {queue.to_dict()}") + await sio.emit("next", current.to_dict(), room=sid) + + +@sio.on("register-client") +async def handle_register_client(sid, data: dict[str, Any]): + if data["secret"] in client_secrets: + print(f"Registerd new client {sid}") + clients.add(sid) + await sio.emit("register-client", {"success": True}, room=sid) + else: + await sio.emit("register-client", {"success": False}, room=sid) + + +@sio.on("register-admin") +async def handle_register_admin(sid, data: dict[str, str]): + if data["secret"] in admin_secrets: + print(f"Registerd new admin {sid}") + admins.add(sid) + await sio.emit("register-admin", {"success": True}, room=sid) + else: + await sio.emit("register-admin", {"success": False}, room=sid) + + +@sio.on("skip") +async def handle_skip(sid, data={}): + if sid in admins: + for client in clients: + await sio.emit("skip", room=client) + + +@sio.on("search") +async def handle_search(sid, data: dict[str, str]): + print(f"Got search request from {sid}: {data}") + query = data["query"] + results = [] + for source in sources.values(): + results += await source.search(query) + print(f"Found {len(results)} results") + await sio.emit("search-results", [result.to_dict() for result in results], room=sid) + + +def main() -> None: + web.run_app(app, port=8080) + + +if __name__ == "__main__": + main() diff --git a/syng/sources/__init__.py b/syng/sources/__init__.py new file mode 100644 index 0000000..3fec541 --- /dev/null +++ b/syng/sources/__init__.py @@ -0,0 +1,2 @@ +from .youtube import YoutubeSource +from .source import Source diff --git a/syng/sources/source.py b/syng/sources/source.py new file mode 100644 index 0000000..ec505c3 --- /dev/null +++ b/syng/sources/source.py @@ -0,0 +1,31 @@ +from __future__ import annotations +import asyncio +from typing import Callable, Awaitable + +from ..entry import Entry +from ..result import Result + + +def async_in_thread(func: Callable) -> Awaitable: + async def wrapper(*args, **kwargs): + loop = asyncio.get_running_loop() + return await loop.run_in_executor(None, func, *args, **kwargs) + + return wrapper + + +class Source: + async def get_entry(self, performer: str, ident: int | str) -> Entry: + pass + + async def search(self, query: str) -> list[Result]: + pass + + async def buffer(self, ident: int | str) -> None: + pass + + async def play(self, ident: str) -> None: + pass + + async def skip_current(self) -> None: + pass diff --git a/syng/sources/youtube.py b/syng/sources/youtube.py new file mode 100644 index 0000000..d178a37 --- /dev/null +++ b/syng/sources/youtube.py @@ -0,0 +1,114 @@ +import asyncio +import shlex +from functools import partial + +from pytube import YouTube, Search, Channel, innertube +from mpv import MPV + +from .source import Source, async_in_thread +from ..entry import Entry +from ..result import Result + + +class YoutubeSource(Source): + def __init__(self): + super().__init__() + self.innertube_client = innertube.InnerTube(client="WEB") + self.channels = ["/c/CCKaraoke"] + + @async_in_thread + def play(self, ident: str) -> None: + self.player = MPV( + input_default_bindings=True, + input_vo_keyboard=True, + osc=True, + ytdl=True, + script_opts="ytdl_hook-ytdl_path=yt-dlp", + ) + self.player.play(ident) + self.player.wait_for_playback() + + async def skip_current(self) -> None: + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, self.player.terminate) + + @async_in_thread + def get_entry(self, performer: str, url: str) -> Entry: + yt = YouTube(url) + return Entry( + id=url, + source="youtube", + duration=yt.length, + title=yt.title, + artist=yt.author, + performer=performer, + ) + + def _contains_index(self, query, result): + compare_string = result.title.lower() + " " + result.author.lower() + hits = 0 + queries = shlex.split(query.lower()) + for word in queries: + if word in compare_string: + hits += 1 + + return 1 - (hits / len(queries)) + + @async_in_thread + def search(self, query: str) -> list[Result]: + for channel in self.channels: + results = self._channel_search(query, channel) + results += Search(query + " karaoke").results + + results.sort(key=partial(self._contains_index, query)) + + return [ + Result( + id=result.watch_url, + source="youtube", + title=result.title, + artist=result.author, + ) + for result in results + ] + + def _channel_search(self, query, channel): + browseID = Channel(f"https://www.youtube.com{channel}").channel_id + endpoint = f"{self.innertube_client.base_url}/browse" + + data = {"query": query, "browseId": browseID, "params": "EgZzZWFyY2g%3D"} + data.update(self.innertube_client.base_data) + results = self.innertube_client._call_api( + endpoint, self.innertube_client.base_params, data + ) + items = results["contents"]["twoColumnBrowseResultsRenderer"]["tabs"][-1][ + "expandableTabRenderer" + ]["content"]["sectionListRenderer"]["contents"] + + list_of_videos = [] + for item in items: + try: + if ( + "itemSectionRenderer" in item + and "videoRenderer" in item["itemSectionRenderer"]["contents"][0] + ): + yt_url = ( + "https://youtube.com/watch?v=" + + item["itemSectionRenderer"]["contents"][0]["videoRenderer"][ + "videoId" + ] + ) + author = item["itemSectionRenderer"]["contents"][0][ + "videoRenderer" + ]["ownerText"]["runs"][0]["text"] + title = item["itemSectionRenderer"]["contents"][0]["videoRenderer"][ + "title" + ]["runs"][0]["text"] + yt = YouTube(yt_url) + yt.author = author + yt.title = title + list_of_videos.append(yt) + + except KeyError: + pass + return list_of_videos diff --git a/syng/test.py b/syng/test.py new file mode 100644 index 0000000..58c3008 --- /dev/null +++ b/syng/test.py @@ -0,0 +1,24 @@ +import socketio + + +def append_yt(url): + sio = socketio.Client() + sio.connect("http://localhost:8080") + sio.emit("append", {"source": "youtube", "id": url, "performer": "test"}) + sio.disconnect() + + +def skip(): + sio = socketio.Client() + sio.connect("http://localhost:8080") + sio.emit("register-admin", {"secret": "admin"}) + sio.emit("skip") + sio.disconnect() + + +def search(query): + sio = socketio.Client() + sio.on("search-result", print) + sio.connect("http://localhost:8080") + sio.emit("search", {"query": query}) + sio.disconnect()