From ee85aaa46ad7e4c0a1c718868ea01c82d43a9e13 Mon Sep 17 00:00:00 2001 From: Christoph Stahl Date: Sun, 6 Oct 2024 14:19:05 +0200 Subject: [PATCH] Update index files in the background --- syng/client.py | 21 +++++++++++++++- syng/sources/s3.py | 54 ++++++++++++++++++++++++++++++++++-------- syng/sources/source.py | 51 ++++++++++++++++++++++++++++++++++++--- 3 files changed, 112 insertions(+), 14 deletions(-) diff --git a/syng/client.py b/syng/client.py index 97a5511..42de18f 100644 --- a/syng/client.py +++ b/syng/client.py @@ -381,7 +381,10 @@ async def handle_request_config(data: dict[str, Any]) -> None: A Source can decide, that the config will be split up in multiple Parts. If this is the case, multiple "config-chunk" messages will be send with a - running enumerator. Otherwise a singe "config" message will be send. + running enumerator. Otherwise a single "config" message will be sent. + + After the configuration is sent, the source is asked to update its + configuration. This can also be split up in multiple parts. :param data: A dictionary with the entry `source` and a string, that corresponds to the name of a source. 
@@ -405,6 +408,22 @@ async def handle_request_config(data: dict[str, Any]) -> None: else: await sio.emit("config", {"source": data["source"], "config": config}) + updated_config = await sources[data["source"]].update_config() + if isinstance(updated_config, list): + num_chunks: int = len(updated_config) + for current, chunk in enumerate(updated_config): + await sio.emit( + "config-chunk", + { + "source": data["source"], + "config": chunk, + "number": current + 1, + "total": num_chunks, + }, + ) + elif updated_config is not None: + await sio.emit("config", {"source": data["source"], "config": updated_config}) + def signal_handler() -> None: """ diff --git a/syng/sources/s3.py b/syng/sources/s3.py index f2ce5e2..89c5069 100644 --- a/syng/sources/s3.py +++ b/syng/sources/s3.py @@ -5,6 +5,7 @@ Adds it to the ``available_sources`` with the name ``s3`` """ import asyncio +from itertools import zip_longest import os from json import dump, load from typing import TYPE_CHECKING, Any, Optional, Tuple, cast @@ -79,6 +80,32 @@ class S3Source(FileBasedSource): self.index_file: Optional[str] = config["index_file"] if "index_file" in config else None self.extra_mpv_arguments = ["--scale=oversample"] + def load_file_list_from_server(self) -> list[str]: + """ + Load the file list from the s3 instance. 
+ + :return: A list of file paths + :rtype: list[str] + """ + + file_list = [ + obj.object_name + for obj in self.minio.list_objects(self.bucket, recursive=True) + if obj.object_name is not None and self.has_correct_extension(obj.object_name) + ] + return file_list + + def write_index(self, file_list: list[str]) -> None: + if self.index_file is None: + return + + index_dir = os.path.dirname(self.index_file) + if index_dir: + os.makedirs(os.path.dirname(self.index_file), exist_ok=True) + + with open(self.index_file, "w", encoding="utf8") as index_file_handle: + dump(file_list, index_file_handle) + async def get_file_list(self) -> list[str]: """ Return the list of files on the s3 instance, according to the extensions. @@ -96,22 +123,29 @@ class S3Source(FileBasedSource): with open(self.index_file, "r", encoding="utf8") as index_file_handle: return cast(list[str], load(index_file_handle)) - file_list = [ - obj.object_name - for obj in self.minio.list_objects(self.bucket, recursive=True) - if obj.object_name is not None and self.has_correct_extension(obj.object_name) - ] + file_list = self.load_file_list_from_server() if self.index_file is not None and not os.path.isfile(self.index_file): - index_dir = os.path.dirname(self.index_file) - if index_dir: - os.makedirs(os.path.dirname(self.index_file), exist_ok=True) + self.write_index(file_list) - with open(self.index_file, "w", encoding="utf8") as index_file_handle: - dump(file_list, index_file_handle) return file_list return await asyncio.to_thread(_get_file_list) + async def update_file_list(self) -> Optional[list[str]]: + """ + Rescan the file list and update the index file. 
+ + :return: The updated file list + :rtype: list[str] + """ + + def _update_file_list() -> list[str]: + file_list = self.load_file_list_from_server() + self.write_index(file_list) + return file_list + + return await asyncio.to_thread(_update_file_list) + async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]: """ Return the duration for the music file. diff --git a/syng/sources/source.py b/syng/sources/source.py index 570b9eb..ffa11f3 100644 --- a/syng/sources/source.py +++ b/syng/sources/source.py @@ -385,6 +385,45 @@ class Source(ABC): """ return [] + async def update_file_list(self) -> Optional[list[str]]: + """ + Update the internal list of files. + + This is called after the client sends its initial file list to the + server to update the list of files since the last time an index file + was written. + + It should return None, if the list is already up to date. + Otherwise it should return the new list of files. + + + :rtype: Optional[list[str]] + """ + return None + + async def update_config(self) -> Optional[dict[str, Any] | list[dict[str, Any]]]: + """ + Update the config of the source. + + This is called after the client sends its initial config to the server to + update the config. E.g. to update the list of files, that should be sent to + the server. + + It returns None, if the config is already up to date. + Otherwise returns the new config. + + :rtype: Optional[dict[str, Any] | list[dict[str, Any]]] + """ + + logger.warning(f"{self.source_name}: updating index") + new_index = await self.update_file_list() + logger.warning(f"{self.source_name}: done") + if new_index is not None: + self._index = new_index + chunked = zip_longest(*[iter(new_index)] * 1000, fillvalue="") + return [{"index": list(filter(lambda x: x != "", chunk))} for chunk in chunked] + return None + async def get_config(self) -> dict[str, Any] | list[dict[str, Any]]: """ Return the part of the config, that should be send to the server. 
@@ -405,13 +444,13 @@ class Source(ABC): """ if not self._index: self._index = [] - logger.warn(f"{self.source_name}: generating index") + logger.warning(f"{self.source_name}: generating index") self._index = await self.get_file_list() - logger.warn(f"{self.source_name}: done") + logger.warning(f"{self.source_name}: done") chunked = zip_longest(*[iter(self._index)] * 1000, fillvalue="") return [{"index": list(filter(lambda x: x != "", chunk))} for chunk in chunked] - def add_to_config(self, config: dict[str, Any]) -> None: + def add_to_config(self, config: dict[str, Any], running_number: int) -> None: """ Add the config to the own config. @@ -421,10 +460,16 @@ class Source(ABC): In the default configuration, this just adds the index key of the config to the index attribute of the source + If the running_number is 0, the index will be reset. + :param config: The part of the config to add. :type config: dict[str, Any] + :param running_number: The running number of the config + :type running_number: int :rtype: None """ + if running_number == 0: + self._index = [] self._index += config["index"]