# syng/syng/server.py

# 802 lines
# 25 KiB
# Python

"""
Module for the Server.

Starts an async socketio server, and serves the web client::

    usage: server.py [-h] [--host HOST] [--port PORT]

    options:
      -h, --help            show this help message and exit
      --host HOST, -H HOST
      --port PORT, -p PORT
"""
from __future__ import annotations
import asyncio
import datetime
import logging
import random
import string
from argparse import ArgumentParser
from dataclasses import dataclass
from dataclasses import field
from typing import Any
from typing import AsyncGenerator
from typing import Optional
import socketio
from aiohttp import web
from . import json
from .entry import Entry
from .queue import Queue
from .sources import available_sources
from .sources import Source
# Socket.IO server shared by every handler in this module. It accepts
# cross-origin requests from any origin, has socket.io logging enabled and
# engine.io logging disabled, and uses the project-local ``json`` module for
# (de)serialization.
sio = socketio.AsyncServer(
    cors_allowed_origins="*", logger=True, engineio_logger=False, json=json
)
# aiohttp application that also serves the web client; the socket.io server
# is attached to it. Per-room state is stored under ``app["clients"]``.
app = web.Application()
sio.attach(app)
async def root_handler(request: Any) -> Any:
    """
    Serve the static entry points of the web client.

    Requests whose path ends in "/favicon.ico" get the favicon; every other
    path gets the index.html. Serving the same page for arbitrary paths lets
    the javascript read the room code from the url.

    :param request: Webrequest from aiohttp
    :type request: Any
    :return: Either the favicon or the index.html
    :rtype: web.FileResponse
    """
    wants_favicon = request.path.endswith("/favicon.ico")
    static_file = (
        "syng/static/favicon.ico" if wants_favicon else "syng/static/index.html"
    )
    return web.FileResponse(static_file)
# The root logger is configured at WARNING, so the ``logger.info`` calls in
# this module are not emitted unless the level is lowered.
logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)
@dataclass
class Config:
    """This stores the configuration of a specific playback client.

    In case a new playback client connects to a room, these values can be
    overwritten.

    :param sources: A dictionary mapping the name of the used sources to their
        instances.
    :type sources: dict[str, Source]
    :param sources_prio: A list defining the order of the search results.
    :type sources_prio: list[str]
    :param preview_duration: The duration in seconds the playback client shows
        a preview for the next song. This is accounted for in the calculation
        of the ETA for songs later in the queue.
    :type preview_duration: int
    :param last_song: A timestamp, defining the end of the queue. A falsy
        value disables the check.
    :type last_song: Optional[float]
    """

    sources: dict[str, Source]
    sources_prio: list[str]
    preview_duration: int
    last_song: Optional[float]
@dataclass
class State:
    """This defines the state of one session/room.

    :param secret: The secret for the room. Used to log in as an admin on the
        webclient or reconnect a playback client.
    :type secret: str
    :param queue: A queue of :py:class:`syng.entry.Entry` objects. New songs
        are appended to this, and if a playback client requests a song, it is
        taken from the top.
    :type queue: Queue
    :param recent: A list of already played songs in order.
    :type recent: list[Entry]
    :param sid: The socket.io session id of the (unique) playback client. Once
        a new playback client connects to a room (with the correct secret),
        this will be swapped with the new sid.
    :type sid: str
    :param config: The config for the client
    :type config: Config

    ``last_seen`` is not a constructor argument: it is initialised to the
    creation time of the state, refreshed whenever the playback client pops a
    song, and read by the periodic cleanup to find idle rooms.
    """

    secret: str
    queue: Queue
    recent: list[Entry]
    sid: str
    config: Config
    # Timestamp of the last activity of the playback client (see docstring).
    last_seen: datetime.datetime = field(
        init=False, default_factory=datetime.datetime.now
    )
async def send_state(state: State, sid: str) -> None:
    """
    Emit a "state" message carrying the queue and the recent list.

    The recipient may be a single playback client, a single web client or a
    whole room, depending on what is passed as ``sid``. On a playback client
    the message is handled by the :py:func:`syng.client.handle_state`
    function.

    :param state: The state to send
    :type state: State
    :param sid: The recipient of the "state" message
    :type sid: str
    :rtype: None
    """
    payload = {"queue": state.queue, "recent": state.recent}
    await sio.emit("state", payload, room=sid)
