Moved away from this awful global client construction.

Client is now a class, that can be instantiated and contains its state.
This commit is contained in:
Christoph Stahl 2024-10-09 16:17:55 +02:00
parent f2e04ab95e
commit fb12bdedd8
2 changed files with 338 additions and 336 deletions

View file

@ -43,14 +43,6 @@ from .sources import configure_sources, Source
from .log import logger from .log import logger
sio: socketio.AsyncClient = socketio.AsyncClient(json=jsonencoder)
# logger: logging.Logger = logging"Syng"er(__name__)
sources: dict[str, Source] = {}
currentLock: asyncio.Semaphore = asyncio.Semaphore(0)
def default_config() -> dict[str, Optional[int | str]]: def default_config() -> dict[str, Optional[int | str]]:
""" """
Return a default configuration for the client. Return a default configuration for the client.
@ -119,374 +111,380 @@ class State:
config: dict[str, Any] = field(default_factory=default_config) config: dict[str, Any] = field(default_factory=default_config)
state: State = State() # state: State = State()
@sio.on("update_config") class Client:
async def handle_update_config(data: dict[str, Any]) -> None: def __init__(self, config: dict[str, Any]):
""" self.sio = socketio.AsyncClient(json=jsonencoder)
Handle the "update_config" message. self.config = config
self.sources = configure_sources(config["sources"])
self.state = State()
self.currentLock = asyncio.Semaphore(0)
self.register_handlers()
Currently, this function is untested and should be considered dangerous. def register_handlers(self) -> None:
self.sio.on("update_config", self.handle_update_config)
self.sio.on("skip-current", self.handle_skip_current)
self.sio.on("state", self.handle_state)
self.sio.on("connect", self.handle_connect)
self.sio.on("get-meta-info", self.handle_get_meta_info)
self.sio.on("play", self.handle_play)
self.sio.on("search", self.handle_search)
self.sio.on("client-registered", self.handle_client_registered)
self.sio.on("request-config", self.handle_request_config)
:param data: A dictionary with the new configuration. async def handle_update_config(self, data: dict[str, Any]) -> None:
:type data: dict[str, Any] """
:rtype: None Handle the "update_config" message.
"""
state.config = default_config() | data
Currently, this function is untested and should be considered dangerous.
@sio.on("skip-current") :param data: A dictionary with the new configuration.
async def handle_skip_current(data: dict[str, Any]) -> None: :type data: dict[str, Any]
""" :rtype: None
Handle the "skip-current" message. """
self.state.config = default_config() | data
Skips the song, that is currently played. If playback currently waits for async def handle_skip_current(self, data: dict[str, Any]) -> None:
buffering, the buffering is also aborted. """
Handle the "skip-current" message.
Since the ``queue`` could already be updated, when this evaluates, the Skips the song, that is currently played. If playback currently waits for
first entry in the queue is send explicitly. buffering, the buffering is also aborted.
:param data: An entry, that should be equivalent to the first entry of the Since the ``queue`` could already be updated, when this evaluates, the
queue. first entry in the queue is send explicitly.
:rtype: None
"""
logger.info("Skipping current")
if state.current_source is not None:
await state.current_source.skip_current(Entry(**data))
:param data: An entry, that should be equivalent to the first entry of the
queue.
:rtype: None
"""
logger.info("Skipping current")
if self.state.current_source is not None:
await self.state.current_source.skip_current(Entry(**data))
@sio.on("state") async def handle_state(self, data: dict[str, Any]) -> None:
async def handle_state(data: dict[str, Any]) -> None: """
""" Handle the "state" message.
Handle the "state" message.
The "state" message forwards the current queue and recent list from the The "state" message forwards the current queue and recent list from the
server. This function saves a copy of both in the global server. This function saves a copy of both in the global
:py:class:`State`:. :py:class:`State`:.
After recieving the new state, a buffering task for the first elements of After recieving the new state, a buffering task for the first elements of
the queue is started. the queue is started.
:param data: A dictionary with the `queue` and `recent` list. :param data: A dictionary with the `queue` and `recent` list.
:type data: dict[str, Any] :type data: dict[str, Any]
:rtype: None :rtype: None
""" """
state.queue = [Entry(**entry) for entry in data["queue"]] self.state.queue = [Entry(**entry) for entry in data["queue"]]
state.waiting_room = [Entry(**entry) for entry in data["waiting_room"]] self.state.waiting_room = [Entry(**entry) for entry in data["waiting_room"]]
state.recent = [Entry(**entry) for entry in data["recent"]] self.state.recent = [Entry(**entry) for entry in data["recent"]]
for entry in state.queue[:2]: for entry in self.state.queue[:2]:
logger.info("Buffering: %s", entry.title) logger.info("Buffering: %s", entry.title)
await sources[entry.source].buffer(entry) await self.sources[entry.source].buffer(entry)
async def handle_connect(self) -> None:
"""
Handle the "connect" message.
@sio.on("connect") Called when the client successfully connects or reconnects to the server.
async def handle_connect() -> None: Sends a `register-client` message to the server with the initial state and
""" configuration of the client, consiting of the currently saved
Handle the "connect" message. :py:attr:`State.queue` and :py:attr:`State.recent` field of the global
:py:class:`State`, as well a room code the client wants to connect to, a
secret to secure the access to the room and a config dictionary.
Called when the client successfully connects or reconnects to the server. If the room code is `None`, the server will issue a room code.
Sends a `register-client` message to the server with the initial state and
configuration of the client, consiting of the currently saved
:py:attr:`State.queue` and :py:attr:`State.recent` field of the global
:py:class:`State`, as well a room code the client wants to connect to, a
secret to secure the access to the room and a config dictionary.
If the room code is `None`, the server will issue a room code. This message will be handled by the
:py:func:`syng.server.handle_register_client` function of the server.
This message will be handled by the :rtype: None
:py:func:`syng.server.handle_register_client` function of the server. """
logger.info("Connected to server")
data = {
"queue": self.state.queue,
"waiting_room": self.state.waiting_room,
"recent": self.state.recent,
"config": self.state.config,
}
await self.sio.emit("register-client", data)
:rtype: None async def handle_get_meta_info(self, data: dict[str, Any]) -> None:
""" """
logger.info("Connected to server") Handle a "get-meta-info" message.
data = {
"queue": state.queue,
"waiting_room": state.waiting_room,
"recent": state.recent,
"config": state.config,
}
await sio.emit("register-client", data)
Collects the metadata for a given :py:class:`Entry`, from its source, and
sends them back to the server in a "meta-info" message. On the server side
a :py:func:`syng.server.handle_meta_info` function is called.
@sio.on("get-meta-info") :param data: A dictionary encoding the entry
async def handle_get_meta_info(data: dict[str, Any]) -> None: :type data: dict[str, Any]
""" :rtype: None
Handle a "get-meta-info" message. """
source: Source = self.sources[data["source"]]
meta_info: dict[str, Any] = await source.get_missing_metadata(Entry(**data))
await self.sio.emit("meta-info", {"uuid": data["uuid"], "meta": meta_info})
Collects the metadata for a given :py:class:`Entry`, from its source, and async def preview(self, entry: Entry) -> None:
sends them back to the server in a "meta-info" message. On the server side """
a :py:func:`syng.server.handle_meta_info` function is called. Generate and play a preview for a given :py:class:`Entry`.
:param data: A dictionary encoding the entry This function shows a black screen and prints the artist, title and
:type data: dict[str, Any] performer of the entry for a duration.
:rtype: None
"""
source: Source = sources[data["source"]]
meta_info: dict[str, Any] = await source.get_missing_metadata(Entry(**data))
await sio.emit("meta-info", {"uuid": data["uuid"], "meta": meta_info})
This is done by creating a black png file, and showing subtitles in the
middle of the screen.... don't ask, it works
async def preview(entry: Entry) -> None: :param entry: The entry to preview
""" :type entry: :py:class:`Entry`
Generate and play a preview for a given :py:class:`Entry`. :rtype: None
"""
This function shows a black screen and prints the artist, title and background = Image.new("RGB", (1280, 720))
performer of the entry for a duration. subtitle: str = f"""1
This is done by creating a black png file, and showing subtitles in the
middle of the screen.... don't ask, it works
:param entry: The entry to preview
:type entry: :py:class:`Entry`
:rtype: None
"""
background = Image.new("RGB", (1280, 720))
subtitle: str = f"""1
00:00:00,00 --> 00:05:00,00 00:00:00,00 --> 00:05:00,00
{entry.artist} - {entry.title} {entry.artist} - {entry.title}
{entry.performer}""" {entry.performer}"""
with tempfile.NamedTemporaryFile() as tmpfile: with tempfile.NamedTemporaryFile() as tmpfile:
background.save(tmpfile, "png") background.save(tmpfile, "png")
process = await asyncio.create_subprocess_exec( process = await asyncio.create_subprocess_exec(
"mpv", "mpv",
tmpfile.name, tmpfile.name,
f"--image-display-duration={state.config['preview_duration']}", f"--image-display-duration={self.state.config['preview_duration']}",
"--sub-pos=50", "--sub-pos=50",
"--sub-file=-", "--sub-file=-",
"--fullscreen", "--fullscreen",
state.config["mpv_options"], self.state.config["mpv_options"],
stdin=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE, stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
)
await process.communicate(subtitle.encode())
async def handle_play(self, data: dict[str, Any]) -> None:
"""
Handle the "play" message.
Plays the :py:class:`Entry`, that is encoded in the `data` parameter. If a
:py:attr:`State.preview_duration` is set, it shows a small preview before
that.
When the playback is done, the next song is requested from the server with
a "pop-then-get-next" message. This is handled by the
:py:func:`syng.server.handle_pop_then_get_next` function on the server.
If the entry is marked as skipped, emit a "get-first" message instead,
because the server already handled the removal of the first entry.
:param data: A dictionary encoding the entry
:type data: dict[str, Any]
:rtype: None
"""
entry: Entry = Entry(**data)
print(
f"Playing: {entry.artist} - {entry.title} [{entry.album}] "
f"({entry.source}) for {entry.performer}"
) )
await process.communicate(subtitle.encode()) try:
self.state.current_source = self.sources[entry.source]
if self.state.config["preview_duration"] > 0:
@sio.on("play") await self.preview(entry)
async def handle_play(data: dict[str, Any]) -> None: await self.sources[entry.source].play(entry, self.state.config["mpv_options"])
""" except Exception: # pylint: disable=broad-except
Handle the "play" message. print_exc()
self.state.current_source = None
Plays the :py:class:`Entry`, that is encoded in the `data` parameter. If a if entry.skip:
:py:attr:`State.preview_duration` is set, it shows a small preview before await self.sio.emit("get-first")
that.
When the playback is done, the next song is requested from the server with
a "pop-then-get-next" message. This is handled by the
:py:func:`syng.server.handle_pop_then_get_next` function on the server.
If the entry is marked as skipped, emit a "get-first" message instead,
because the server already handled the removal of the first entry.
:param data: A dictionary encoding the entry
:type data: dict[str, Any]
:rtype: None
"""
entry: Entry = Entry(**data)
print(
f"Playing: {entry.artist} - {entry.title} [{entry.album}] "
f"({entry.source}) for {entry.performer}"
)
try:
state.current_source = sources[entry.source]
if state.config["preview_duration"] > 0:
await preview(entry)
await sources[entry.source].play(entry, state.config["mpv_options"])
except Exception: # pylint: disable=broad-except
print_exc()
state.current_source = None
if entry.skip:
await sio.emit("get-first")
else:
await sio.emit("pop-then-get-next")
@sio.on("search")
async def handle_search(data: dict[str, Any]) -> None:
"""
Handle the "search" message.
This handles client side search requests. It sends a search request to all
configured :py:class:`syng.sources.source.Source` and collects the results.
The results are then send back to the server in a "search-results" message,
including the `sid` of the corresponding webclient.
:param data: A dictionary with the `query` and `sid` entry.
:type data: dict[str, Any]
:rtype: None
"""
logger.info(f"Searching for: {data['query']}")
query = data["query"]
sid = data["sid"]
results_list = await asyncio.gather(*[source.search(query) for source in sources.values()])
results = [
search_result.to_dict() for source_result in results_list for search_result in source_result
]
await sio.emit("search-results", {"results": results, "sid": sid})
@sio.on("client-registered")
async def handle_client_registered(data: dict[str, Any]) -> None:
"""
Handle the "client-registered" message.
If the registration was successfull (`data["success"]` == `True`), store
the room code in the global :py:class:`State` and print out a link to join
the webclient.
Start listing all configured :py:class:`syng.sources.source.Source` to the
server via a "sources" message. This message will be handled by the
:py:func:`syng.server.handle_sources` function and may request additional
configuration for each source.
If there is no song playing, start requesting the first song of the queue
with a "get-first" message. This will be handled on the server by the
:py:func:`syng.server.handle_get_first` function.
:param data: A dictionary containing a `success` and a `room` entry.
:type data: dict[str, Any]
:rtype: None
"""
if data["success"]:
logger.info("Registered")
# this is borked on windows
if os.name != "nt":
print(f"Join here: {state.config['server']}/{data['room']}")
qr = QRCode(box_size=20, border=2)
qr.add_data(f"{state.config['server']}/{data['room']}")
qr.make()
qr.print_ascii()
state.config["room"] = data["room"]
await sio.emit("sources", {"sources": list(sources.keys())})
if state.current_source is None: # A possible race condition can occur here
await sio.emit("get-first")
else:
logger.warning("Registration failed")
await sio.disconnect()
@sio.on("request-config")
async def handle_request_config(data: dict[str, Any]) -> None:
"""
Handle the "request-config" message.
Sends the specific server side configuration for a given
:py:class:`syng.sources.source.Source`.
A Source can decide, that the config will be split up in multiple Parts.
If this is the case, multiple "config-chunk" messages will be send with a
running enumerator. Otherwise a single "config" message will be send.
After the configuration is send, the source is asked to update its
configuration. This can also be split up in multiple parts.
:param data: A dictionary with the entry `source` and a string, that
corresponds to the name of a source.
:type data: dict[str, Any]
:rtype: None
"""
if data["source"] in sources:
config: dict[str, Any] | list[dict[str, Any]] = await sources[data["source"]].get_config()
if isinstance(config, list):
num_chunks: int = len(config)
for current, chunk in enumerate(config):
await sio.emit(
"config-chunk",
{
"source": data["source"],
"config": chunk,
"number": current,
"total": num_chunks,
},
)
else: else:
await sio.emit("config", {"source": data["source"], "config": config}) await self.sio.emit("pop-then-get-next")
updated_config = await sources[data["source"]].update_config() async def handle_search(self, data: dict[str, Any]) -> None:
if isinstance(updated_config, list): """
num_chunks = len(updated_config) Handle the "search" message.
for current, chunk in enumerate(updated_config):
await sio.emit(
"config-chunk",
{
"source": data["source"],
"config": chunk,
"number": current,
"total": num_chunks,
},
)
elif updated_config is not None:
await sio.emit("config", {"source": data["source"], "config": updated_config})
This handles client side search requests. It sends a search request to all
configured :py:class:`syng.sources.source.Source` and collects the results.
def signal_handler() -> None: The results are then send back to the server in a "search-results" message,
""" including the `sid` of the corresponding webclient.
Signal handler for the client.
This function is called when the client receives a signal to terminate. It :param data: A dictionary with the `query` and `sid` entry.
will disconnect from the server and kill the current player. :type data: dict[str, Any]
:rtype: None
:rtype: None """
""" logger.info(f"Searching for: {data['query']}")
engineio.async_client.async_signal_handler() query = data["query"]
if state.current_source is not None: sid = data["sid"]
if state.current_source.player is not None: results_list = await asyncio.gather(
state.current_source.player.kill() *[source.search(query) for source in self.sources.values()]
async def start_client(config: dict[str, Any]) -> None:
"""
Initialize the client and connect to the server.
:param config: Config options for the client
:type config: dict[str, Any]
:rtype: None
"""
sources.update(configure_sources(config["sources"]))
if "config" in config:
last_song = (
datetime.datetime.fromisoformat(config["config"]["last_song"]).timestamp()
if "last_song" in config["config"] and config["config"]["last_song"]
else None
) )
state.config |= config["config"] | {"last_song": last_song}
if not ("secret" in state.config and state.config["secret"]): results = [
state.config["secret"] = "".join( search_result.to_dict()
secrets.choice(string.ascii_letters + string.digits) for _ in range(8) for source_result in results_list
) for search_result in source_result
print(f"Generated secret: {state.config['secret']}") ]
if not ("key" in state.config and state.config["key"]): await self.sio.emit("search-results", {"results": results, "sid": sid})
state.config["key"] = ""
try: async def handle_client_registered(self, data: dict[str, Any]) -> None:
await sio.connect(state.config["server"]) """
except ConnectionError: Handle the "client-registered" message.
logger.error("Could not connect to server")
return
# this is not supported under windows If the registration was successfull (`data["success"]` == `True`), store
if os.name != "nt": the room code in the global :py:class:`State` and print out a link to join
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, signal_handler) the webclient.
asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, signal_handler)
try: Start listing all configured :py:class:`syng.sources.source.Source` to the
await sio.wait() server via a "sources" message. This message will be handled by the
except asyncio.CancelledError: :py:func:`syng.server.handle_sources` function and may request additional
pass configuration for each source.
finally:
if state.current_source is not None: If there is no song playing, start requesting the first song of the queue
if state.current_source.player is not None: with a "get-first" message. This will be handled on the server by the
state.current_source.player.kill() :py:func:`syng.server.handle_get_first` function.
:param data: A dictionary containing a `success` and a `room` entry.
:type data: dict[str, Any]
:rtype: None
"""
if data["success"]:
logger.info("Registered")
# this is borked on windows
if os.name != "nt":
print(f"Join here: {self.state.config['server']}/{data['room']}")
qr = QRCode(box_size=20, border=2)
qr.add_data(f"{self.state.config['server']}/{data['room']}")
qr.make()
qr.print_ascii()
self.state.config["room"] = data["room"]
await self.sio.emit("sources", {"sources": list(self.sources.keys())})
if self.state.current_source is None: # A possible race condition can occur here
await self.sio.emit("get-first")
else:
logger.warning("Registration failed")
await self.sio.disconnect()
async def handle_request_config(self, data: dict[str, Any]) -> None:
"""
Handle the "request-config" message.
Sends the specific server side configuration for a given
:py:class:`syng.sources.source.Source`.
A Source can decide, that the config will be split up in multiple Parts.
If this is the case, multiple "config-chunk" messages will be send with a
running enumerator. Otherwise a single "config" message will be send.
After the configuration is send, the source is asked to update its
configuration. This can also be split up in multiple parts.
:param data: A dictionary with the entry `source` and a string, that
corresponds to the name of a source.
:type data: dict[str, Any]
:rtype: None
"""
if data["source"] in self.sources:
config: dict[str, Any] | list[dict[str, Any]] = await self.sources[
data["source"]
].get_config()
if isinstance(config, list):
num_chunks: int = len(config)
for current, chunk in enumerate(config):
await self.sio.emit(
"config-chunk",
{
"source": data["source"],
"config": chunk,
"number": current,
"total": num_chunks,
},
)
else:
await self.sio.emit("config", {"source": data["source"], "config": config})
updated_config = await self.sources[data["source"]].update_config()
if isinstance(updated_config, list):
num_chunks = len(updated_config)
for current, chunk in enumerate(updated_config):
await self.sio.emit(
"config-chunk",
{
"source": data["source"],
"config": chunk,
"number": current,
"total": num_chunks,
},
)
elif updated_config is not None:
await self.sio.emit("config", {"source": data["source"], "config": updated_config})
def signal_handler(self) -> None:
"""
Signal handler for the client.
This function is called when the client receives a signal to terminate. It
will disconnect from the server and kill the current player.
:rtype: None
"""
engineio.async_client.async_signal_handler()
if self.state.current_source is not None:
if self.state.current_source.player is not None:
self.state.current_source.player.kill()
async def start_client(self, config: dict[str, Any]) -> None:
"""
Initialize the client and connect to the server.
:param config: Config options for the client
:type config: dict[str, Any]
:rtype: None
"""
self.sources.update(configure_sources(config["sources"]))
if "config" in config:
last_song = (
datetime.datetime.fromisoformat(config["config"]["last_song"]).timestamp()
if "last_song" in config["config"] and config["config"]["last_song"]
else None
)
self.state.config |= config["config"] | {"last_song": last_song}
if not ("secret" in self.state.config and self.state.config["secret"]):
self.state.config["secret"] = "".join(
secrets.choice(string.ascii_letters + string.digits) for _ in range(8)
)
print(f"Generated secret: {self.state.config['secret']}")
if not ("key" in self.state.config and self.state.config["key"]):
self.state.config["key"] = ""
try:
await self.sio.connect(self.state.config["server"])
except ConnectionError:
logger.error("Could not connect to server")
return
# this is not supported under windows
if os.name != "nt":
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.signal_handler)
asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.signal_handler)
try:
await self.sio.wait()
except asyncio.CancelledError:
pass
finally:
if self.state.current_source is not None:
if self.state.current_source.player is not None:
self.state.current_source.player.kill()
def create_async_and_start_client( def create_async_and_start_client(
@ -507,7 +505,9 @@ def create_async_and_start_client(
if queue is not None: if queue is not None:
logger.addHandler(QueueHandler(queue)) logger.addHandler(QueueHandler(queue))
asyncio.run(start_client(config)) client = Client(config)
asyncio.run(client.start_client(config))
def run_client(args: Namespace) -> None: def run_client(args: Namespace) -> None:

View file

@ -34,7 +34,9 @@ class AsyncServer:
class AsyncClient: class AsyncClient:
def __init__(self, json: Any = None): ... def __init__(self, json: Any = None): ...
def on(self, event: str) -> Callable[[ClientHandler], ClientHandler]: ... def on(
self, event: str, handler: Optional[Callable[..., Any]] = None
) -> Callable[[ClientHandler], ClientHandler]: ...
async def wait(self) -> None: ... async def wait(self) -> None: ...
async def connect(self, server: str) -> None: ... async def connect(self, server: str) -> None: ...
async def disconnect(self) -> None: ... async def disconnect(self) -> None: ...