Per client room, persistant queue and queue loading from client

This commit is contained in:
Christoph Stahl 2022-11-14 17:00:53 +01:00
parent 24e3b0fde2
commit 593ea380e3
3 changed files with 154 additions and 115 deletions

View file

@ -17,7 +17,7 @@ sources: dict[str, Source] = configure_sources(source_config)
currentLock = asyncio.Semaphore(0) currentLock = asyncio.Semaphore(0)
state = { state = {
"current": None, "current": None,
"all_entries": {}, "queue": [],
} }
@ -29,13 +29,19 @@ async def handle_skip():
@sio.on("state") @sio.on("state")
async def handle_state(data): async def handle_state(data):
state["all_entries"] = {entry["uuid"]: Entry(**entry) for entry in data} state["queue"] = [Entry(**entry) for entry in data]
@sio.on("connect") @sio.on("connect")
async def handle_connect(): async def handle_connect():
print("Connected to server") print("Connected to server")
await sio.emit("register-client", {"secret": "test"}) await sio.emit(
"register-client",
{
"secret": "test",
"queue": [entry.to_dict() for entry in state["queue"]],
},
)
@sio.on("buffer") @sio.on("buffer")
@ -56,6 +62,7 @@ async def handle_play(data):
await sources[entry.source].play(entry) await sources[entry.source].play(entry)
except Exception: except Exception:
print_exc() print_exc()
print("Finished, waiting for next")
await sio.emit("pop-then-get-next") await sio.emit("pop-then-get-next")
@ -64,7 +71,8 @@ async def handle_register(data):
if data["success"]: if data["success"]:
print("Registered") print("Registered")
await sio.emit("sources", {"sources": list(source_config.keys())}) await sio.emit("sources", {"sources": list(source_config.keys())})
await sio.emit("get-first") if state["current"] is None:
await sio.emit("get-first")
else: else:
print("Registration failed") print("Registration failed")
await sio.disconnect() await sio.disconnect()

View file

