From 8900251b1e3cb9b86abda6e5d2cd455c3db13539 Mon Sep 17 00:00:00 2001 From: Christoph Stahl Date: Mon, 14 Oct 2024 13:02:12 +0200 Subject: [PATCH] put server in an own class --- syng/server.py | 2129 ++++++++++++++++----------------- typings/socketio/__init__.pyi | 4 +- 2 files changed, 1062 insertions(+), 1071 deletions(-) diff --git a/syng/server.py b/syng/server.py index 732b7c3..50d9f65 100644 --- a/syng/server.py +++ b/syng/server.py @@ -41,12 +41,6 @@ from .queue import Queue from .sources import available_sources from .sources import Source -sio = socketio.AsyncServer( - cors_allowed_origins="*", logger=True, engineio_logger=False, json=jsonencoder -) -app = web.Application() -sio.attach(app) - DEFAULT_CONFIG = { "preview_duration": 3, "waiting_room_policy": None, @@ -54,26 +48,6 @@ DEFAULT_CONFIG = { } -async def root_handler(request: Any) -> Any: - """ - Handle the index and favicon requests. - - If the path of the request ends with "/favicon.ico" return the favicon, - otherwise the index.html. This way the javascript can read the room code - from the url. - - :param request Any: Webrequest from aiohttp - :return: Either the favicon or the index.html - :rtype web.FileResponse: - """ - if request.path.endswith("/favicon.ico"): - return web.FileResponse(os.path.join(app["root_folder"], "favicon.ico")) - return web.FileResponse(os.path.join(app["root_folder"], "index.html")) - - -# logger = logging.getLogger(__name__) - - @dataclass class Client: """This stores the configuration of a specific playback client. @@ -140,1136 +114,1151 @@ class State: last_seen: datetime.datetime = field(init=False, default_factory=datetime.datetime.now) -clients: dict[str, State] = {} +class Server: + def __init__(self) -> None: + self.sio = socketio.AsyncServer( + cors_allowed_origins="*", logger=True, engineio_logger=False, json=jsonencoder + ) + self.app = web.Application() + self.clients: dict[str, State] = {} + self.sio.attach(self.app) + self.register_handlers() + def register_handlers(self) -> None: + self.sio.on("get-state", self.handle_get_state) + self.sio.on("waiting-room-append", self.handle_waiting_room_append) + self.sio.on("show-config", self.handle_show_config) + self.sio.on("update-config", self.handle_update_config) + self.sio.on("append", self.handle_append) + self.sio.on("append-anyway", self.handle_append_anyway) + self.sio.on("meta-info", self.handle_meta_info) + self.sio.on("get-first", self.handle_get_first) + self.sio.on("waiting-room-to-queue", self.handle_waiting_room_to_queue) + self.sio.on("pop-then-get-next", self.handle_pop_then_get_next) + self.sio.on("register-client", self.handle_register_client) + self.sio.on("sources", self.handle_sources) + self.sio.on("config-chunk", self.handle_config_chunk) + self.sio.on("config", self.handle_config) + self.sio.on("register-web", self.handle_register_web) + self.sio.on("register-admin", self.handle_register_admin) + self.sio.on("skip-current", self.handle_skip_current) + self.sio.on("move-to", self.handle_move_to) + self.sio.on("move-up", self.handle_move_up) + self.sio.on("skip", self.handle_skip) + self.sio.on("disconnect", self.handle_disconnect) + self.sio.on("search", self.handle_search) + self.sio.on("search-results", self.handle_search_results) -async def send_state(state: State, sid: str) -> None: - """ - Send the current state (queue and recent-list) to sid. + async def root_handler(self, request: Any) -> Any: + """ + Handle the index and favicon requests. - This sends a "state" message. This can be received either by the playback - client, a web client or the whole room. + If the path of the request ends with "/favicon.ico" return the favicon, + otherwise the index.html. This way the javascript can read the room code + from the url. - If it is send to a playback client, it will be handled by the - :py:func:`syng.client.handle_state` function. + :param request Any: Webrequest from aiohttp + :return: Either the favicon or the index.html + :rtype web.FileResponse: + """ + if request.path.endswith("/favicon.ico"): + return web.FileResponse(os.path.join(self.app["root_folder"], "favicon.ico")) + return web.FileResponse(os.path.join(self.app["root_folder"], "index.html")) - :param state: The state to send - :type state: State - :param sid: The recepient of the "state" message - :type sid: str: - :rtype: None - """ + async def send_state(self, state: State, sid: str) -> None: + """ + Send the current state (queue and recent-list) to sid. - safe_config = {k: v for k, v in state.client.config.items() if k not in ["secret", "key"]} + This sends a "state" message. This can be received either by the playback + client, a web client or the whole room. - await sio.emit( - "state", - { - "queue": state.queue, - "recent": state.recent, - "waiting_room": state.waiting_room, - "config": safe_config, - }, - room=sid, - ) + If it is send to a playback client, it will be handled by the + :py:func:`syng.client.handle_state` function. + :param state: The state to send + :type state: State + :param sid: The recepient of the "state" message + :type sid: str: + :rtype: None + """ -@sio.on("get-state") -async def handle_state(sid: str) -> None: - """ - Handle the "get-state" message. + safe_config = {k: v for k, v in state.client.config.items() if k not in ["secret", "key"]} - Sends the current state to whoever requests it. This failes if the sender - is not part of any room. - - :param sid: The initial sender, and therefore recepient of the "state" - message - :type sid: str - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - state = clients[room] - - await send_state(state, sid) - - -@sio.on("waiting-room-append") -async def handle_waiting_room_append(sid: str, data: dict[str, Any]) -> None: - """ - Append a song to the waiting room. - - This should be called from a web client. Appends the entry, that is encoded - within the data to the waiting room of the room the client is currently - connected to. - - :param sid: The session id of the client sending this request - :type sid: str - :param data: A dictionary encoding the entry, that should be added to the - waiting room. - :type data: dict[str, Any] - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - state = clients[room] - - source_obj = state.client.sources[data["source"]] - - entry = await source_obj.get_entry(data["performer"], data["ident"]) - - if entry is None: - await sio.emit( - "msg", - {"msg": f"Unable to add to the waiting room: {data['ident']}. Maybe try again?"}, + await self.sio.emit( + "state", + { + "queue": state.queue, + "recent": state.recent, + "waiting_room": state.waiting_room, + "config": safe_config, + }, room=sid, ) - return - if "uid" not in data or ( - (data["uid"] is not None and len(list(state.queue.find_by_uid(data["uid"]))) == 0) - or (data["uid"] is None and state.queue.find_by_name(data["performer"]) is None) - ): - await append_to_queue(room, entry, sid) - return + async def handle_get_state(self, sid: str) -> None: + """ + Handle the "get-state" message. - entry.uid = data["uid"] + Sends the current state to whoever requests it. This failes if the sender + is not part of any room. - state.waiting_room.append(entry) - await send_state(state, room) - await sio.emit( - "get-meta-info", - entry, - room=clients[room].sid, - ) + :param sid: The initial sender, and therefore recepient of the "state" + message + :type sid: str + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + state = self.clients[room] + await self.send_state(state, sid) -async def append_to_queue(room: str, entry: Entry, report_to: Optional[str] = None) -> None: - """ - Append a song to the queue for a given session. + async def handle_waiting_room_append(self, sid: str, data: dict[str, Any]) -> None: + """ + Append a song to the waiting room. - Checks, if the computed start time is before the configured end time of the - event, and reports an error, if the end time is exceeded. + This should be called from a web client. Appends the entry, that is encoded + within the data to the waiting room of the room the client is currently + connected to. - :param room: The room with the queue. - :type room: str - :param entry: The entry that contains the song. - :type entry: Entry - :param report_to: If an error occurs, who to report to. - :type report_to: Optional[str] - :rtype: None - """ - state = clients[room] + :param sid: The session id of the client sending this request + :type sid: str + :param data: A dictionary encoding the entry, that should be added to the + waiting room. + :type data: dict[str, Any] + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + state = self.clients[room] - first_song = state.queue.try_peek() - if first_song is None or first_song.started_at is None: - start_time = datetime.datetime.now().timestamp() - else: - start_time = first_song.started_at + source_obj = state.client.sources[data["source"]] - start_time = state.queue.fold( - lambda item, time: time + item.duration + state.client.config["preview_duration"] + 1, - start_time, - ) + entry = await source_obj.get_entry(data["performer"], data["ident"]) - if state.client.config["last_song"]: - if state.client.config["last_song"] < start_time: - # end_time = datetime.datetime.fromtimestamp( - # state.client.config["last_song"] - # ) - if report_to is not None: - await sio.emit( - "err", - { - "type": "QUEUE_FULL", - "end_time": state.client.config["last_song"], - }, - room=report_to, - ) - return - - state.queue.append(entry) - await send_state(state, room) - - await sio.emit( - "get-meta-info", - entry, - room=clients[room].sid, - ) - - -@sio.on("show_config") -async def handle_show_config(sid: str) -> None: - """ - Sends public config to webclient. - - This will only be send if the client is on an admin connection. - - :param sid: The session id of the client sending this request - :type sid: str - :rtype: None - """ - - async with sio.session(sid) as session: - room = session["room"] - is_admin = session["admin"] - state = clients[room] - - if is_admin: - await sio.emit( - "config", - state.client.config, - sid, - ) - else: - await sio.emit("err", {"type": "NO_ADMIN"}, sid) - - -@sio.on("update_config") -async def handle_update_config(sid: str, data: dict[str, Any]) -> None: - """ - Forwards an updated config from an authorized webclient to the playback client. - - This is currently untrested and should be used with caution. - - :param sid: The session id of the client sending this request - :type sid: str - :param data: A dictionary encoding the new configuration - :type data: dict[str, Any] - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - is_admin = session["admin"] - state = clients[room] - - if is_admin: - try: - config = jsonencoder.loads(data["config"]) - await sio.emit( - "update_config", - DEFAULT_CONFIG | config, - state.sid, - ) - state.client.config = DEFAULT_CONFIG | config - await sio.emit("update_config", config, room) - except JSONDecodeError: - await sio.emit("err", {"type": "JSON_MALFORMED"}) - - else: - await sio.emit("err", {"type": "NO_ADMIN"}, sid) - - -@sio.on("append") -async def handle_append(sid: str, data: dict[str, Any]) -> None: - """ - Handle the "append" message. - - This should be called from a web client. Appends the entry, that is encoded - within the data to the room the client is currently connected to. An entry - constructed this way, will be given a UUID, to differentiate it from other - entries for the same song. Additionally an id of the web client is saved - for that entry. - - If the room is configured to no longer accept songs past a certain time - (via the :py:attr:`Config.last_song` attribute), it is checked, if the - start time of the song would exceed this time. If this is the case, the - request is denied and a "msg" message is send to the client, detailing - this. - - If a waitingroom is forced or optional, it is checked, if one of the performers is - already in queue. In that case, a "ask_for_waitingroom" message is send to the - client. - - Otherwise the song is added to the queue. And all connected clients (web - and playback client) are informed of the new state with a "state" message. - - Since some properties of a song can only be accessed on the playback - client, a "get-meta-info" message is send to the playback client. This is - handled there with the :py:func:`syng.client.handle_get_meta_info` - function. - - :param sid: The session id of the client sending this request - :type sid: str - :param data: A dictionary encoding the entry, that should be added to the - queue. - :type data: dict[str, Any] - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - state = clients[room] - - if len(data["performer"]) > 50: - await sio.emit("err", {"type": "NAME_LENGTH", "name": data["performer"]}, room=sid) - return - - if predict([data["performer"]]) == [1]: - await sio.emit("err", {"type": "PROFANITY", "name": data["performer"]}, room=sid) - return - - if state.client.config["waiting_room_policy"] and ( - state.client.config["waiting_room_policy"].lower() == "forced" - or state.client.config["waiting_room_policy"].lower() == "optional" - ): - old_entry = state.queue.find_by_name(data["performer"]) - if old_entry is not None: - await sio.emit( - "ask_for_waitingroom", - { - "current_entry": { - "source": data["source"], - "performer": data["performer"], - "ident": data["ident"], - }, - "old_entry": { - "artist": old_entry.artist, - "title": old_entry.title, - "performer": old_entry.performer, - }, - }, + if entry is None: + await self.sio.emit( + "msg", + {"msg": f"Unable to add to the waiting room: {data['ident']}. Maybe try again?"}, room=sid, ) return - source_obj = state.client.sources[data["source"]] + if "uid" not in data or ( + (data["uid"] is not None and len(list(state.queue.find_by_uid(data["uid"]))) == 0) + or (data["uid"] is None and state.queue.find_by_name(data["performer"]) is None) + ): + await self.append_to_queue(room, entry, sid) + return - entry = await source_obj.get_entry(data["performer"], data["ident"]) + entry.uid = data["uid"] - if entry is None: - await sio.emit( - "msg", - {"msg": f"Unable to append {data['ident']}. Maybe try again?"}, - room=sid, + state.waiting_room.append(entry) + await self.send_state(state, room) + await self.sio.emit( + "get-meta-info", + entry, + room=self.clients[room].sid, ) - return - entry.uid = data["uid"] if "uid" in data else None - - await append_to_queue(room, entry, sid) - - -@sio.on("append-anyway") -async def handle_append_anyway(sid: str, data: dict[str, Any]) -> None: - """ - Appends a song to the queue, even if the performer is already in queue. - - Works the same as handle_append, but without the check if the performer is already - in queue. - - Only if the waiting_room_policy is not configured as forced. - """ - async with sio.session(sid) as session: - room = session["room"] - state = clients[room] - - if len(data["performer"]) > 50: - await sio.emit("err", {"type": "NAME_LENGTH", "name": data["performer"]}, room=sid) - return - - if predict([data["performer"]]) == [1]: - await sio.emit("err", {"type": "PROFANITY", "name": data["performer"]}, room=sid) - return - - if state.client.config["waiting_room_policy"].lower() == "forced": - await sio.emit( - "err", - {"type": "WAITING_ROOM_FORCED"}, - room=sid, - ) - return - - source_obj = state.client.sources[data["source"]] - - entry = await source_obj.get_entry(data["performer"], data["ident"]) - - if entry is None: - await sio.emit( - "msg", - {"msg": f"Unable to append {data['ident']}. Maybe try again?"}, - room=sid, - ) - return - - entry.uid = data["uid"] if "uid" in data else None - - await append_to_queue(room, entry, sid) - - -@sio.on("meta-info") -async def handle_meta_info(sid: str, data: dict[str, Any]) -> None: - """ - Handle the "meta-info" message. - - Updated a :py:class:syng.entry.Entry`, that is encoded in the data - parameter, in the queue, that belongs to the room the requesting client - belongs to, with new meta data, that is send from the playback client. - - Afterwards send the updated queue to all members of the room. - - :param sid: The session id of the client sending this request. - :type sid: str - :param data: A dictionary encoding the entry to update (already with the - new metadata) - :type data: dict[str, Any] - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - state = clients[room] - - state.queue.update( - data["uuid"], - lambda item: item.update(**data["meta"]), - ) - - for entry in state.waiting_room: - if entry.uuid == data["uuid"] or str(entry.uuid) == data["uuid"]: - entry.update(**data["meta"]) - - await send_state(state, room) - - -@sio.on("get-first") -async def handle_get_first(sid: str) -> None: - """ - Handle the "get-first" message. - - This message is send by the playback client, once it has connected. It - should only be send for the initial song. Each subsequent song should be - requestet with a "pop-then-get-next" message (See - :py:func:`handle_pop_then_get_next`). - - If no songs are in the queue for this room, this function waits until one - is available, then notes its starting time and sends it back to the - playback client in a "play" message. This will be handled by the - :py:func:`syng.client.handle_play` function. - - :param sid: The session id of the requesting client - :type sid: str - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - state = clients[room] - - current = await state.queue.peek() - current.started_at = datetime.datetime.now().timestamp() - - await sio.emit("play", current, room=sid) - - -@sio.on("waiting-room-to-queue") -async def handle_waiting_room_to_queue(sid: str, data: dict[str, Any]) -> None: - """ - Handle the "waiting-room-to-queue" message. - - If on an admin-connection, removes a song from the waiting room and appends it to - the queue. - - :param sid: The session id of the requesting client - :type sid: str - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - is_admin = session["admin"] - state = clients[room] - - if is_admin: - entry = next( - (wr_entry for wr_entry in state.waiting_room if str(wr_entry.uuid) == data["uuid"]), - None, - ) - if entry is not None: - state.waiting_room.remove(entry) - await append_to_queue(room, entry, sid) - - -async def add_songs_from_waiting_room(room: str) -> None: - """ - Add all songs from the waiting room, that should be added to the queue. - - A song should be added if none of its performers are already queued. - - This should be called every time a song leaves the queue. - - :param room: The room holding the queue. - :type room: str - :rtype: None - """ - state = clients[room] - - wrs_to_remove = [] - for wr_entry in state.waiting_room: - if state.queue.find_by_name(wr_entry.performer) is None: - await append_to_queue(room, wr_entry) - wrs_to_remove.append(wr_entry) - - for wr_entry in wrs_to_remove: - state.waiting_room.remove(wr_entry) - - -async def discard_first(room: str) -> Entry: - """ - Gets the first element of the queue, handling resulting triggers. - - This function is used to get the first element of the queue, and handle - the resulting triggers. This includes adding songs from the waiting room, - and updating the state of the room. - - :param room: The room to get the first element from. - :type room: str - :rtype: Entry - """ - state = clients[room] - - old_entry = await state.queue.popleft() - - await add_songs_from_waiting_room(room) - - state.recent.append(old_entry) - state.last_seen = datetime.datetime.now() - - return old_entry - - -@sio.on("pop-then-get-next") -async def handle_pop_then_get_next(sid: str) -> None: - """ - Handle the "pop-then-get-next" message. - - This function acts similar to the :py:func:`handle_get_first` function. The - main difference is, that prior to sending a song to the playback client, - the first element of the queue is discarded. - - Afterwards it follows the same steps as the handler for the "play" message, - get the first element of the queue, annotate it with the current time, - update everyones state and send the entry it to the playback client in a - "play" message. This will be handled by the - :py:func:`syng.client.handle_play` function. - - :param sid: The session id of the requesting playback client - :type sid: str - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - state = clients[room] - - if sid != state.sid: - return - - await discard_first(room) - await send_state(state, room) - - current = await state.queue.peek() - current.started_at = datetime.datetime.now().timestamp() - await send_state(state, room) - - await sio.emit("play", current, room=sid) - - -def check_registration(key: str) -> bool: - """ - Check if a given key is in the registration keyfile. - - This is used to authenticate a client, if the server is in private or - restricted mode. - - :param key: The key to check - :type key: str - :return: True if the key is in the registration keyfile, False otherwise - :rtype: bool - """ - with open(app["registration-keyfile"], encoding="utf8") as f: - raw_keys = f.readlines() - keys = [key[:64] for key in raw_keys] - - return key in keys - - -@sio.on("register-client") -async def handle_register_client(sid: str, data: dict[str, Any]) -> None: - """ - Handle the "register-client" message. - - The data dictionary should have the following keys: - - `room` (Optional), the requested room - - `config`, an dictionary of initial configurations - - `queue`, a list of initial entries for the queue. The entries are - encoded as a dictionary. - - `recent`, a list of initial entries for the recent list. The entries - are encoded as a dictionary. - - `secret`, the secret of the room - - `key`, a registration key given out by the server administrator - - This will register a new playback client to a specific room. If there - already exists a playback client registered for this room, this - playback client will be replaced if and only if, the new playback - client has the same secret. - - If registration is restricted, abort, if the given key is not in the - registration keyfile. - - If no room is provided, a fresh room id is generated. - - If the client provides a new room, or a new room id was generated, the - server will create a new :py:class:`State` object and associate it with - the room id. The state will be initialized with a queue and recent - list, an initial config as well as no sources (yet). - - In any case, the client will be notified of the success or failure, along - with its assigned room key via a "client-registered" message. This will be - handled by the :py:func:`syng.client.handle_client_registered` function. - - If it was successfully registerd, the client will be added to its assigend - or requested room. - - Afterwards all clients in the room will be send the current state. - - :param sid: The session id of the requesting playback client. - :type sid: str - :param data: A dictionary with the keys described above - :type data: dict[str, Any] - :rtype: None - """ - - def gen_id(length: int = 4) -> str: - client_id = "".join([random.choice(string.ascii_letters) for _ in range(length)]) - if client_id in clients: - client_id = gen_id(length + 1) - return client_id - - if "key" in data["config"]: - data["config"]["key"] = hashlib.sha256(data["config"]["key"].encode()).hexdigest() - - if app["type"] == "private" and ( - "key" not in data["config"] or not check_registration(data["config"]["key"]) - ): - await sio.emit( - "client-registered", - {"success": False, "room": None}, - room=sid, - ) - return - - room: str = ( - data["config"]["room"] if "room" in data["config"] and data["config"]["room"] else gen_id() - ) - async with sio.session(sid) as session: - session["room"] = room - - if room in clients: - old_state: State = clients[room] - if data["config"]["secret"] == old_state.client.config["secret"]: - logger.info("Got new client connection for %s", room) - old_state.sid = sid - old_state.client = Client( - sources=old_state.client.sources, - sources_prio=old_state.client.sources_prio, - config=DEFAULT_CONFIG | data["config"], - ) - await sio.enter_room(sid, room) - await sio.emit("client-registered", {"success": True, "room": room}, room=sid) - await send_state(clients[room], sid) + async def append_to_queue( + self, room: str, entry: Entry, report_to: Optional[str] = None + ) -> None: + """ + Append a song to the queue for a given session. + + Checks, if the computed start time is before the configured end time of the + event, and reports an error, if the end time is exceeded. + + :param room: The room with the queue. + :type room: str + :param entry: The entry that contains the song. + :type entry: Entry + :param report_to: If an error occurs, who to report to. + :type report_to: Optional[str] + :rtype: None + """ + state = self.clients[room] + + first_song = state.queue.try_peek() + if first_song is None or first_song.started_at is None: + start_time = datetime.datetime.now().timestamp() else: - logger.warning("Got wrong secret for %s", room) - await sio.emit("client-registered", {"success": False, "room": room}, room=sid) - else: - logger.info("Registerd new client %s", room) - initial_entries = [Entry(**entry) for entry in data["queue"]] - initial_waiting_room = [Entry(**entry) for entry in data["waiting_room"]] - initial_recent = [Entry(**entry) for entry in data["recent"]] + start_time = first_song.started_at - clients[room] = State( - queue=Queue(initial_entries), - waiting_room=initial_waiting_room, - recent=initial_recent, - sid=sid, - client=Client( - sources={}, - sources_prio=[], - config=DEFAULT_CONFIG | data["config"], - ), + start_time = state.queue.fold( + lambda item, time: time + item.duration + state.client.config["preview_duration"] + 1, + start_time, ) - await sio.enter_room(sid, room) - await sio.emit("client-registered", {"success": True, "room": room}, room=sid) - await send_state(clients[room], sid) + if state.client.config["last_song"]: + if state.client.config["last_song"] < start_time: + if report_to is not None: + await self.sio.emit( + "err", + { + "type": "QUEUE_FULL", + "end_time": state.client.config["last_song"], + }, + room=report_to, + ) + return + state.queue.append(entry) + await self.send_state(state, room) -@sio.on("sources") -async def handle_sources(sid: str, data: dict[str, Any]) -> None: - """ - Handle the "sources" message. + await self.sio.emit( + "get-meta-info", + entry, + room=self.clients[room].sid, + ) - Get the list of sources the client wants to use. Update internal list of - sources, remove unused sources and query for a config for all uninitialized - sources by sending a "request-config" message for each such source to the - playback client. This will be handled by the - :py:func:`syng.client.request-config` function. + async def handle_show_config(self, sid: str) -> None: + """ + Sends public config to webclient. - This will not yet add the sources to the configuration, rather gather what - sources need to be configured and request their configuration. The list - of sources will set the :py:attr:`Config.sources_prio` attribute. + This will only be send if the client is on an admin connection. - :param sid: The session id of the playback client - :type sid: str - :param data: A dictionary containing a "sources" key, with the list of - sources to use. - :type data: dict[str, Any] - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - state = clients[room] + :param sid: The session id of the client sending this request + :type sid: str + :rtype: None + """ - if sid != state.sid: - return + async with self.sio.session(sid) as session: + room = session["room"] + is_admin = session["admin"] + state = self.clients[room] - unused_sources = state.client.sources.keys() - data["sources"] - new_sources = data["sources"] - state.client.sources.keys() - - for source in unused_sources: - del state.client.sources[source] - - state.client.sources_prio = data["sources"] - - for name in new_sources: - await sio.emit("request-config", {"source": name}, room=sid) - - -@sio.on("config-chunk") -async def handle_config_chunk(sid: str, data: dict[str, Any]) -> None: - """ - Handle the "config-chunk" message. - - This is called, when a source wants its configuration transmitted in - chunks, rather than a single message. If the source already exist - (e.g. when this is not the first chunk), the config will be added - to the source, otherwise a source will be created with the given - configuration. - - :param sid: The session id of the playback client - :type sid: str - :param data: A dictionary with a "source" (str) and a - "config" (dict[str, Any]) entry. The exact content of the config entry - depends on the source. - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - state = clients[room] - - if sid != state.sid: - return - - if data["source"] not in state.client.sources: - state.client.sources[data["source"]] = available_sources[data["source"]](data["config"]) - else: - state.client.sources[data["source"]].add_to_config(data["config"], data["number"]) - - -@sio.on("config") -async def handle_config(sid: str, data: dict[str, Any]) -> None: - """ - Handle the "config" message. - - This is called, when a source wants its configuration transmitted in - a single message, rather than chunks. A source will be created with the - given configuration. - - :param sid: The session id of the playback client - :type sid: str - :param data: A dictionary with a "source" (str) and a - "config" (dict[str, Any]) entry. The exact content of the config entry - depends on the source. - :type data: dict[str, Any] - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - state = clients[room] - - if sid != state.sid: - return - - state.client.sources[data["source"]] = available_sources[data["source"]](data["config"]) - - -@sio.on("register-web") -async def handle_register_web(sid: str, data: dict[str, Any]) -> bool: - """ - Handle a "register-web" message. - - Adds a web client to a requested room and sends it the initial state of the - queue and recent list. - - :param sid: The session id of the web client. - :type sid: str - :param data: A dictionary, containing at least a "room" entry. - :type data: dict[str, Any] - :returns: True, if the room exist, False otherwise - :rtype: bool - """ - if data["room"] in clients: - async with sio.session(sid) as session: - session["room"] = data["room"] - await sio.enter_room(sid, session["room"]) - state = clients[session["room"]] - await send_state(state, sid) - return True - return False - - -@sio.on("register-admin") -async def handle_register_admin(sid: str, data: dict[str, Any]) -> bool: - """ - Handle a "register-admin" message. - - If the client provides the correct secret for its room, the connection is - upgraded to an admin connection. - - :param sid: The session id of the client, requesting admin. - :type sid: str: - :param data: A dictionary with at least a "secret" entry. - :type data: dict[str, Any] - :returns: True, if the secret is correct, False otherwise - :rtype: bool - """ - async with sio.session(sid) as session: - room = session["room"] - state = clients[room] - - is_admin: bool = data["secret"] == state.client.config["secret"] - async with sio.session(sid) as session: - session["admin"] = is_admin - return is_admin - - -@sio.on("skip-current") -async def handle_skip_current(sid: str) -> None: - """ - Handle a "skip-current" message. - - If this comes from an admin connection, forward the "skip-current" message - to the playback client. This will be handled by the - :py:func:`syng.client.handle_skip_current` function. - - :param sid: The session id of the client, requesting. - :type sid: str - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - is_admin = session["admin"] - state = clients[room] - - if is_admin: - old_entry = await discard_first(room) - await sio.emit("skip-current", old_entry, room=clients[room].sid) - await send_state(state, room) - - -@sio.on("move-to") -async def handle_move_to(sid: str, data: dict[str, Any]) -> None: - """ - Handle the "move-to" message. - - If on an admin connection, moves the entry specified in the data to the - position specified in the data. - - :param sid: The session id of the client requesting. - :type sid: str - :param data: A dictionary with at least an "uuid" and a "target" entry - :type data: dict[str, Any] - :rtype: None - """ - - async with sio.session(sid) as session: - room = session["room"] - is_admin = session["admin"] - - state = clients[room] - - if is_admin: - await state.queue.move_to(data["uuid"], data["target"]) - await send_state(state, room) - - -@sio.on("move-up") -async def handle_move_up(sid: str, data: dict[str, Any]) -> None: - """ - Handle the "move-up" message. - - If on an admin connection, moves up the entry specified in the data by one - place in the queue. - - :param sid: The session id of the client requesting. - :type sid: str - :param data: A dictionary with at least an "uuid" entry - :type data: dict[str, Any] - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - is_admin = session["admin"] - state = clients[room] - if is_admin: - await state.queue.move_up(data["uuid"]) - await send_state(state, room) - - -@sio.on("skip") -async def handle_skip(sid: str, data: dict[str, Any]) -> None: - """ - Handle the "skip" message. - - If on an admin connection, removes the entry specified by data["uuid"] - from the queue or the waiting room. Triggers the waiting room. - - :param sid: The session id of the client requesting. - :type sid: str - :param data: A dictionary with at least an "uuid" entry. - :type data: dict[str, Any] - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - is_admin = session["admin"] - state = clients[room] - - if is_admin: - entry = state.queue.find_by_uuid(data["uuid"]) - if entry is not None: - logger.info("Skipping %s", entry) - - await add_songs_from_waiting_room(room) - - await state.queue.remove(entry) - - first_entry_index = None - for idx, wr_entry in enumerate(state.waiting_room): - if wr_entry.uuid == data["uuid"]: - first_entry_index = idx - break - - if first_entry_index is not None: - logger.info( - "Deleting %s from waiting room", - state.waiting_room[first_entry_index], + if is_admin: + await self.sio.emit( + "config", + state.client.config, + sid, ) - del state.waiting_room[first_entry_index] - await send_state(state, room) + else: + await self.sio.emit("err", {"type": "NO_ADMIN"}, sid) + async def handle_update_config(self, sid: str, data: dict[str, Any]) -> None: + """ + Forwards an updated config from an authorized webclient to the playback client. -@sio.on("disconnect") -async def handle_disconnect(sid: str) -> None: - """ - Handle the "disconnect" message. + This is currently untrested and should be used with caution. - This message is send automatically, when a client disconnets. + :param sid: The session id of the client sending this request + :type sid: str + :param data: A dictionary encoding the new configuration + :type data: dict[str, Any] + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + is_admin = session["admin"] + state = self.clients[room] - Remove the client from its room. + if is_admin: + try: + config = jsonencoder.loads(data["config"]) + await self.sio.emit( + "update_config", + DEFAULT_CONFIG | config, + state.sid, + ) + state.client.config = DEFAULT_CONFIG | config + await self.sio.emit("update_config", config, room) + except JSONDecodeError: + await self.sio.emit("err", {"type": "JSON_MALFORMED"}) - :param sid: The session id of the client disconnecting - :type sid: str - :rtype: None - """ - async with sio.session(sid) as session: - if "room" in session: - await sio.leave_room(sid, session["room"]) + else: + await self.sio.emit("err", {"type": "NO_ADMIN"}, sid) + async def handle_append(self, sid: str, data: dict[str, Any]) -> None: + """ + Handle the "append" message. -@sio.on("search") -async def handle_search(sid: str, data: dict[str, Any]) -> None: - """ - Handle the "search" message. + This should be called from a web client. Appends the entry, that is encoded + within the data to the room the client is currently connected to. An entry + constructed this way, will be given a UUID, to differentiate it from other + entries for the same song. Additionally an id of the web client is saved + for that entry. - Forwards the dict["query"] to the :py:func:`Source.search` method, and - execute them concurrently. The order is given by the - :py:attr:`Config.sources_prio` attribute of the state. + If the room is configured to no longer accept songs past a certain time + (via the :py:attr:`Config.last_song` attribute), it is checked, if the + start time of the song would exceed this time. If this is the case, the + request is denied and a "msg" message is send to the client, detailing + this. - The result will be send with a "search-results" message to the (web) - client. + If a waitingroom is forced or optional, it is checked, if one of the performers is + already in queue. In that case, a "ask_for_waitingroom" message is send to the + client. - :param sid: The session id of the client requesting. - :type sid: str - :param data: A dictionary with at least a "query" entry. - :type data: dict[str, str] - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - state = clients[room] + Otherwise the song is added to the queue. And all connected clients (web + and playback client) are informed of the new state with a "state" message. - query = data["query"] - if ( - app["type"] != "restricted" - or "key" in state.client.config - and check_registration(state.client.config["key"]) - ): - results_list = await asyncio.gather( - *[state.client.sources[source].search(query) for source in state.client.sources_prio] + Since some properties of a song can only be accessed on the playback + client, a "get-meta-info" message is send to the playback client. This is + handled there with the :py:func:`syng.client.handle_get_meta_info` + function. + + :param sid: The session id of the client sending this request + :type sid: str + :param data: A dictionary encoding the entry, that should be added to the + queue. + :type data: dict[str, Any] + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + state = self.clients[room] + + if len(data["performer"]) > 50: + await self.sio.emit("err", {"type": "NAME_LENGTH", "name": data["performer"]}, room=sid) + return + + if predict([data["performer"]]) == [1]: + await self.sio.emit("err", {"type": "PROFANITY", "name": data["performer"]}, room=sid) + return + + if state.client.config["waiting_room_policy"] and ( + state.client.config["waiting_room_policy"].lower() == "forced" + or state.client.config["waiting_room_policy"].lower() == "optional" + ): + old_entry = state.queue.find_by_name(data["performer"]) + if old_entry is not None: + await self.sio.emit( + "ask_for_waitingroom", + { + "current_entry": { + "source": data["source"], + "performer": data["performer"], + "ident": data["ident"], + }, + "old_entry": { + "artist": old_entry.artist, + "title": old_entry.title, + "performer": old_entry.performer, + }, + }, + room=sid, + ) + return + + source_obj = state.client.sources[data["source"]] + + entry = await source_obj.get_entry(data["performer"], data["ident"]) + + if entry is None: + await self.sio.emit( + "msg", + {"msg": f"Unable to append {data['ident']}. Maybe try again?"}, + room=sid, + ) + return + + entry.uid = data["uid"] if "uid" in data else None + + await self.append_to_queue(room, entry, sid) + + async def handle_append_anyway(self, sid: str, data: dict[str, Any]) -> None: + """ + Appends a song to the queue, even if the performer is already in queue. + + Works the same as handle_append, but without the check if the performer is already + in queue. + + Only if the waiting_room_policy is not configured as forced. + """ + async with self.sio.session(sid) as session: + room = session["room"] + state = self.clients[room] + + if len(data["performer"]) > 50: + await self.sio.emit("err", {"type": "NAME_LENGTH", "name": data["performer"]}, room=sid) + return + + if predict([data["performer"]]) == [1]: + await self.sio.emit("err", {"type": "PROFANITY", "name": data["performer"]}, room=sid) + return + + if state.client.config["waiting_room_policy"].lower() == "forced": + await self.sio.emit( + "err", + {"type": "WAITING_ROOM_FORCED"}, + room=sid, + ) + return + + source_obj = state.client.sources[data["source"]] + + entry = await source_obj.get_entry(data["performer"], data["ident"]) + + if entry is None: + await self.sio.emit( + "msg", + {"msg": f"Unable to append {data['ident']}. Maybe try again?"}, + room=sid, + ) + return + + entry.uid = data["uid"] if "uid" in data else None + + await self.append_to_queue(room, entry, sid) + + async def handle_meta_info(self, sid: str, data: dict[str, Any]) -> None: + """ + Handle the "meta-info" message. + + Updated a :py:class:syng.entry.Entry`, that is encoded in the data + parameter, in the queue, that belongs to the room the requesting client + belongs to, with new meta data, that is send from the playback client. + + Afterwards send the updated queue to all members of the room. + + :param sid: The session id of the client sending this request. + :type sid: str + :param data: A dictionary encoding the entry to update (already with the + new metadata) + :type data: dict[str, Any] + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + state = self.clients[room] + + state.queue.update( + data["uuid"], + lambda item: item.update(**data["meta"]), ) - results = [ - search_result for source_result in results_list for search_result in source_result + for entry in state.waiting_room: + if entry.uuid == data["uuid"] or str(entry.uuid) == data["uuid"]: + entry.update(**data["meta"]) + + await self.send_state(state, room) + + async def handle_get_first(self, sid: str) -> None: + """ + Handle the "get-first" message. + + This message is send by the playback client, once it has connected. It + should only be send for the initial song. Each subsequent song should be + requestet with a "pop-then-get-next" message (See + :py:func:`handle_pop_then_get_next`). + + If no songs are in the queue for this room, this function waits until one + is available, then notes its starting time and sends it back to the + playback client in a "play" message. This will be handled by the + :py:func:`syng.client.handle_play` function. + + :param sid: The session id of the requesting client + :type sid: str + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + state = self.clients[room] + + current = await state.queue.peek() + current.started_at = datetime.datetime.now().timestamp() + + await self.sio.emit("play", current, room=sid) + + async def handle_waiting_room_to_queue(self, sid: str, data: dict[str, Any]) -> None: + """ + Handle the "waiting-room-to-queue" message. + + If on an admin-connection, removes a song from the waiting room and appends it to + the queue. + + :param sid: The session id of the requesting client + :type sid: str + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + is_admin = session["admin"] + state = self.clients[room] + + if is_admin: + entry = next( + (wr_entry for wr_entry in state.waiting_room if str(wr_entry.uuid) == data["uuid"]), + None, + ) + if entry is not None: + state.waiting_room.remove(entry) + await self.append_to_queue(room, entry, sid) + + async def add_songs_from_waiting_room(self, room: str) -> None: + """ + Add all songs from the waiting room, that should be added to the queue. + + A song should be added if none of its performers are already queued. + + This should be called every time a song leaves the queue. + + :param room: The room holding the queue. + :type room: str + :rtype: None + """ + state = self.clients[room] + + wrs_to_remove = [] + for wr_entry in state.waiting_room: + if state.queue.find_by_name(wr_entry.performer) is None: + await self.append_to_queue(room, wr_entry) + wrs_to_remove.append(wr_entry) + + for wr_entry in wrs_to_remove: + state.waiting_room.remove(wr_entry) + + async def discard_first(self, room: str) -> Entry: + """ + Gets the first element of the queue, handling resulting triggers. + + This function is used to get the first element of the queue, and handle + the resulting triggers. This includes adding songs from the waiting room, + and updating the state of the room. + + :param room: The room to get the first element from. + :type room: str + :rtype: Entry + """ + state = self.clients[room] + + old_entry = await state.queue.popleft() + + await self.add_songs_from_waiting_room(room) + + state.recent.append(old_entry) + state.last_seen = datetime.datetime.now() + + return old_entry + + async def handle_pop_then_get_next(self, sid: str) -> None: + """ + Handle the "pop-then-get-next" message. + + This function acts similar to the :py:func:`handle_get_first` function. The + main difference is, that prior to sending a song to the playback client, + the first element of the queue is discarded. + + Afterwards it follows the same steps as the handler for the "play" message, + get the first element of the queue, annotate it with the current time, + update everyones state and send the entry it to the playback client in a + "play" message. This will be handled by the + :py:func:`syng.client.handle_play` function. + + :param sid: The session id of the requesting playback client + :type sid: str + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + state = self.clients[room] + + if sid != state.sid: + return + + await self.discard_first(room) + await self.send_state(state, room) + + current = await state.queue.peek() + current.started_at = datetime.datetime.now().timestamp() + await self.send_state(state, room) + + await self.sio.emit("play", current, room=sid) + + def check_registration(self, key: str) -> bool: + """ + Check if a given key is in the registration keyfile. + + This is used to authenticate a client, if the server is in private or + restricted mode. + + :param key: The key to check + :type key: str + :return: True if the key is in the registration keyfile, False otherwise + :rtype: bool + """ + with open(self.app["registration-keyfile"], encoding="utf8") as f: + raw_keys = f.readlines() + keys = [key[:64] for key in raw_keys] + + return key in keys + + async def handle_register_client(self, sid: str, data: dict[str, Any]) -> None: + """ + Handle the "register-client" message. + + The data dictionary should have the following keys: + - `room` (Optional), the requested room + - `config`, an dictionary of initial configurations + - `queue`, a list of initial entries for the queue. The entries are + encoded as a dictionary. + - `recent`, a list of initial entries for the recent list. The entries + are encoded as a dictionary. + - `secret`, the secret of the room + - `key`, a registration key given out by the server administrator + + This will register a new playback client to a specific room. If there + already exists a playback client registered for this room, this + playback client will be replaced if and only if, the new playback + client has the same secret. + + If registration is restricted, abort, if the given key is not in the + registration keyfile. + + If no room is provided, a fresh room id is generated. + + If the client provides a new room, or a new room id was generated, the + server will create a new :py:class:`State` object and associate it with + the room id. The state will be initialized with a queue and recent + list, an initial config as well as no sources (yet). + + In any case, the client will be notified of the success or failure, along + with its assigned room key via a "client-registered" message. This will be + handled by the :py:func:`syng.client.handle_client_registered` function. + + If it was successfully registerd, the client will be added to its assigend + or requested room. + + Afterwards all clients in the room will be send the current state. + + :param sid: The session id of the requesting playback client. + :type sid: str + :param data: A dictionary with the keys described above + :type data: dict[str, Any] + :rtype: None + """ + + def gen_id(length: int = 4) -> str: + client_id = "".join([random.choice(string.ascii_letters) for _ in range(length)]) + if client_id in self.clients: + client_id = gen_id(length + 1) + return client_id + + if "key" in data["config"]: + data["config"]["key"] = hashlib.sha256(data["config"]["key"].encode()).hexdigest() + + if self.app["type"] == "private" and ( + "key" not in data["config"] or not self.check_registration(data["config"]["key"]) + ): + await self.sio.emit( + "client-registered", + {"success": False, "room": None}, + room=sid, + ) + return + + room: str = ( + data["config"]["room"] + if "room" in data["config"] and data["config"]["room"] + else gen_id() + ) + async with self.sio.session(sid) as session: + session["room"] = room + + if room in self.clients: + old_state: State = self.clients[room] + if data["config"]["secret"] == old_state.client.config["secret"]: + logger.info("Got new client connection for %s", room) + old_state.sid = sid + old_state.client = Client( + sources=old_state.client.sources, + sources_prio=old_state.client.sources_prio, + config=DEFAULT_CONFIG | data["config"], + ) + await self.sio.enter_room(sid, room) + await self.sio.emit("client-registered", {"success": True, "room": room}, room=sid) + await self.send_state(self.clients[room], sid) + else: + logger.warning("Got wrong secret for %s", room) + await self.sio.emit("client-registered", {"success": False, "room": room}, room=sid) + else: + logger.info("Registerd new client %s", room) + initial_entries = [Entry(**entry) for entry in data["queue"]] + initial_waiting_room = [Entry(**entry) for entry in data["waiting_room"]] + initial_recent = [Entry(**entry) for entry in data["recent"]] + + self.clients[room] = State( + queue=Queue(initial_entries), + waiting_room=initial_waiting_room, + recent=initial_recent, + sid=sid, + client=Client( + sources={}, + sources_prio=[], + config=DEFAULT_CONFIG | data["config"], + ), + ) + + await self.sio.enter_room(sid, room) + await self.sio.emit("client-registered", {"success": True, "room": room}, room=sid) + await self.send_state(self.clients[room], sid) + + async def handle_sources(self, sid: str, data: dict[str, Any]) -> None: + """ + Handle the "sources" message. + + Get the list of sources the client wants to use. Update internal list of + sources, remove unused sources and query for a config for all uninitialized + sources by sending a "request-config" message for each such source to the + playback client. This will be handled by the + :py:func:`syng.client.request-config` function. + + This will not yet add the sources to the configuration, rather gather what + sources need to be configured and request their configuration. The list + of sources will set the :py:attr:`Config.sources_prio` attribute. + + :param sid: The session id of the playback client + :type sid: str + :param data: A dictionary containing a "sources" key, with the list of + sources to use. + :type data: dict[str, Any] + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + state = self.clients[room] + + if sid != state.sid: + return + + unused_sources = state.client.sources.keys() - data["sources"] + new_sources = data["sources"] - state.client.sources.keys() + + for source in unused_sources: + del state.client.sources[source] + + state.client.sources_prio = data["sources"] + + for name in new_sources: + await self.sio.emit("request-config", {"source": name}, room=sid) + + async def handle_config_chunk(self, sid: str, data: dict[str, Any]) -> None: + """ + Handle the "config-chunk" message. + + This is called, when a source wants its configuration transmitted in + chunks, rather than a single message. If the source already exist + (e.g. when this is not the first chunk), the config will be added + to the source, otherwise a source will be created with the given + configuration. + + :param sid: The session id of the playback client + :type sid: str + :param data: A dictionary with a "source" (str) and a + "config" (dict[str, Any]) entry. The exact content of the config entry + depends on the source. + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + state = self.clients[room] + + if sid != state.sid: + return + + if data["source"] not in state.client.sources: + state.client.sources[data["source"]] = available_sources[data["source"]](data["config"]) + else: + state.client.sources[data["source"]].add_to_config(data["config"], data["number"]) + + async def handle_config(self, sid: str, data: dict[str, Any]) -> None: + """ + Handle the "config" message. + + This is called, when a source wants its configuration transmitted in + a single message, rather than chunks. A source will be created with the + given configuration. + + :param sid: The session id of the playback client + :type sid: str + :param data: A dictionary with a "source" (str) and a + "config" (dict[str, Any]) entry. The exact content of the config entry + depends on the source. + :type data: dict[str, Any] + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + state = self.clients[room] + + if sid != state.sid: + return + + state.client.sources[data["source"]] = available_sources[data["source"]](data["config"]) + + async def handle_register_web(self, sid: str, data: dict[str, Any]) -> bool: + """ + Handle a "register-web" message. + + Adds a web client to a requested room and sends it the initial state of the + queue and recent list. + + :param sid: The session id of the web client. + :type sid: str + :param data: A dictionary, containing at least a "room" entry. + :type data: dict[str, Any] + :returns: True, if the room exist, False otherwise + :rtype: bool + """ + if data["room"] in self.clients: + async with self.sio.session(sid) as session: + session["room"] = data["room"] + await self.sio.enter_room(sid, session["room"]) + state = self.clients[session["room"]] + await self.send_state(state, sid) + return True + return False + + async def handle_register_admin(self, sid: str, data: dict[str, Any]) -> bool: + """ + Handle a "register-admin" message. + + If the client provides the correct secret for its room, the connection is + upgraded to an admin connection. + + :param sid: The session id of the client, requesting admin. + :type sid: str: + :param data: A dictionary with at least a "secret" entry. + :type data: dict[str, Any] + :returns: True, if the secret is correct, False otherwise + :rtype: bool + """ + async with self.sio.session(sid) as session: + room = session["room"] + state = self.clients[room] + + is_admin: bool = data["secret"] == state.client.config["secret"] + async with self.sio.session(sid) as session: + session["admin"] = is_admin + return is_admin + + async def handle_skip_current(self, sid: str) -> None: + """ + Handle a "skip-current" message. + + If this comes from an admin connection, forward the "skip-current" message + to the playback client. This will be handled by the + :py:func:`syng.client.handle_skip_current` function. + + :param sid: The session id of the client, requesting. + :type sid: str + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + is_admin = session["admin"] + state = self.clients[room] + + if is_admin: + old_entry = await self.discard_first(room) + await self.sio.emit("skip-current", old_entry, room=self.clients[room].sid) + await self.send_state(state, room) + + async def handle_move_to(self, sid: str, data: dict[str, Any]) -> None: + """ + Handle the "move-to" message. + + If on an admin connection, moves the entry specified in the data to the + position specified in the data. + + :param sid: The session id of the client requesting. + :type sid: str + :param data: A dictionary with at least an "uuid" and a "target" entry + :type data: dict[str, Any] + :rtype: None + """ + + async with self.sio.session(sid) as session: + room = session["room"] + is_admin = session["admin"] + + state = self.clients[room] + + if is_admin: + await state.queue.move_to(data["uuid"], data["target"]) + await self.send_state(state, room) + + async def handle_move_up(self, sid: str, data: dict[str, Any]) -> None: + """ + Handle the "move-up" message. + + If on an admin connection, moves up the entry specified in the data by one + place in the queue. + + :param sid: The session id of the client requesting. + :type sid: str + :param data: A dictionary with at least an "uuid" entry + :type data: dict[str, Any] + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + is_admin = session["admin"] + state = self.clients[room] + if is_admin: + await state.queue.move_up(data["uuid"]) + await self.send_state(state, room) + + async def handle_skip(self, sid: str, data: dict[str, Any]) -> None: + """ + Handle the "skip" message. + + If on an admin connection, removes the entry specified by data["uuid"] + from the queue or the waiting room. Triggers the waiting room. + + :param sid: The session id of the client requesting. + :type sid: str + :param data: A dictionary with at least an "uuid" entry. + :type data: dict[str, Any] + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + is_admin = session["admin"] + state = self.clients[room] + + if is_admin: + entry = state.queue.find_by_uuid(data["uuid"]) + if entry is not None: + logger.info("Skipping %s", entry) + + await self.add_songs_from_waiting_room(room) + + await state.queue.remove(entry) + + first_entry_index = None + for idx, wr_entry in enumerate(state.waiting_room): + if wr_entry.uuid == data["uuid"]: + first_entry_index = idx + break + + if first_entry_index is not None: + logger.info( + "Deleting %s from waiting room", + state.waiting_room[first_entry_index], + ) + del state.waiting_room[first_entry_index] + await self.send_state(state, room) + + async def handle_disconnect(self, sid: str) -> None: + """ + Handle the "disconnect" message. + + This message is send automatically, when a client disconnets. + + Remove the client from its room. + + :param sid: The session id of the client disconnecting + :type sid: str + :rtype: None + """ + async with self.sio.session(sid) as session: + if "room" in session: + await self.sio.leave_room(sid, session["room"]) + + async def handle_search(self, sid: str, data: dict[str, Any]) -> None: + """ + Handle the "search" message. + + Forwards the dict["query"] to the :py:func:`Source.search` method, and + execute them concurrently. The order is given by the + :py:attr:`Config.sources_prio` attribute of the state. + + The result will be send with a "search-results" message to the (web) + client. + + :param sid: The session id of the client requesting. + :type sid: str + :param data: A dictionary with at least a "query" entry. + :type data: dict[str, str] + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + state = self.clients[room] + + query = data["query"] + if ( + self.app["type"] != "restricted" + or "key" in state.client.config + and self.check_registration(state.client.config["key"]) + ): + results_list = await asyncio.gather( + *[ + state.client.sources[source].search(query) + for source in state.client.sources_prio + ] + ) + + results = [ + search_result for source_result in results_list for search_result in source_result + ] + await self.send_search_results(sid, results) + else: + print("Denied") + await self.sio.emit("search", {"query": query, "sid": sid}, room=state.sid) + + async def handle_search_results(self, sid: str, data: dict[str, Any]) -> None: + """ + Handle the "search-results" message. + + This message is send by the playback client, once it has received search + results. The results are send to the web client. + + The data dictionary should have the following keys: + - `sid`, the session id of the web client (str) + - `results`, a list of search results (list[dict[str, Any]]) + + :param sid: The session id of the playback client + :type sid: str + :param data: A dictionary with the keys described above + :type data: dict[str, Any] + :rtype: None + """ + async with self.sio.session(sid) as session: + room = session["room"] + state = self.clients[room] + + if sid != state.sid: + return + + web_sid = data["sid"] + results = [Result.from_dict(result) for result in data["results"]] + + # TODO: we convert the results to YouTube objects. This + # adds them to the cache to prevent YouTube from blocking us. + __unused_yt_list = [ + YouTube.from_result( + { + "duration": result.duration, + "title": result.title, + "channel": result.artist, + "url": result.ident, + } + ) + for result in results + if "youtube" == result.source ] - await send_search_results(sid, results) - else: - print("Denied") - await sio.emit("search", {"query": query, "sid": sid}, room=state.sid) + await self.send_search_results(web_sid, results) -@sio.on("search-results") -async def handle_search_results(sid: str, data: dict[str, Any]) -> None: - """ - Handle the "search-results" message. + async def send_search_results(self, sid: str, results: list[Result]) -> None: + """ + Send search results to a client. - This message is send by the playback client, once it has received search - results. The results are send to the web client. - - The data dictionary should have the following keys: - - `sid`, the session id of the web client (str) - - `results`, a list of search results (list[dict[str, Any]]) - - :param sid: The session id of the playback client - :type sid: str - :param data: A dictionary with the keys described above - :type data: dict[str, Any] - :rtype: None - """ - async with sio.session(sid) as session: - room = session["room"] - state = clients[room] - - if sid != state.sid: - return - - web_sid = data["sid"] - results = [Result.from_dict(result) for result in data["results"]] - - # TODO: we convert the results to YouTube objects. This - # adds them to the cache to prevent YouTube from blocking us. - __unused_yt_list = [ - YouTube.from_result( - { - "duration": result.duration, - "title": result.title, - "channel": result.artist, - "url": result.ident, - } + :param sid: The session id of the client to send the results to. + :type sid: str + :param results: The search results to send. + :type results: list[Result] + :rtype: None + """ + await self.sio.emit( + "search-results", + {"results": results}, + room=sid, ) - for result in results - if "youtube" == result.source - ] - await send_search_results(web_sid, results) + async def cleanup(self) -> None: + """ + Clean up the unused playback clients + This runs every hour, and removes every client, that did not requested a song for four hours. -async def send_search_results(sid: str, results: list[Result]) -> None: - """ - Send search results to a client. + :rtype: None + """ - :param sid: The session id of the client to send the results to. - :type sid: str - :param results: The search results to send. - :type results: list[Result] - :rtype: None - """ - await sio.emit( - "search-results", - {"results": results}, - room=sid, - ) + logger.info("Start Cleanup") + to_remove: list[str] = [] + for sid, state in self.clients.items(): + logger.info("Client %s, last seen: %s", sid, str(state.last_seen)) + if state.last_seen + datetime.timedelta(hours=4) < datetime.datetime.now(): + logger.info("No activity for 4 hours, removing %s", sid) + to_remove.append(sid) + for sid in to_remove: + await self.sio.disconnect(sid) + del self.clients[sid] + logger.info("End Cleanup") + # The internal loop counter does not use a regular timestamp, so we need to convert between + # regular datetime and the async loop time + now = datetime.datetime.now() + # today = datetime.datetime(now.year, now.month, now.day) + # next_run = today + datetime.timedelta(days=1) -async def cleanup() -> None: - """ - Clean up the unused playback clients + next_run = now + datetime.timedelta(hours=1) + offset = next_run.timestamp() - now.timestamp() + loop_next = asyncio.get_event_loop().time() + offset - This runs every hour, and removes every client, that did not requested a song for four hours. + logger.info("Next Cleanup at %s", str(next)) + asyncio.get_event_loop().call_at(loop_next, lambda: asyncio.create_task(self.cleanup())) - :rtype: None - """ + async def background_tasks( + self, + iapp: web.Application, + ) -> AsyncGenerator[None, None]: + """ + Create all the background tasks. - logger.info("Start Cleanup") - to_remove: list[str] = [] - for sid, state in clients.items(): - logger.info("Client %s, last seen: %s", sid, str(state.last_seen)) - if state.last_seen + datetime.timedelta(hours=4) < datetime.datetime.now(): - logger.info("No activity for 4 hours, removing %s", sid) - to_remove.append(sid) - for sid in to_remove: - await sio.disconnect(sid) - del clients[sid] - logger.info("End Cleanup") + For now, this is only the cleanup task. - # The internal loop counter does not use a regular timestamp, so we need to convert between - # regular datetime and the async loop time - now = datetime.datetime.now() - # today = datetime.datetime(now.year, now.month, now.day) - # next_run = today + datetime.timedelta(days=1) + :param iapp: The web application + :type iapp: web.Application + :rtype: AsyncGenerator[None, None] + """ - next_run = now + datetime.timedelta(hours=1) - offset = next_run.timestamp() - now.timestamp() - loop_next = asyncio.get_event_loop().time() + offset + iapp["repeated_cleanup"] = asyncio.create_task(self.cleanup()) - logger.info("Next Cleanup at %s", str(next)) - asyncio.get_event_loop().call_at(loop_next, lambda: asyncio.create_task(cleanup())) + yield + iapp["repeated_cleanup"].cancel() + await iapp["repeated_cleanup"] -async def background_tasks( - iapp: web.Application, -) -> AsyncGenerator[None, None]: - """ - Create all the background tasks. + def run(self, args: Namespace) -> None: + """ + Run the server. - For now, this is only the cleanup task. + `args` consists of the following attributes: + - `host`, the host to bind to + - `port`, the port to bind to + - `root_folder`, the root folder of the web client + - `registration_keyfile`, the file containing the registration keys + - `private`, if the server is private + - `restricted`, if the server is restricted - :param iapp: The web application - :type iapp: web.Application - :rtype: AsyncGenerator[None, None] - """ + :param args: The command line arguments + :type args: Namespace + :rtype: None + """ + self.app["type"] = ( + "private" if args.private else "restricted" if args.restricted else "public" + ) + if args.registration_keyfile: + self.app["registration-keyfile"] = args.registration_keyfile - iapp["repeated_cleanup"] = asyncio.create_task(cleanup()) + self.app["root_folder"] = args.root_folder - yield + self.app.add_routes( + [web.static("/assets/", os.path.join(self.app["root_folder"], "assets/"))] + ) - iapp["repeated_cleanup"].cancel() - await iapp["repeated_cleanup"] + self.app.router.add_route("*", "/", self.root_handler) + self.app.router.add_route("*", "/{room}", self.root_handler) + self.app.router.add_route("*", "/{room}/", self.root_handler) + + self.app.cleanup_ctx.append(self.background_tasks) + + web.run_app(self.app, host=args.host, port=args.port) def run_server(args: Namespace) -> None: """ Run the server. - `args` consists of the following attributes: - - `host`, the host to bind to - - `port`, the port to bind to - - `root_folder`, the root folder of the web client - - `registration_keyfile`, the file containing the registration keys - - `private`, if the server is private - - `restricted`, if the server is restricted - :param args: The command line arguments :type args: Namespace :rtype: None """ - app["type"] = "private" if args.private else "restricted" if args.restricted else "public" - if args.registration_keyfile: - app["registration-keyfile"] = args.registration_keyfile - - app["root_folder"] = args.root_folder - - app.add_routes([web.static("/assets/", os.path.join(app["root_folder"], "assets/"))]) - - app.router.add_route("*", "/", root_handler) - app.router.add_route("*", "/{room}", root_handler) - app.router.add_route("*", "/{room}/", root_handler) - - app.cleanup_ctx.append(background_tasks) - - web.run_app(app, host=args.host, port=args.port) + server = Server() + server.run(args) diff --git a/typings/socketio/__init__.pyi b/typings/socketio/__init__.pyi index 37bece2..59190b9 100644 --- a/typings/socketio/__init__.pyi +++ b/typings/socketio/__init__.pyi @@ -26,7 +26,9 @@ class AsyncServer: room: Optional[str] = None, ) -> None: ... def session(self, sid: str) -> _session_context_manager: ... - def on(self, event: str) -> Callable[[Handler | DictHandler], Handler | DictHandler]: ... + def on( + self, event: str, handler: Optional[Handler | DictHandler] = None + ) -> Callable[[Handler | DictHandler], Handler | DictHandler]: ... async def enter_room(self, sid: str, room: str) -> None: ... async def leave_room(self, sid: str, room: str) -> None: ... def attach(self, app: Any) -> None: ...