Complete Rewrite

This commit is contained in:
Christoph Stahl 2022-11-10 10:43:18 +01:00
commit 7d9dfe2e62
8 changed files with 440 additions and 0 deletions

98
syng/client.py Normal file
View file

@ -0,0 +1,98 @@
import asyncio
import socketio
from traceback import print_exc
from .sources import YoutubeSource
from .entry import Entry
sio = socketio.AsyncClient()
sources = {"youtube": YoutubeSource()}
currentLock = asyncio.Semaphore(0)
state = {
"current": None,
"all_entries": {},
}
async def playerTask():
"""
This task loops forever, and plays the first item in the queue in the appropriate player. Then it removes the first item in the queue and starts over. If no element is in the queue, it waits
"""
while True:
await sio.emit("get-next", {})
print("Waiting for current")
await currentLock.acquire()
try:
await sources[state["current"].source].play(state["current"].id)
except Exception:
print_exc()
print("Finished playing")
async def bufferTask():
pass
# class BufferThread(Thread):
# """
# This thread tries to buffer the first not-yet buffered entry in the queue in a loop.
# """
# def run(self):
# while (True):
# for entry in self.queue:
# if entry.ready.is_set():
# continue
# try:
# entry.source.buffer(entry.id)
# except Exception:
# print_exc()
# entry.failed = True
# entry.ready.set()
#
@sio.on("skip")
async def handle_skip():
print("Skipping current")
await sources[state["current"].source].skip_current()
@sio.on("next")
async def handle_next(data):
state["current"] = Entry(**data)
currentLock.release()
print("released lock")
@sio.on("state")
async def handle_state(data):
state["all_entries"] = {entry["uuid"]: Entry(**entry) for entry in data}
@sio.on("connect")
async def handle_connect():
print("Connected to server")
await sio.emit("register-client", {"secret": "test"})
@sio.on("register-client")
async def handle_register(data):
if data["success"]:
print("Registered")
asyncio.create_task(playerTask())
asyncio.create_task(bufferTask())
else:
print("Registration failed")
await sio.disconnect()
async def main():
await sio.connect("http://127.0.0.1:8080")
await sio.wait()
if __name__ == "__main__":
asyncio.run(main())

34
syng/entry.py Normal file
View file

@ -0,0 +1,34 @@
from __future__ import annotations
from dataclasses import dataclass, field
import uuid
@dataclass
class Entry:
id: str | int
source: str
duration: int
title: str
artist: str
performer: str
failed: bool = False
uuid: UUID = field(default_factory=uuid.uuid4)
@staticmethod
async def from_source(performer: str, ident: str, source: Source) -> Entry:
return await source.get_entry(performer, ident)
def to_dict(self) -> dict:
return {
"uuid": str(self.uuid),
"id": self.id,
"source": self.source,
"duration": self.duration,
"title": self.title,
"artist": self.artist,
"performer": self.performer,
}
@staticmethod
def from_dict(entry_dict):
return Entry(**entry_dict)

17
syng/result.py Normal file
View file

@ -0,0 +1,17 @@
from dataclasses import dataclass
@dataclass
class Result:
id: str | int
source: str
title: str
artist: str
def to_dict(self) -> dict:
return {
"id": self.id,
"source": self.source,
"title": self.title,
"artist": self.artist,
}

120
syng/server.py Normal file
View file

@ -0,0 +1,120 @@
from __future__ import annotations
from collections import deque
from typing import Any
import asyncio
# from flask import Flask
# from flask_socketio import SocketIO, emit # type: ignore
from aiohttp import web
import socketio
from .entry import Entry
from .sources import YoutubeSource
# socketio = SocketIO(app, cors_allowed_origins='*')
# sio = socketio.AsyncServer()
sio = socketio.AsyncServer(cors_allowed_origins="*", logger=True, engineio_logger=True)
app = web.Application()
sio.attach(app)
sources = {"youtube": YoutubeSource()}
admins = set()
admin_secrets = ["admin"]
client_secrets = ["test"]
clients = set()
class Queue(deque):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.num_of_entries_sem = asyncio.Semaphore(0)
self.readlock = asyncio.Lock()
async def append(self, item: Entry) -> None:
super().append(item)
await sio.emit("state", self.to_dict())
self.num_of_entries_sem.release()
async def popleft(self) -> Entry:
async with self.readlock:
await self.num_of_entries_sem.acquire()
item = super().popleft()
await sio.emit("state", self.to_dict())
return item
def to_dict(self) -> list[dict[str, Any]]:
return [item.to_dict() for item in self]
queue = Queue()
@sio.on("get-state")
async def handle_state(sid, data: dict[str, Any]):
await sio.emit("state", queue.to_dict(), room=sid)
@sio.on("append")
async def handle_append(sid, data: dict[str, Any]):
print(f"append: {data}")
source_obj = sources[data["source"]]
entry = await Entry.from_source(data["performer"], data["id"], source_obj)
await queue.append(entry)
print(f"new state: {queue.to_dict()}")
@sio.on("get-next")
async def handle_next(sid, data: dict[str, Any]):
if sid in clients:
print(f"get-next request from client {sid}")
current = await queue.popleft()
print(f"Sending {current} to client {sid}")
print(f"new state: {queue.to_dict()}")
await sio.emit("next", current.to_dict(), room=sid)
@sio.on("register-client")
async def handle_register_client(sid, data: dict[str, Any]):
if data["secret"] in client_secrets:
print(f"Registerd new client {sid}")
clients.add(sid)
await sio.emit("register-client", {"success": True}, room=sid)
else:
await sio.emit("register-client", {"success": False}, room=sid)
@sio.on("register-admin")
async def handle_register_admin(sid, data: dict[str, str]):
if data["secret"] in admin_secrets:
print(f"Registerd new admin {sid}")
admins.add(sid)
await sio.emit("register-admin", {"success": True}, room=sid)
else:
await sio.emit("register-admin", {"success": False}, room=sid)
@sio.on("skip")
async def handle_skip(sid, data={}):
if sid in admins:
for client in clients:
await sio.emit("skip", room=client)
@sio.on("search")
async def handle_search(sid, data: dict[str, str]):
print(f"Got search request from {sid}: {data}")
query = data["query"]
results = []
for source in sources.values():
results += await source.search(query)
print(f"Found {len(results)} results")
await sio.emit("search-results", [result.to_dict() for result in results], room=sid)
def main() -> None:
web.run_app(app, port=8080)
if __name__ == "__main__":
main()

