From 593ea380e3371225e1d2fb0cbfacc7612ba75ab5 Mon Sep 17 00:00:00 2001 From: Christoph Stahl Date: Mon, 14 Nov 2022 17:00:53 +0100 Subject: [PATCH] Per client room, persistant queue and queue loading from client --- syng/client.py | 16 ++- syng/server.py | 217 ++++++++++++++++++++++++++-------------- syng/webclientmockup.py | 36 +------ 3 files changed, 154 insertions(+), 115 deletions(-) diff --git a/syng/client.py b/syng/client.py index c5fcf06..c96995e 100644 --- a/syng/client.py +++ b/syng/client.py @@ -17,7 +17,7 @@ sources: dict[str, Source] = configure_sources(source_config) currentLock = asyncio.Semaphore(0) state = { "current": None, - "all_entries": {}, + "queue": [], } @@ -29,13 +29,19 @@ async def handle_skip(): @sio.on("state") async def handle_state(data): - state["all_entries"] = {entry["uuid"]: Entry(**entry) for entry in data} + state["queue"] = [Entry(**entry) for entry in data] @sio.on("connect") async def handle_connect(): print("Connected to server") - await sio.emit("register-client", {"secret": "test"}) + await sio.emit( + "register-client", + { + "secret": "test", + "queue": [entry.to_dict() for entry in state["queue"]], + }, + ) @sio.on("buffer") @@ -56,6 +62,7 @@ async def handle_play(data): await sources[entry.source].play(entry) except Exception: print_exc() + print("Finished, waiting for next") await sio.emit("pop-then-get-next") @@ -64,7 +71,8 @@ async def handle_register(data): if data["success"]: print("Registered") await sio.emit("sources", {"sources": list(source_config.keys())}) - await sio.emit("get-first") + if state["current"] is None: + await sio.emit("get-first") else: print("Registration failed") await sio.disconnect() diff --git a/syng/server.py b/syng/server.py index 186ca7a..b949908 100644 --- a/syng/server.py +++ b/syng/server.py @@ -3,6 +3,8 @@ from collections import deque from typing import Any import asyncio from dataclasses import dataclass +import string +import random from aiohttp import web import socketio @@ -15,25 +17,18 @@ app = web.Application() sio.attach(app) -@dataclass -class State: - admin_secret: str | None - sources: dict[str, Source] - sources_prio: list[str] - - -global_state = State(None, {}, []) +clients = {} class Queue(deque): - def __init__(self, *args, **kwargs): - super().__init__(*args, **kwargs) - self.num_of_entries_sem = asyncio.Semaphore(0) + def __init__(self, iterable=[]): + super().__init__(iterable) + + self.num_of_entries_sem = asyncio.Semaphore(len(iterable)) self.readlock = asyncio.Lock() async def append(self, item: Entry) -> None: super().append(item) - await sio.emit("state", self.to_dict()) self.num_of_entries_sem.release() async def peek(self) -> Entry: @@ -55,66 +50,112 @@ class Queue(deque): return [item.to_dict() for item in self] -queue = Queue() +@dataclass +class State: + secret: str | None + sources: dict[str, Source] + sources_prio: list[str] + queue: Queue + sid: str @sio.on("get-state") async def handle_state(sid, data: dict[str, Any] = {}): - await sio.emit("state", queue.to_dict(), room=sid) + async with sio.session(sid) as session: + short = session["short"] + state = clients[short] + + await sio.emit("state", state.queue.to_dict(), room=sid) @sio.on("append") async def handle_append(sid, data: dict[str, Any]): + async with sio.session(sid) as session: + short = session["short"] + state = clients[short] + print(f"append: {data}") - source_obj = global_state.sources[data["source"]] + source_obj = state.sources[data["source"]] entry = await Entry.from_source(data["performer"], data["id"], source_obj) - await queue.append(entry) - print(f"new state: {queue.to_dict()}") + await state.queue.append(entry) + await sio.emit("state", state.queue.to_dict(), room=short) await sio.emit( "buffer", entry.to_dict(), - room="clients", + room=clients[short].sid, ) @sio.on("meta-info") async def handle_meta_info(sid, data): async with sio.session(sid) as session: - if "client" in session and session["client"]: - for item in queue: - if str(item.uuid) == data["uuid"]: - item.update(**data["meta"]) + short = session["short"] + state = clients[short] - await sio.emit("state", queue.to_dict()) + for item in state.queue: + if str(item.uuid) == data["uuid"]: + item.update(**data["meta"]) + + await sio.emit("state", state.queue.to_dict(), room=short) @sio.on("get-first") async def handle_get_first(sid, data={}): async with sio.session(sid) as session: - if "client" in session and session["client"]: - current = await queue.peek() - print(f"Sending {current} to client {sid}") - await sio.emit("play", current.to_dict(), room=sid) + short = session["short"] + state = clients[short] + + current = await state.queue.peek() + + await sio.emit("play", current.to_dict(), room=sid) @sio.on("pop-then-get-next") async def handle_pop_then_get_next(sid, data={}): async with sio.session(sid) as session: - if "client" in session and session["client"]: - await queue.popleft() - current = await queue.peek() - print(f"Sending {current} to client {sid}") - await sio.emit("play", current.to_dict(), room=sid) + short = session["short"] + state = clients[short] + + await state.queue.popleft() + current = await state.queue.peek() + + await sio.emit("state", state.queue.to_dict(), room=short) + await sio.emit("play", current.to_dict(), room=sid) + + +def gen_id(length=4) -> str: + client_id = "".join([random.choice(string.ascii_letters) for _ in range(length)]) + if client_id in clients: + client_id = gen_id(length + 1) + return client_id @sio.on("register-client") async def handle_register_client(sid, data: dict[str, Any]): print(f"Registerd new client {sid}") - global_state.admin_secret = data["secret"] - await sio.save_session(sid, {"client": True}) - sio.enter_room(sid, "clients") - await sio.emit("client-registered", {"success": True}, room=sid) + short = data["short"] if "short" in data else gen_id() + async with sio.session(sid) as session: + session["short"] = short + + print(f"short id: {short}") + if data["short"] in clients: + old_state = clients[short] + if data["secret"] == old_state.secret: + old_state.sid = sid + sio.enter_room(sid, short) + await sio.emit( + "client-registered", {"success": True, "short": short}, room=sid + ) + else: + await sio.emit( + "client-registered", {"success": False, "short": short}, room=sid + ) + else: + initial_entries = [Entry(**entry) for entry in data["queue"]] + clients[short] = State(data["secret"], {}, [], Queue(initial_entries), sid) + sio.enter_room(sid, short) + await sio.emit("client-registered", {"success": True, "short": short}, room=sid) @sio.on("sources") @@ -125,87 +166,109 @@ async def handle_sources(sid, data): sources and query for a config for all uninitialized sources """ async with sio.session(sid) as session: - if "client" in session and session["client"]: - unused_sources = global_state.sources.keys() - data["sources"] - new_sources = data["sources"] - global_state.sources.keys() + short = session["short"] + state = clients[short] - for source in unused_sources: - del global_state.sources[source] + unused_sources = state.sources.keys() - data["sources"] + new_sources = data["sources"] - state.sources.keys() - global_state.sources_prio = data["sources"] + for source in unused_sources: + del state.sources[source] - for name in new_sources: - await sio.emit("request-config", {"source": name}, room=sid) + state.sources_prio = data["sources"] + + for name in new_sources: + await sio.emit("request-config", {"source": name}, room=sid) @sio.on("config-chunk") async def handle_config_chung(sid, data): async with sio.session(sid) as session: - if "client" in session and session["client"]: - if not data["source"] in global_state.sources: - global_state.sources[data["source"]] = available_sources[ - data["source"] - ](data["config"]) - else: - global_state.sources[data["source"]].add_to_config(data["config"]) + short = session["short"] + state = clients[short] + + if not data["source"] in state.sources: + state.sources[data["source"]] = available_sources[data["source"]]( + data["config"] + ) + else: + state.sources[data["source"]].add_to_config(data["config"]) @sio.on("config") async def handle_config(sid, data): async with sio.session(sid) as session: - if "client" in session and session["client"]: - global_state.sources[data["source"]] = available_sources[data["source"]]( - data["config"] - ) - print(f"Added source {data['source']}") + short = session["short"] + state = clients[short] + + state.sources[data["source"]] = available_sources[data["source"]](data["config"]) + print(f"Added source {data['source']}") + + +@sio.on("register-web") +async def handle_register_web(sid, data): + async with sio.session(sid) as session: + session["short"] = data["short"] + sio.enter_room(sid, session["short"]) + state = clients[session["short"]] + + await sio.emit("state", state.queue.to_dict(), room=sid) @sio.on("register-admin") async def handle_register_admin(sid, data: dict[str, str]): - if global_state.admin_secret and data["secret"] in global_state.admin_secret: - print(f"Registerd new admin {sid}") - await sio.save_session(sid, {"admin": True}) - await sio.emit("register-admin", {"success": True}, room=sid) - else: - await sio.emit("register-admin", {"success": False}, room=sid) + async with sio.session(sid) as session: + short = session["short"] + state = clients[short] + + is_admin = data["secret"] == state.secret + async with sio.session(sid) as session: + session["admin"] = is_admin + await sio.emit("register-admin", {"success": is_admin}, room=sid) @sio.on("get-config") async def handle_get_config(sid, data): async with sio.session(sid) as session: - if "admin" in session and session["admin"]: - await sio.emit( - "config", - { - name: source.get_config() - for name, source in global_state.sources.items() - }, - ) + short = session["short"] + is_admin = session["admin"] + state = clients[short] + + if is_admin: + await sio.emit( + "config", + {name: source.get_config() for name, source in state.sources.items()}, + ) @sio.on("skip") async def handle_skip(sid, data={}): async with sio.session(sid) as session: - if "admin" in session and session["admin"]: - await sio.emit("skip", room="clients") + short = session["short"] + is_admin = session["admin"] + + if is_admin: + await sio.emit("skip", room=clients[short].sid) @sio.on("disconnect") async def handle_disconnect(sid, data={}): async with sio.session(sid) as session: - if "client" in session and session["client"]: - sio.leave_room(sid, "clients") + sio.leave_room(sid, session["short"]) @sio.on("search") async def handle_search(sid, data: dict[str, str]): - print(f"Got search request from {sid}: {data}") + async with sio.session(sid) as session: + short = session["short"] + state = clients[short] + query = data["query"] result_futures = [] - for source in global_state.sources_prio: + for source in state.sources_prio: loop = asyncio.get_running_loop() search_future = loop.create_future() - loop.create_task(global_state.sources[source].search(search_future, query)) + loop.create_task(state.sources[source].search(search_future, query)) result_futures.append(search_future) results = [ diff --git a/syng/webclientmockup.py b/syng/webclientmockup.py index 2917106..a0112e5 100644 --- a/syng/webclientmockup.py +++ b/syng/webclientmockup.py @@ -26,39 +26,6 @@ async def handle_state(data): @sio.on("connect") async def handle_connect(): print("Connected") - # await sio.emit("search", {"query": "Linkin Park"}) - # await sio.emit( - # "append", - # { - # "performer": "Hammy", - # "source": "youtube", - # "id": "https://www.youtube.com/watch?v=rqZqHXJm-UA", # https://youtube.com/watch?v=x5bM5Bdizi4", - # }, - # ) - # await sio.emit( - # "append", - # { - # "performer": "Hammy", - # "source": "s3", - # "id": "Sunfly Gold/SFGD034 - Linkin Park & Limp Bizkit/Linkin Park - Pushing Me Away - Sunfly Gold 34.cdg", - # }, - # ) - # await sio.emit( - # "append", - # { - # "performer": "Hammy", - # "source": "s3", - # "id": "Sunfly Gold/SFGD034 - Linkin Park & Limp Bizkit/Linkin Park - Pushing Me Away - Sunfly Gold 34.cdg", - # }, - # ) - # await sio.emit( - # "append", - # { - # "performer": "Hammy", - # "source": "s3", - # "id": "Sunfly Gold/SFGD034 - Linkin Park & Limp Bizkit/Linkin Park - Pushing Me Away - Sunfly Gold 34.cdg", - # }, - # ) @sio.on("register-admin") @@ -94,8 +61,9 @@ class SyngShell(aiocmd.PromptToolkitCmd): async def do_admin(self, data): await sio.emit("register-admin", {"secret": data}) - async def do_connect(self): + async def do_connect(self, short): await sio.connect("http://127.0.0.1:8080") + await sio.emit("register-web", {"short": short}) async def do_skip(self): await sio.emit("skip")