@sio.on("get-state")
async def handle_state(sid: str) -> None:
    """
    Handle the "get-state" message.

    Replies to the sender with the current state of its room. This fails if
    the sender is not part of any room.

    :param sid: The initial sender, and therefore recipient of the "state"
        message
    :type sid: str
    :rtype: None
    """
    async with sio.session(sid) as session:
        room_state = app["clients"][session["room"]]

    await send_state(room_state, sid)
@sio.on("append")
async def handle_append(sid: str, data: dict[str, Any]) -> None:
    """
    Handle the "append" message.

    This should be called from a web client. Appends the entry, that is
    encoded within the data, to the room the client is currently connected to.
    The entry is obtained from the source named in ``data["source"]``.

    If the source cannot produce an entry, the request is denied and a "msg"
    message is sent to the requesting client only.

    If the room is configured to no longer accept songs past a certain time
    (via the :py:attr:`Config.last_song` attribute), it is checked whether the
    estimated start time of the song would exceed this time. If this is the
    case, the request is denied and a "msg" message is sent to the client,
    detailing this.

    Otherwise the song is added to the queue, and all connected clients (web
    and playback client) are informed of the new state with a "state" message.

    Since some properties of a song can only be accessed on the playback
    client, a "get-meta-info" message is sent to the playback client. This is
    handled there with the :py:func:`syng.client.handle_get_meta_info`
    function.

    :param sid: The session id of the client sending this request
    :type sid: str
    :param data: A dictionary encoding the entry that should be added to the
        queue. The keys "source", "performer" and "ident" are read.
    :type data: dict[str, Any]
    :rtype: None
    """
    async with sio.session(sid) as session:
        room = session["room"]
        state = app["clients"][room]

    source_obj = state.config.sources[data["source"]]
    entry = await source_obj.get_entry(data["performer"], data["ident"])
    if entry is None:
        # Report the failure on the regular "msg" channel and only to the
        # requesting client; without ``room`` the message would be broadcast.
        await sio.emit(
            "msg",
            {"msg": f"Unable to append {data['ident']}"},
            room=sid,
        )
        return

    # Estimate when the new song would start: begin at the start of the
    # currently playing song (or now, if nothing is playing yet) and add the
    # duration of every queued song plus the preview time and one second.
    first_song = state.queue.try_peek()
    if first_song is None or first_song.started_at is None:
        start_time = datetime.datetime.now().timestamp()
    else:
        start_time = first_song.started_at

    start_time = state.queue.fold(
        lambda item, time: time
        + item.duration
        + state.config.preview_duration
        + 1,
        start_time,
    )

    last_song = state.config.last_song
    if last_song and last_song < start_time:
        end_time = datetime.datetime.fromtimestamp(last_song)
        await sio.emit(
            "msg",
            {
                "msg": f"The song queue ends at {end_time.hour:02d}:"
                f"{end_time.minute:02d}."
            },
            room=sid,
        )
        return

    state.queue.append(entry)
    await send_state(state, room)

    # Ask the playback client to fill in metadata only it has access to.
    await sio.emit(
        "get-meta-info",
        entry,
        room=app["clients"][room].sid,
    )
@sio.on("meta-info")
async def handle_meta_info(sid: str, data: dict[str, Any]) -> None:
    """
    Handle the "meta-info" message.

    Looks up the queue entry identified by ``data["uuid"]`` in the sender's
    room and updates it with the key/value pairs in ``data["meta"]``, which
    are sent by the playback client. Afterwards the updated queue is sent to
    all members of the room.

    :param sid: The session id of the client sending this request.
    :type sid: str
    :param data: A dictionary with the "uuid" of the entry to update and the
        new "meta" data.
    :type data: dict[str, Any]
    :rtype: None
    """
    async with sio.session(sid) as session:
        room = session["room"]
        state = app["clients"][room]

    def apply_meta(item: Any) -> Any:
        # ``data["meta"]`` is unpacked only when the queue calls this updater.
        return item.update(**data["meta"])

    state.queue.update(data["uuid"], apply_meta)

    await send_state(state, room)
