Update index files in the background

This commit is contained in:
Christoph Stahl 2024-10-06 14:19:05 +02:00
parent e4155140f1
commit ee85aaa46a
3 changed files with 112 additions and 14 deletions

View file

@ -381,7 +381,10 @@ async def handle_request_config(data: dict[str, Any]) -> None:
A Source can decide, that the config will be split up in multiple Parts.
If this is the case, multiple "config-chunk" messages will be send with a
running enumerator. Otherwise a singe "config" message will be send.
running enumerator. Otherwise a single "config" message will be send.
After the configuration is send, the source is asked to update its
configuration. This can also be split up in multiple parts.
:param data: A dictionary with the entry `source` and a string, that
corresponds to the name of a source.
@ -405,6 +408,22 @@ async def handle_request_config(data: dict[str, Any]) -> None:
else:
await sio.emit("config", {"source": data["source"], "config": config})
updated_config = await sources[data["source"]].update_config()
if isinstance(updated_config, list):
num_chunks: int = len(updated_config)
for current, chunk in enumerate(updated_config):
await sio.emit(
"config-chunk",
{
"source": data["source"],
"config": chunk,
"number": current + 1,
"total": num_chunks,
},
)
elif updated_config is not None:
await sio.emit("config", {"source": data["source"], "config": updated_config})
def signal_handler() -> None:
"""

View file

@ -5,6 +5,7 @@ Adds it to the ``available_sources`` with the name ``s3``
"""
import asyncio
from itertools import zip_longest
import os
from json import dump, load
from typing import TYPE_CHECKING, Any, Optional, Tuple, cast
@ -79,6 +80,32 @@ class S3Source(FileBasedSource):
self.index_file: Optional[str] = config["index_file"] if "index_file" in config else None
self.extra_mpv_arguments = ["--scale=oversample"]
def load_file_list_from_server(self) -> list[str]:
"""
Load the file list from the s3 instance.
:return: A list of file paths
:rtype: list[str]
"""
file_list = [
obj.object_name
for obj in self.minio.list_objects(self.bucket, recursive=True)
if obj.object_name is not None and self.has_correct_extension(obj.object_name)
]
return file_list
def write_index(self, file_list: list[str]) -> None:
if self.index_file is None:
return
index_dir = os.path.dirname(self.index_file)
if index_dir:
os.makedirs(os.path.dirname(self.index_file), exist_ok=True)
with open(self.index_file, "w", encoding="utf8") as index_file_handle:
dump(file_list, index_file_handle)
async def get_file_list(self) -> list[str]:
"""
Return the list of files on the s3 instance, according to the extensions.
@ -96,22 +123,29 @@ class S3Source(FileBasedSource):
with open(self.index_file, "r", encoding="utf8") as index_file_handle:
return cast(list[str], load(index_file_handle))
file_list = [
obj.object_name
for obj in self.minio.list_objects(self.bucket, recursive=True)
if obj.object_name is not None and self.has_correct_extension(obj.object_name)
]
file_list = self.load_file_list_from_server()
if self.index_file is not None and not os.path.isfile(self.index_file):
index_dir = os.path.dirname(self.index_file)
if index_dir:
os.makedirs(os.path.dirname(self.index_file), exist_ok=True)
self.write_index(file_list)
with open(self.index_file, "w", encoding="utf8") as index_file_handle:
dump(file_list, index_file_handle)
return file_list
return await asyncio.to_thread(_get_file_list)
async def update_file_list(self) -> Optional[list[str]]:
"""
Rescan the file list and update the index file.
:return: The updated file list
:rtype: list[str]
"""
def _update_file_list() -> list[str]:
file_list = self.load_file_list_from_server()
self.write_index(file_list)
return file_list
return await asyncio.to_thread(_update_file_list)
async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]:
"""
Return the duration for the music file.

View file

@ -385,6 +385,45 @@ class Source(ABC):
"""
return []
async def update_file_list(self) -> Optional[list[str]]:
"""
Update the internal list of files.
This is called after the client sends its initial file list to the
server to update the list of files since the last time an index file
was written.
It should return None, if the list is already up to date.
Otherwise it should return the new list of files.
:rtype: Optional[list[str]]
"""
return None
async def update_config(self) -> Optional[dict[str, Any] | list[dict[str, Any]]]:
"""
Update the config of the source.
This is called after the client sends its initial config to the server to
update the config. E.g. to update the list of files, that should be send to
the server.
It returns None, if the config is already up to date.
Otherwise returns the new config.
:rtype: Optional[dict[str, Any] | list[dict[str, Any]]
"""
logger.warning(f"{self.source_name}: updating index")
new_index = await self.update_file_list()
logger.warning(f"{self.source_name}: done")
if new_index is not None:
self._index = new_index
chunked = zip_longest(*[iter(new_index)] * 1000, fillvalue="")
return [{"index": list(filter(lambda x: x != "", chunk))} for chunk in chunked]
return None
async def get_config(self) -> dict[str, Any] | list[dict[str, Any]]:
"""
Return the part of the config, that should be send to the server.
@ -405,13 +444,13 @@ class Source(ABC):
"""
if not self._index:
self._index = []
logger.warn(f"{self.source_name}: generating index")
logger.warning(f"{self.source_name}: generating index")
self._index = await self.get_file_list()
logger.warn(f"{self.source_name}: done")
logger.warning(f"{self.source_name}: done")
chunked = zip_longest(*[iter(self._index)] * 1000, fillvalue="")
return [{"index": list(filter(lambda x: x != "", chunk))} for chunk in chunked]
def add_to_config(self, config: dict[str, Any]) -> None:
def add_to_config(self, config: dict[str, Any], running_number: int) -> None:
"""
Add the config to the own config.
@ -421,10 +460,16 @@ class Source(ABC):
In the default configuration, this just adds the index key of the
config to the index attribute of the source
If the running_number is 0, the index will be reset.
:param config: The part of the config to add.
:type config: dict[str, Any]
:param running_number: The running number of the config
:type running_number: int
:rtype: None
"""
if running_number == 0:
self._index = []
self._index += config["index"]