@ -3,6 +3,8 @@ from collections import deque
from typing import Any from typing import Any
import asyncio import asyncio
from dataclasses import dataclass from dataclasses import dataclass
import string
import random
from aiohttp import web from aiohttp import web
import socketio import socketio
@ -15,25 +17,18 @@ app = web.Application()
sio.attach(app) sio.attach(app)
@dataclass clients = {}
class State:
admin_secret: str | None
sources: dict[str, Source]
sources_prio: list[str]
global_state = State(None, {}, [])
class Queue(deque): class Queue(deque):
def __init__(self, *args, **kwargs): def __init__(self, iterable=[]):
super().__init__(*args, **kwargs) super().__init__(iterable)
self.num_of_entries_sem = asyncio.Semaphore(0)
self.num_of_entries_sem = asyncio.Semaphore(len(iterable))
self.readlock = asyncio.Lock() self.readlock = asyncio.Lock()
async def append(self, item: Entry) -> None: async def append(self, item: Entry) -> None:
super().append(item) super().append(item)
await sio.emit("state", self.to_dict())
self.num_of_entries_sem.release() self.num_of_entries_sem.release()
async def peek(self) -> Entry: async def peek(self) -> Entry:
@ -55,66 +50,112 @@ class Queue(deque):
return [item.to_dict() for item in self] return [item.to_dict() for item in self]
queue = Queue() @dataclass
class State:
secret: str | None
sources: dict[str, Source]
sources_prio: list[str]
queue: Queue
sid: str
@sio.on("get-state") @sio.on("get-state")
async def handle_state(sid, data: dict[str, Any] = {}): async def handle_state(sid, data: dict[str, Any] = {}):
await sio.emit("state", queue.to_dict(), room=sid) async with sio.session(sid) as session:
short = session["short"]
state = clients[short]
await sio.emit("state", state.queue.to_dict(), room=sid)
@sio.on("append") @sio.on("append")
async def handle_append(sid, data: dict[str, Any]): async def handle_append(sid, data: dict[str, Any]):
async with sio.session(sid) as session:
short = session["short"]
state = clients[short]
print(f"append: {data}") print(f"append: {data}")
source_obj = global_state.sources[data["source"]] source_obj = state.sources[data["source"]]
entry = await Entry.from_source(data["performer"], data["id"], source_obj) entry = await Entry.from_source(data["performer"], data["id"], source_obj)
await queue.append(entry) await state.queue.append(entry)
print(f"new state: {queue.to_dict()}") await sio.emit("state", state.queue.to_dict(), room=short)
await sio.emit( await sio.emit(
"buffer", "buffer",
entry.to_dict(), entry.to_dict(),
room="clients", room=clients[short].sid,
) )
@sio.on("meta-info") @sio.on("meta-info")
async def handle_meta_info(sid, data): async def handle_meta_info(sid, data):
async with sio.session(sid) as session: async with sio.session(sid) as session:
if "client" in session and session["client"]: short = session["short"]
for item in queue: state = clients[short]
if str(item.uuid) == data["uuid"]:
item.update(**data["meta"])
await sio.emit("state", queue.to_dict()) for item in state.queue:
if str(item.uuid) == data["uuid"]:
item.update(**data["meta"])
await sio.emit("state", state.queue.to_dict(), room=short)
@sio.on("get-first") @sio.on("get-first")
async def handle_get_first(sid, data={}): async def handle_get_first(sid, data={}):
async with sio.session(sid) as session: async with sio.session(sid) as session:
if "client" in session and session["client"]: short = session["short"]
current = await queue.peek() state = clients[short]
print(f"Sending {current} to client {sid}")
await sio.emit("play", current.to_dict(), room=sid) current = await state.queue.peek()
await sio.emit("play", current.to_dict(), room=sid)
@sio.on("pop-then-get-next") @sio.on("pop-then-get-next")
async def handle_pop_then_get_next(sid, data={}): async def handle_pop_then_get_next(sid, data={}):
async with sio.session(sid) as session: async with sio.session(sid) as session:
if "client" in session and session["client"]: short = session["short"]
await queue.popleft() state = clients[short]
current = await queue.peek()
print(f"Sending {current} to client {sid}") await state.queue.popleft()
await sio.emit("play", current.to_dict(), room=sid) current = await state.queue.peek()
await sio.emit("state", state.queue.to_dict(), room=short)
await sio.emit("play", current.to_dict(), room=sid)
def gen_id(length=4) -> str:
client_id = "".join([random.choice(string.ascii_letters) for _ in range(length)])
if client_id in clients:
client_id = gen_id(length + 1)
return client_id
@sio.on("register-client") @sio.on("register-client")
async def handle_register_client(sid, data: dict[str, Any]): async def handle_register_client(sid, data: dict[str, Any]):
print(f"Registerd new client {sid}") print(f"Registerd new client {sid}")
global_state.admin_secret = data["secret"] short = data["short"] if "short" in data else gen_id()
await sio.save_session(sid, {"client": True}) async with sio.session(sid) as session:
sio.enter_room(sid, "clients") session["short"] = short
await sio.emit("client-registered", {"success": True}, room=sid)
print(f"short id: {short}")
if data["short"] in clients:
old_state = clients[short]
if data["secret"] == old_state.secret:
old_state.sid = sid
sio.enter_room(sid, short)
await sio.emit(
"client-registered", {"success": True, "short": short}, room=sid
)
else:
await sio.emit(
"client-registered", {"success": False, "short": short}, room=sid
)
else:
initial_entries = [Entry(**entry) for entry in data["queue"]]
clients[short] = State(data["secret"], {}, [], Queue(initial_entries), sid)
sio.enter_room(sid, short)
await sio.emit("client-registered", {"success": True, "short": short}, room=sid)
@sio.on("sources") @sio.on("sources")
@ -125,87 +166,109 @@ async def handle_sources(sid, data):
sources and query for a config for all uninitialized sources sources and query for a config for all uninitialized sources
""" """
async with sio.session(sid) as session: async with sio.session(sid) as session:
if "client" in session and session["client"]: short = session["short"]
unused_sources = global_state.sources.keys() - data["sources"] state = clients[short]
new_sources = data["sources"] - global_state.sources.keys()
for source in unused_sources: unused_sources = state.sources.keys() - data["sources"]
del global_state.sources[source] new_sources = data["sources"] - state.sources.keys()
global_state.sources_prio = data["sources"] for source in unused_sources:
del state.sources[source]
for name in new_sources: state.sources_prio = data["sources"]
await sio.emit("request-config", {"source": name}, room=sid)
for name in new_sources:
await sio.emit("request-config", {"source": name}, room=sid)
@sio.on("config-chunk") @sio.on("config-chunk")
async def handle_config_chung(sid, data): async def handle_config_chung(sid, data):
async with sio.session(sid) as session: async with sio.session(sid) as session:
if "client" in session and session["client"]: short = session["short"]
if not data["source"] in global_state.sources: state = clients[short]
global_state.sources[data["source"]] = available_sources[
data["source"] if not data["source"] in state.sources:
](data["config"]) state.sources[data["source"]] = available_sources[data["source"]](
else: data["config"]
global_state.sources[data["source"]].add_to_config(data["config"]) )
else:
state.sources[data["source"]].add_to_config(data["config"])
@sio.on("config") @sio.on("config")
async def handle_config(sid, data): async def handle_config(sid, data):
async with sio.session(sid) as session: async with sio.session(sid) as session:
if "client" in session and session["client"]: short = session["short"]
global_state.sources[data["source"]] = available_sources[data["source"]]( state = clients[short]
data["config"]
) state.sources[data["source"]] = available_sources[data["source"]](data["config"])
print(f"Added source {data['source']}") print(f"Added source {data['source']}")
@sio.on("register-web")
async def handle_register_web(sid, data):
async with sio.session(sid) as session:
session["short"] = data["short"]
sio.enter_room(sid, session["short"])
state = clients[session["short"]]
await sio.emit("state", state.queue.to_dict(), room=sid)
@sio.on("register-admin") @sio.on("register-admin")
async def handle_register_admin(sid, data: dict[str, str]): async def handle_register_admin(sid, data: dict[str, str]):
if global_state.admin_secret and data["secret"] in global_state.admin_secret: async with sio.session(sid) as session:
print(f"Registerd new admin {sid}") short = session["short"]
await sio.save_session(sid, {"admin": True}) state = clients[short]
await sio.emit("register-admin", {"success": True}, room=sid)
else: is_admin = data["secret"] == state.secret
await sio.emit("register-admin", {"success": False}, room=sid) async with sio.session(sid) as session:
session["admin"] = is_admin
await sio.emit("register-admin", {"success": is_admin}, room=sid)
@sio.on("get-config") @sio.on("get-config")
async def handle_get_config(sid, data): async def handle_get_config(sid, data):
async with sio.session(sid) as session: async with sio.session(sid) as session:
if "admin" in session and session["admin"]: short = session["short"]
await sio.emit( is_admin = session["admin"]
"config", state = clients[short]
{
name: source.get_config() if is_admin:
for name, source in global_state.sources.items() await sio.emit(
}, "config",
) {name: source.get_config() for name, source in state.sources.items()},
)
@sio.on("skip") @sio.on("skip")
async def handle_skip(sid, data={}): async def handle_skip(sid, data={}):
async with sio.session(sid) as session: async with sio.session(sid) as session:
if "admin" in session and session["admin"]: short = session["short"]
await sio.emit("skip", room="clients") is_admin = session["admin"]
if is_admin:
await sio.emit("skip", room=clients[short].sid)
@sio.on("disconnect") @sio.on("disconnect")
async def handle_disconnect(sid, data={}): async def handle_disconnect(sid, data={}):
async with sio.session(sid) as session: async with sio.session(sid) as session:
if "client" in session and session["client"]: sio.leave_room(sid, session["short"])
sio.leave_room(sid, "clients")
@sio.on("search") @sio.on("search")
async def handle_search(sid, data: dict[str, str]): async def handle_search(sid, data: dict[str, str]):
print(f"Got search request from {sid}: {data}") async with sio.session(sid) as session:
short = session["short"]
state = clients[short]
query = data["query"] query = data["query"]
result_futures = [] result_futures = []
for source in global_state.sources_prio: for source in state.sources_prio:
loop = asyncio.get_running_loop() loop = asyncio.get_running_loop()
search_future = loop.create_future() search_future = loop.create_future()
loop.create_task(global_state.sources[source].search(search_future, query)) loop.create_task(state.sources[source].search(search_future, query))
result_futures.append(search_future) result_futures.append(search_future)
results = [ results = [

View file

@ -26,39 +26,6 @@ async def handle_state(data):
@sio.on("connect") @sio.on("connect")
async def handle_connect(): async def handle_connect():
print("Connected") print("Connected")
# await sio.emit("search", {"query": "Linkin Park"})
# await sio.emit(
# "append",
# {
# "performer": "Hammy",
# "source": "youtube",
# "id": "https://www.youtube.com/watch?v=rqZqHXJm-UA", # https://youtube.com/watch?v=x5bM5Bdizi4",
# },
# )
# await sio.emit(
# "append",
# {
# "performer": "Hammy",
# "source": "s3",
# "id": "Sunfly Gold/SFGD034 - Linkin Park & Limp Bizkit/Linkin Park - Pushing Me Away - Sunfly Gold 34.cdg",
# },
# )
# await sio.emit(
# "append",
# {
# "performer": "Hammy",
# "source": "s3",
# "id": "Sunfly Gold/SFGD034 - Linkin Park & Limp Bizkit/Linkin Park - Pushing Me Away - Sunfly Gold 34.cdg",
# },
# )
# await sio.emit(
# "append",
# {
# "performer": "Hammy",
# "source": "s3",
# "id": "Sunfly Gold/SFGD034 - Linkin Park & Limp Bizkit/Linkin Park - Pushing Me Away - Sunfly Gold 34.cdg",
# },
# )
@sio.on("register-admin") @sio.on("register-admin")
@ -94,8 +61,9 @@ class SyngShell(aiocmd.PromptToolkitCmd):
async def do_admin(self, data): async def do_admin(self, data):
await sio.emit("register-admin", {"secret": data}) await sio.emit("register-admin", {"secret": data})
async def do_connect(self): async def do_connect(self, short):
await sio.connect("http://127.0.0.1:8080") await sio.connect("http://127.0.0.1:8080")
await sio.emit("register-web", {"short": short})
async def do_skip(self): async def do_skip(self):
await sio.emit("skip") await sio.emit("skip")