@sio.on("get-first")
async def handle_get_first(sid: str) -> None:
    """
    Handle the "get-first" message.

    The playback client sends this once after it has connected, to obtain the
    initial song. Every later song should be requested with a
    "pop-then-get-next" message (see :py:func:`handle_pop_then_get_next`).

    The handler awaits the first entry of the room's queue, stamps it with
    the current time as its start time and sends it back to the requester in
    a "play" message. That message is handled by the
    :py:func:`syng.client.handle_play` function.

    :param sid: The session id of the requesting client
    :type sid: str
    :rtype: None
    """
    async with sio.session(sid) as session:
        state = app["clients"][session["room"]]

    first_entry = await state.queue.peek()
    first_entry.started_at = datetime.datetime.now().timestamp()

    await sio.emit("play", first_entry, room=sid)
@sio.on("pop-then-get-next")
async def handle_pop_then_get_next(sid: str) -> None:
    """
    Handle the "pop-then-get-next" message.

    Works like :py:func:`handle_get_first`, except that the first element of
    the queue is discarded (moved to the recent list) before the next song is
    handed out. Requests that do not come from the room's registered playback
    client are ignored.

    After the pop, the room is sent the new state. Then the new first element
    is awaited, annotated with the current time, the room is updated again and
    the entry is sent to the playback client in a "play" message. This will be
    handled by the :py:func:`syng.client.handle_play` function.

    :param sid: The session id of the requesting playback client
    :type sid: str
    :rtype: None
    """
    async with sio.session(sid) as session:
        room = session["room"]
        state = app["clients"][room]

    if state.sid != sid:
        return

    finished = await state.queue.popleft()
    state.recent.append(finished)
    # Popping a song counts as activity of the playback client.
    state.last_seen = datetime.datetime.now()
    await send_state(state, room)

    upcoming = await state.queue.peek()
    upcoming.started_at = datetime.datetime.now().timestamp()
    await send_state(state, room)

    await sio.emit("play", upcoming, room=sid)
@sio.on("register-client")
async def handle_register_client(sid: str, data: dict[str, Any]) -> None:
    """
    Handle the "register-client" message.

    The data dictionary should have the following keys:
        - `room` (Optional), the requested room
        - `config`, a dictionary of initial configurations
        - `queue`, a list of initial entries for the queue. The entries are
          encoded as a dictionary.
        - `recent`, a list of initial entries for the recent list. The
          entries are encoded as a dictionary.
        - `secret`, the secret of the room

    Registers a playback client for a room. If no room is given, a fresh room
    id is generated. If the room does not exist yet, a new :py:class:`State`
    is created from the given queue, recent list and config, with no sources
    (yet). If the room already exists, the new client takes over from the
    registered playback client if and only if it presents the same secret; in
    that case the sources of the old config are kept.

    In any case the client is told about success or failure, along with its
    room key, via a "client-registered" message. This will be handled by the
    :py:func:`syng.client.handle_client_registered` function. On success the
    client is added to the room and is sent the current state.

    :param sid: The session id of the requesting playback client.
    :type sid: str
    :param data: A dictionary with the keys described above
    :type data: dict[str, Any]
    :rtype: None
    """

    def gen_id(length: int = 4) -> str:
        # Random letters; on a collision with an existing room, retry with an
        # id that is one character longer.
        candidate = "".join(
            random.choice(string.ascii_letters) for _ in range(length)
        )
        return gen_id(length + 1) if candidate in app["clients"] else candidate

    room: str = data.get("room") or gen_id()

    async with sio.session(sid) as session:
        session["room"] = room

    if room not in app["clients"]:
        logger.info("Registerd new client %s", room)
        queue_entries = [Entry(**raw) for raw in data["queue"]]
        recent_entries = [Entry(**raw) for raw in data["recent"]]

        app["clients"][room] = State(
            secret=data["secret"],
            queue=Queue(queue_entries),
            recent=recent_entries,
            sid=sid,
            config=Config(sources={}, sources_prio=[], **data["config"]),
        )
    else:
        existing: State = app["clients"][room]
        if data["secret"] != existing.secret:
            logger.warning("Got wrong secret for %s", room)
            await sio.emit(
                "client-registered", {"success": False, "room": room}, room=sid
            )
            return

        logger.info("Got new client connection for %s", room)
        existing.sid = sid
        existing.config = Config(
            sources=existing.config.sources,
            sources_prio=existing.config.sources_prio,
            **data["config"],
        )

    # Shared tail of both successful paths.
    sio.enter_room(sid, room)
    await sio.emit(
        "client-registered", {"success": True, "room": room}, room=sid
    )
    await send_state(app["clients"][room], sid)