2
syng/sources/__init__.py Normal file
View file

@ -0,0 +1,2 @@
from .youtube import YoutubeSource
from .source import Source

31
syng/sources/source.py Normal file
View file

@ -0,0 +1,31 @@
from __future__ import annotations
import asyncio
from typing import Callable, Awaitable
from ..entry import Entry
from ..result import Result
def async_in_thread(func: Callable) -> Awaitable:
async def wrapper(*args, **kwargs):
loop = asyncio.get_running_loop()
return await loop.run_in_executor(None, func, *args, **kwargs)
return wrapper
class Source:
async def get_entry(self, performer: str, ident: int | str) -> Entry:
pass
async def search(self, query: str) -> list[Result]:
pass
async def buffer(self, ident: int | str) -> None:
pass
async def play(self, ident: str) -> None:
pass
async def skip_current(self) -> None:
pass

114
syng/sources/youtube.py Normal file
View file

@ -0,0 +1,114 @@
import asyncio
import shlex
from functools import partial
from pytube import YouTube, Search, Channel, innertube
from mpv import MPV
from .source import Source, async_in_thread
from ..entry import Entry
from ..result import Result
class YoutubeSource(Source):
def __init__(self):
super().__init__()
self.innertube_client = innertube.InnerTube(client="WEB")
self.channels = ["/c/CCKaraoke"]
@async_in_thread
def play(self, ident: str) -> None:
self.player = MPV(
input_default_bindings=True,
input_vo_keyboard=True,
osc=True,
ytdl=True,
script_opts="ytdl_hook-ytdl_path=yt-dlp",
)
self.player.play(ident)
self.player.wait_for_playback()
async def skip_current(self) -> None:
loop = asyncio.get_event_loop()
await loop.run_in_executor(None, self.player.terminate)
@async_in_thread
def get_entry(self, performer: str, url: str) -> Entry:
yt = YouTube(url)
return Entry(
id=url,
source="youtube",
duration=yt.length,
title=yt.title,
artist=yt.author,
performer=performer,
)
def _contains_index(self, query, result):
compare_string = result.title.lower() + " " + result.author.lower()
hits = 0
queries = shlex.split(query.lower())
for word in queries:
if word in compare_string:
hits += 1
return 1 - (hits / len(queries))
@async_in_thread
def search(self, query: str) -> list[Result]:
for channel in self.channels:
results = self._channel_search(query, channel)
results += Search(query + " karaoke").results
results.sort(key=partial(self._contains_index, query))
return [
Result(
id=result.watch_url,
source="youtube",
title=result.title,
artist=result.author,
)
for result in results
]
def _channel_search(self, query, channel):
browseID = Channel(f"https://www.youtube.com{channel}").channel_id
endpoint = f"{self.innertube_client.base_url}/browse"
data = {"query": query, "browseId": browseID, "params": "EgZzZWFyY2g%3D"}
data.update(self.innertube_client.base_data)
results = self.innertube_client._call_api(
endpoint, self.innertube_client.base_params, data
)
items = results["contents"]["twoColumnBrowseResultsRenderer"]["tabs"][-1][
"expandableTabRenderer"
]["content"]["sectionListRenderer"]["contents"]
list_of_videos = []
for item in items:
try:
if (
"itemSectionRenderer" in item
and "videoRenderer" in item["itemSectionRenderer"]["contents"][0]
):
yt_url = (
"https://youtube.com/watch?v="
+ item["itemSectionRenderer"]["contents"][0]["videoRenderer"][
"videoId"
]
)
author = item["itemSectionRenderer"]["contents"][0][
"videoRenderer"
]["ownerText"]["runs"][0]["text"]
title = item["itemSectionRenderer"]["contents"][0]["videoRenderer"][
"title"
]["runs"][0]["text"]
yt = YouTube(yt_url)
yt.author = author
yt.title = title
list_of_videos.append(yt)
except KeyError:
pass
return list_of_videos

24
syng/test.py Normal file
View file

@ -0,0 +1,24 @@
import socketio
def append_yt(url):
sio = socketio.Client()
sio.connect("http://localhost:8080")
sio.emit("append", {"source": "youtube", "id": url, "performer": "test"})
sio.disconnect()
def skip():
sio = socketio.Client()
sio.connect("http://localhost:8080")
sio.emit("register-admin", {"secret": "admin"})
sio.emit("skip")
sio.disconnect()
def search(query):
sio = socketio.Client()
sio.on("search-result", print)
sio.connect("http://localhost:8080")
sio.emit("search", {"query": query})
sio.disconnect()