@sio.on("sources")
async def handle_sources(sid: str, data: dict[str, Any]) -> None:
    """
    Handle the "sources" message.

    Receives the list of sources the playback client wants to use. Sources
    that are configured but no longer requested are removed, the requested
    list becomes the :py:attr:`Config.sources_prio` attribute, and for every
    requested source that has no instance yet a "request-config" message is
    sent to the playback client.

    The new sources are not added to the configuration here; only their
    configuration is requested. Requests that do not come from the room's
    registered playback client are ignored.

    :param sid: The session id of the playback client
    :type sid: str
    :param data: A dictionary containing a "sources" key, with the list of
        sources to use.
    :type data: dict[str, Any]
    :rtype: None
    """
    async with sio.session(sid) as session:
        room = session["room"]
        state = app["clients"][room]

    if state.sid != sid:
        return

    requested = data["sources"]
    configured = state.config.sources

    stale_names = configured.keys() - requested
    missing_names = requested - configured.keys()

    for stale in stale_names:
        del configured[stale]

    state.config.sources_prio = requested

    for missing in missing_names:
        await sio.emit("request-config", {"source": missing}, room=sid)
@sio.on("config-chunk")
async def handle_config_chunk(sid: str, data: dict[str, Any]) -> None:
    """
    Handle the "config-chunk" message.

    Used when a source wants its configuration transmitted in chunks, rather
    than a single message. If the source already exists (i.e. this is not the
    first chunk), the chunk is added to its config; otherwise the source is
    created from this chunk. Requests that do not come from the room's
    registered playback client are ignored.

    :param sid: The session id of the playback client
    :type sid: str
    :param data: A dictionary with a "source" (str) and a
        "config" (dict[str, Any]) entry. The exact content of the config entry
        depends on the source.
    :type data: dict[str, Any]
    :rtype: None
    """
    async with sio.session(sid) as session:
        room = session["room"]
        state = app["clients"][room]

    if state.sid != sid:
        return

    name = data["source"]
    configured = state.config.sources

    if name in configured:
        configured[name].add_to_config(data["config"])
        return

    configured[name] = available_sources[name](data["config"])
@sio.on("config")
async def handle_config(sid: str, data: dict[str, Any]) -> None:
    """
    Handle the "config" message.

    Used when a source wants its configuration transmitted in a single
    message, rather than chunks. The source is (re)created with the given
    configuration. Requests that do not come from the room's registered
    playback client are ignored.

    :param sid: The session id of the playback client
    :type sid: str
    :param data: A dictionary with a "source" (str) and a
        "config" (dict[str, Any]) entry. The exact content of the config entry
        depends on the source.
    :type data: dict[str, Any]
    :rtype: None
    """
    async with sio.session(sid) as session:
        room = session["room"]
        state = app["clients"][room]

    if state.sid != sid:
        return

    source_name = data["source"]
    source_factory = available_sources[source_name]
    state.config.sources[source_name] = source_factory(data["config"])
@sio.on("register-web")
async def handle_register_web(sid: str, data: dict[str, Any]) -> bool:
    """
    Handle a "register-web" message.

    If the requested room exists, the web client is remembered as belonging
    to it, joins the socket.io room and is sent the current queue and recent
    list.

    :param sid: The session id of the web client.
    :type sid: str
    :param data: A dictionary, containing at least a "room" entry.
    :type data: dict[str, Any]
    :returns: True, if the room exists, False otherwise
    :rtype: bool
    """
    room = data["room"]
    if room not in app["clients"]:
        return False

    async with sio.session(sid) as session:
        session["room"] = room
        sio.enter_room(sid, room)

    await send_state(app["clients"][room], sid)
    return True
@sio.on("register-admin")
async def handle_register_admin(sid: str, data: dict[str, Any]) -> bool:
    """
    Handle a "register-admin" message.

    Compares the given secret with the secret of the client's room and stores
    the outcome as the "admin" flag of the client's session. A correct secret
    therefore upgrades the connection to an admin connection.

    :param sid: The session id of the client, requesting admin.
    :type sid: str
    :param data: A dictionary with at least a "secret" entry.
    :type data: dict[str, Any]
    :returns: True, if the secret is correct, False otherwise
    :rtype: bool
    """
    async with sio.session(sid) as session:
        room_state = app["clients"][session["room"]]

    secret_matches: bool = data["secret"] == room_state.secret

    async with sio.session(sid) as session:
        session["admin"] = secret_matches

    return secret_matches
@sio.on("skip-current")
async def handle_skip_current(sid: str) -> None:
    """
    Handle a "skip-current" message.

    If this comes from an admin connection, the first entry of the queue is
    moved to the recent list, a "skip-current" message with that entry is
    forwarded to the playback client and the room is sent the new state. The
    forwarded message will be handled by the
    :py:func:`syng.client.handle_skip_current` function.

    Requests from connections that are not (or not yet) admin, and requests
    while the queue is empty, are ignored.

    :param sid: The session id of the client, requesting.
    :type sid: str
    :rtype: None
    """
    async with sio.session(sid) as session:
        room = session["room"]
        # The "admin" key only exists after a "register-admin" message; treat
        # its absence as "not an admin" instead of raising KeyError.
        is_admin = session.get("admin", False)
        state = app["clients"][room]

    if not is_admin:
        return

    # Nothing to skip on an empty queue.
    # NOTE(review): ``popleft`` is awaited, so it presumably waits for an
    # entry; without this guard a skip on an empty queue would then discard
    # the next appended song. Confirm against ``Queue``.
    if state.queue.try_peek() is None:
        return

    old_entry = await state.queue.popleft()
    state.recent.append(old_entry)
    await sio.emit("skip-current", old_entry, room=app["clients"][room].sid)
    await send_state(state, room)
@sio.on("move-up")
async def handle_move_up(sid: str, data: dict[str, Any]) -> None:
    """
    Handle the "move-up" message.

    If on an admin connection, moves up the entry specified in the data by one
    place in the queue and sends the new state to the room. Requests from
    connections that are not (or not yet) admin are ignored.

    :param sid: The session id of the client requesting.
    :type sid: str
    :param data: A dictionary with at least an "uuid" entry
    :type data: dict[str, Any]
    :rtype: None
    """
    async with sio.session(sid) as session:
        room = session["room"]
        # The "admin" key only exists after a "register-admin" message; treat
        # its absence as "not an admin" instead of raising KeyError.
        is_admin = session.get("admin", False)
        state = app["clients"][room]

    if not is_admin:
        return

    await state.queue.move_up(data["uuid"])
    await send_state(state, room)
@sio.on("skip")
async def handle_skip(sid: str, data: dict[str, Any]) -> None:
    """
    Handle the "skip" message.

    If on an admin connection, removes the entry specified by data["uuid"]
    from the queue and sends the new state to the room. Nothing happens if no
    entry with that uuid is found, or if the connection is not (or not yet)
    an admin connection.

    :param sid: The session id of the client requesting.
    :type sid: str
    :param data: A dictionary with at least an "uuid" entry.
    :type data: dict[str, Any]
    :rtype: None
    """
    async with sio.session(sid) as session:
        room = session["room"]
        # The "admin" key only exists after a "register-admin" message; treat
        # its absence as "not an admin" instead of raising KeyError.
        is_admin = session.get("admin", False)
        state = app["clients"][room]

    if not is_admin:
        return

    entry = state.queue.find_by_uuid(data["uuid"])
    if entry is None:
        return

    logger.info("Skipping %s", entry)
    await state.queue.remove(entry)
    await send_state(state, room)
@sio.on("disconnect")
async def handle_disconnect(sid: str) -> None:
    """
    Handle the "disconnect" message.

    This message is sent automatically when a client disconnects. If the
    client had been assigned a room, it is removed from that room.

    :param sid: The session id of the client disconnecting
    :type sid: str
    :rtype: None
    """
    async with sio.session(sid) as session:
        if "room" not in session:
            return
        sio.leave_room(sid, session["room"])
@sio.on("search")
async def handle_search(sid: str, data: dict[str, Any]) -> None:
    """
    Handle the "search" message.

    Forwards ``data["query"]`` to the :py:func:`Source.search` method of the
    room's sources and runs the searches concurrently. The order of the
    results is given by the :py:attr:`Config.sources_prio` attribute of the
    state. Sources that are listed there but have not received their
    configuration yet are skipped.

    The result will be sent with a "search-results" message to the (web)
    client.

    :param sid: The session id of the client requesting.
    :type sid: str
    :param data: A dictionary with at least a "query" entry.
    :type data: dict[str, str]
    :rtype: None
    """
    async with sio.session(sid) as session:
        room = session["room"]
        state = app["clients"][room]

    query = data["query"]
    sources = state.config.sources

    # ``sources_prio`` is set as soon as the playback client announces its
    # sources, while the instances are only created once their config has
    # arrived; skip names without an instance instead of raising KeyError.
    searches = [
        sources[name].search(query)
        for name in state.config.sources_prio
        if name in sources
    ]
    results_list = await asyncio.gather(*searches)

    # Flatten the per-source result lists, keeping the priority order.
    results = [
        search_result
        for source_result in results_list
        for search_result in source_result
    ]
    await sio.emit(
        "search-results",
        {"results": results},
        room=sid,
    )
async def cleanup() -> None:
    """
    Remove rooms without recent activity, then schedule the next run.

    A room is removed when its ``last_seen`` timestamp is more than four
    hours old. For each such room the registered playback client is
    disconnected and the room's state is dropped from ``app["clients"]``.
    Afterwards the next run is scheduled on the event loop, one hour later.

    :rtype: None
    """
    logger.info("Start Cleanup")

    now = datetime.datetime.now()
    max_idle = datetime.timedelta(hours=4)

    # Collect first and remove afterwards: the dict must not change size
    # while it is being iterated.
    to_remove: list[str] = []
    for room, state in app["clients"].items():
        logger.info("Client %s, last seen: %s", room, str(state.last_seen))
        if state.last_seen + max_idle < now:
            logger.info("No activity for 4 hours, removing %s", room)
            to_remove.append(room)

    for room in to_remove:
        # ``app["clients"]`` is keyed by room id, while ``sio.disconnect``
        # expects a session id, so disconnect the room's playback client.
        state = app["clients"].pop(room, None)
        if state is not None:
            await sio.disconnect(state.sid)

    logger.info("End Cleanup")

    # ``call_later`` takes a delay relative to the loop's own clock, so no
    # conversion between wall-clock time and loop time is needed.
    interval = datetime.timedelta(hours=1)
    next_run = datetime.datetime.now() + interval
    logger.info("Next Cleanup at %s", str(next_run))
    asyncio.get_running_loop().call_later(
        interval.total_seconds(), lambda: asyncio.create_task(cleanup())
    )
async def background_tasks(
    iapp: web.Application,
) -> AsyncGenerator[None, None]:
    """
    Create all the background tasks.

    For now, this is only the cleanup task. The task is started before the
    ``yield`` and cancelled after it, i.e. when the application shuts down.

    :param iapp: The application the task is stored on, under the
        "repeated_cleanup" key.
    :type iapp: web.Application
    :rtype: AsyncGenerator[None, None]
    """
    iapp["repeated_cleanup"] = asyncio.create_task(cleanup())

    yield

    iapp["repeated_cleanup"].cancel()
    try:
        await iapp["repeated_cleanup"]
    except asyncio.CancelledError:
        # Expected when the task was still running: awaiting a cancelled task
        # re-raises the cancellation, which must not abort the shutdown.
        pass
def main() -> None:
    """
    Configure and start the server.

    Parse the command line arguments, register static routes to serve the web
    client, initialise the (empty) room registry, register the background
    tasks and start the server.

    :rtype: None
    """
    parser = ArgumentParser()
    parser.add_argument("--host", "-H", default="localhost")
    # Parse the port as an integer: ``web.run_app`` documents an int, and
    # argparse then rejects non-numeric values with a clear error message.
    parser.add_argument("--port", "-p", type=int, default=8080)
    args = parser.parse_args()

    app.add_routes([web.static("/assets/", "syng/static/assets/")])

    # Every room url serves the same page; the room code is read from the
    # url by the javascript.
    app.router.add_route("*", "/", root_handler)
    app.router.add_route("*", "/{room}", root_handler)
    app.router.add_route("*", "/{room}/", root_handler)

    # Maps a room id to the state of that room.
    app["clients"] = {}

    app.cleanup_ctx.append(background_tasks)

    web.run_app(app, host=args.host, port=args.port)
# Start the server when the module is executed rather than imported.
if __name__ == "__main__":
    main()