From 0dbac0aeee86783fe703bcf072f304e8361dd4a9 Mon Sep 17 00:00:00 2001
From: Christoph Stahl
Date: Sun, 11 Dec 2022 17:31:39 +0100
Subject: [PATCH] Unified files and s3 source

---
 syng/server.py          |   3 ++
 syng/sources/files.py   | 102 ++++++-----------------------------
 syng/sources/s3.py      | 115 ++++++++--------------------------------
 syng/sources/source.py  | 106 ++++++++++++++++++++++++++++++------
 syng/sources/youtube.py |   2 +
 5 files changed, 134 insertions(+), 194 deletions(-)

diff --git a/syng/server.py b/syng/server.py
index 7b99847..2d8a36e 100644
--- a/syng/server.py
+++ b/syng/server.py
@@ -197,6 +197,9 @@ async def handle_append(sid: str, data: dict[str, Any]) -> None:
     source_obj = state.config.sources[data["source"]]
 
     entry = await source_obj.get_entry(data["performer"], data["ident"])
+    if entry is None:
+        await sio.emit("msg", {"msg": f"Unable to append {data['ident']}"})
+        return
 
     first_song = state.queue.try_peek()
     if first_song is None or first_song.started_at is None:
diff --git a/syng/sources/files.py b/syng/sources/files.py
index 9cbb476..770dd2f 100644
--- a/syng/sources/files.py
+++ b/syng/sources/files.py
@@ -1,7 +1,6 @@
-"""Module for the files Source"""
+"""Module for the files Source."""
 import asyncio
 import os
-from itertools import zip_longest
 from typing import Any
 from typing import Optional
 from typing import Tuple
@@ -17,80 +16,32 @@ from .source import Source
 class FilesSource(Source):
     """A source for indexing and playing songs from a local folder.
 
-    Config options are:
-        ``dir``, dirctory to index and server from.
     """
 
     def __init__(self, config: dict[str, Any]):
+        """Initialize the file module."""
         super().__init__(config)
+        self.source_name = "files"
+
         self.dir = config["dir"] if "dir" in config else "."
-        self.index: list[str] = config["index"] if "index" in config else []
         self.extra_mpv_arguments = ["--scale=oversample"]
 
-    async def get_entry(self, performer: str, ident: str) -> Entry:
-        """
-        Extract the information for an Entry from the file name.
+    async def get_file_list(self) -> list[str]:
+        """Collect all ``cdg`` files in ``dir``."""
 
-        Since the server does not have access to the actual file, only to the
-        file name, ``duration`` can not be set. It will be approximated with
-        180 seconds. When added to the queue, the server will ask the client
-        for additional metadata, like this.
+        def _get_file_list() -> list[str]:
+            file_list = []
+            for path, dir, files in os.walk(self.dir):
+                for file in files:
+                    if file.endswith(".cdg"):
+                        file_list.append(
+                            os.path.join(path, file)[len(self.dir):]
+                        )
+            return file_list
 
-        :param performer: The persong singing.
-        :type performer: str
-        :param ident: A path to a ``cdg`` file.
-        :type ident: str
-        :return: An entry with the data.
-        :rtype: Entry
-        """
-        res: Optional[Result] = Result.from_filename(ident, "files")
-        if res is not None:
-            return Entry(
-                ident=ident,
-                source="files",
-                duration=180,
-                album=res.album,
-                title=res.title,
-                artist=res.artist,
-                performer=performer,
-            )
-        raise RuntimeError(f"Could not parse {ident}")
-
-    async def get_config(self) -> list[dict[str, Any]]:
-        """
-        Return the list of ``cdg`` files in the configured directory.
-
-        The list is chunked in 1000 files per entry and inside the dictionary
-        with key ``index``. The filenames are all relative to the configured
-        ``dir``, so you don't expose parts of your configuration.
- - :return: see above - :rtype: list[dict[str, Any]] - """ - - def _get_config() -> list[dict[str, Any]]: - if not self.index: - self.index = [] - print(f"files: indexing {self.dir}") - for path, dir, files in os.walk(self.dir): - for file in files: - if file.endswith(".cdg"): - self.index.append( - os.path.join(path, file)[len(self.dir) :] - ) - print("files: indexing done") - chunked = zip_longest(*[iter(self.index)] * 1000, fillvalue="") - return [ - {"index": list(filter(lambda x: x != "", chunk))} - for chunk in chunked - ] - - return await asyncio.to_thread(_get_config) - - def add_to_config(self, config: dict[str, Any]) -> None: - """Add the chunk of the index list to the internal index list.""" - self.index += config["index"] + return await asyncio.to_thread(_get_file_list) async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]: """ @@ -119,31 +70,10 @@ class FilesSource(Source): We just return the cdg file name and the inferred mp3 file name """ - video_file_name: str = os.path.join(self.dir, entry.ident) audio_file_name: str = os.path.join(self.dir, entry.ident[:-3] + "mp3") return video_file_name, audio_file_name - async def search(self, query: str) -> list[Result]: - """ - Search the internal index list for the query. - - :param query: The query to search for - :type query: str - :return: A list of Results, that need to contain all the words from - the ``query`` - :rtype: list[Result] - """ - print("searching files") - filtered: list[str] = self.filter_data_by_query(query, self.index) - results: list[Result] = [] - for filename in filtered: - result: Optional[Result] = Result.from_filename(filename, "files") - if result is None: - continue - results.append(result) - return results - available_sources["files"] = FilesSource diff --git a/syng/sources/s3.py b/syng/sources/s3.py index 45e6597..176e8b6 100644 --- a/syng/sources/s3.py +++ b/syng/sources/s3.py @@ -5,10 +5,10 @@ Adds it to the ``available_sources`` with the name ``s3`` """ import asyncio import os -from itertools import zip_longest from json import load from json import dump from typing import Any +from typing import cast from typing import Optional from typing import Tuple @@ -16,7 +16,6 @@ import mutagen from minio import Minio from ..entry import Entry -from ..result import Result from .source import available_sources from .source import Source @@ -37,6 +36,7 @@ class S3Source(Source): def __init__(self, config: dict[str, Any]): """Create the source.""" super().__init__(config) + self.source_name = "s3" if ( "endpoint" in config @@ -53,110 +53,41 @@ class S3Source(Source): config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng" ) - self.index: list[str] = config["index"] if "index" in config else [] self.index_file: Optional[str] = ( config["index_file"] if "index_file" in config else None ) self.extra_mpv_arguments = ["--scale=oversample"] - async def get_entry(self, performer: str, ident: str) -> Entry: - """ - Create an :py:class:`syng.entry.Entry` for the identifier. - - The identifier should be a ``cdg`` filepath on the s3 server. - - Initially the duration for the generated entry will be set to 180 - seconds, so the server will ask the client for that missing - metadata. - - :param performer: The persong singing. - :type performer: str - :param ident: A path to a ``cdg`` file. - :type ident: str - :return: An entry with the data. 
- :rtype: Entry - """ - res: Optional[Result] = Result.from_filename(ident, "s3") - if res is not None: - return Entry( - ident=ident, - source="s3", - duration=180, - album=res.album, - title=res.title, - artist=res.artist, - performer=performer, - ) - raise RuntimeError(f"Could not parse {ident}") - - async def get_config(self) -> dict[str, Any] | list[dict[str, Any]]: + async def get_file_list(self) -> list[str]: """ Return the list of ``cdg`` files on the s3 instance. - The list is chunked in 1000 files per entry and inside the dictionary - with key ``index``. - :return: see above - :rtype: list[dict[str, Any]] + :rtype: list[str] """ - def _get_config() -> dict[str, Any] | list[dict[str, Any]]: - if not self.index: - if self.index_file is not None and os.path.isfile( - self.index_file - ): - with open( - self.index_file, "r", encoding="utf8" - ) as index_file_handle: - self.index = load(index_file_handle) - else: - print(f"s3: Indexing '{self.bucket}'") - self.index = [ - obj.object_name - for obj in self.minio.list_objects( - self.bucket, recursive=True - ) - if obj.object_name.endswith(".cdg") - ] - print("s3: Indexing done") - if self.index_file is not None and not os.path.isfile( - self.index_file - ): - with open( - self.index_file, "w", encoding="utf8" - ) as index_file_handle: - dump(self.index, index_file_handle) + def _get_file_list() -> list[str]: + if self.index_file is not None and os.path.isfile(self.index_file): + with open( + self.index_file, "r", encoding="utf8" + ) as index_file_handle: + return cast(list[str], load(index_file_handle)) - chunked = zip_longest(*[iter(self.index)] * 1000, fillvalue="") - return [ - {"index": list(filter(lambda x: x != "", chunk))} - for chunk in chunked + file_list = [ + obj.object_name + for obj in self.minio.list_objects(self.bucket, recursive=True) + if obj.object_name.endswith(".cdg") ] + if self.index_file is not None and not os.path.isfile( + self.index_file + ): + with open( + self.index_file, "w", encoding="utf8" + ) as index_file_handle: + dump(file_list, index_file_handle) + return file_list - return await asyncio.to_thread(_get_config) - - def add_to_config(self, config: dict[str, Any]) -> None: - """Add the chunk of the index list to the internal index list.""" - self.index += config["index"] - - async def search(self, query: str) -> list[Result]: - """ - Search the internal index list for the query. - - :param query: The query to search for - :type query: str - :return: A list of Results, that need to contain all the words from - the ``query`` - :rtype: list[Result] - """ - filtered: list[str] = self.filter_data_by_query(query, self.index) - results: list[Result] = [] - for filename in filtered: - result: Optional[Result] = Result.from_filename(filename, "s3") - if result is None: - continue - results.append(result) - return results + return await asyncio.to_thread(_get_file_list) async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]: """ diff --git a/syng/sources/source.py b/syng/sources/source.py index ccd2541..a405100 100644 --- a/syng/sources/source.py +++ b/syng/sources/source.py @@ -13,6 +13,7 @@ import shlex from collections import defaultdict from dataclasses import dataclass from dataclasses import field +from itertools import zip_longest from traceback import print_exc from typing import Any from typing import Optional @@ -68,9 +69,9 @@ class Source: """Parentclass for all sources. 
     A new source should subclass this, and at least implement
-    :py:func:`Source.get_entry`, :py:func:`Source.search` and
-    :py:func:`Source.do_buffer`. The sources will be shared between the server
-    and the playback client.
+    :py:func:`Source.do_buffer`, :py:func:`Source.get_entry` and
+    :py:func:`Source.get_file_list`, and set the ``source_name``
+    attribute.
 
     Source specific tasks will be forwarded to the respective source, like:
         - Playing the audio/video
@@ -79,6 +80,16 @@ class Source:
         - Getting an entry from an identifier
         - Handling the skipping of currently played song
 
+    Some methods of a source will be called by the server and some will be
+    called by the playback client.
+
+    Specific server methods:
+        ``get_entry``, ``search``, ``add_to_config``
+
+    Specific client methods:
+        ``buffer``, ``do_buffer``, ``play``, ``skip_current``, ``ensure_playable``,
+        ``get_missing_metadata``, ``get_config``
+
     Each source has a reference to all files, that are currently queued to
     download via the :py:attr:`Source.downloaded_files` attribute and a
     reference to a ``mpv`` process playing songs for that specific source
@@ -89,24 +100,27 @@ class Source:
       started
     - ``extra_mpv_arguments``, list of arguments added to the mpv instance,
      can be overwritten by a subclass
+    - ``source_name``, the string used to identify the source
     """
 
-    def __init__(self, _: dict[str, Any]):
+    def __init__(self, config: dict[str, Any]):
         """
         Create and initialize a new source.
 
         You should never try to instantiate the Source class directly, rather
         you should instantiate a subclass.
 
-        :param _: Specific configuration for a Soure, ignored in the base
-            class
-        :type _: dict[str, Any]
+        :param config: Specific configuration for a source. See the respective
+            source for documentation.
+        :type config: dict[str, Any]
         """
+        self.source_name: str = ""
         self.downloaded_files: defaultdict[str, DLFilesEntry] = defaultdict(
             DLFilesEntry
         )
         self._masterlock: asyncio.Lock = asyncio.Lock()
         self.player: Optional[asyncio.subprocess.Process] = None
+        self._index: list[str] = config["index"] if "index" in config else []
         self.extra_mpv_arguments: list[str] = []
         self._skip_next = False
 
@@ -137,33 +151,63 @@ class Source:
         )
         return await mpv_process
 
-    async def get_entry(self, performer: str, ident: str) -> Entry:
+    async def get_entry(self, performer: str, ident: str) -> Optional[Entry]:
         """
         Create an :py:class:`syng.entry.Entry` from a given identifier.
 
-        Abstract, needs to be implemented by subclass.
+        By default, this confirms that the ident is a valid entry (i.e. part
+        of the indexed list) and builds an Entry by parsing the file name.
+
+        Since the server does not have access to the actual file, only to the
+        file name, ``duration`` cannot be set. It will be approximated with
+        180 seconds. When added to the queue, the server will ask the client
+        for additional metadata, such as the actual duration.
 
         :param performer: The performer of the song
         :type performer: str
         :param ident: Unique identifier of the song.
         :type ident: str
-        :returns: New entry for the identifier.
-        :rtype: Entry
+        :returns: New entry for the identifier, or None if the ident is
+            invalid.
+        :rtype: Optional[Entry]
         """
-        raise NotImplementedError
+        if ident not in self._index:
+            return None
+
+        res: Optional[Result] = Result.from_filename(ident, self.source_name)
+        if res is not None:
+            return Entry(
+                ident=ident,
+                source=self.source_name,
+                duration=180,
+                album=res.album,
+                title=res.title,
+                artist=res.artist,
+                performer=performer,
+            )
+        return None
 
     async def search(self, query: str) -> list[Result]:
         """
         Search the songs from the source for a query.
 
-        Abstract, needs to be implemented by subclass.
+        By default, this searches in the internal index.
 
         :param query: The query to search for
         :type query: str
         :returns: A list of Results containing the query.
         :rtype: list[Result]
         """
-        raise NotImplementedError
+        filtered: list[str] = self.filter_data_by_query(query, self._index)
+        results: list[Result] = []
+        for filename in filtered:
+            result: Optional[Result] = Result.from_filename(
+                filename, self.source_name
+            )
+            if result is None:
+                continue
+            results.append(result)
+        return results
 
     async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
         """
@@ -331,6 +375,18 @@ class Source:
             if contains_all_words(splitquery, element)
         ]
 
+    async def get_file_list(self) -> list[str]:
+        """
+        Gather a list of all files belonging to the source.
+
+        This list will be sent to the server. When the server searches, this
+        list will be searched.
+
+        :return: List of filenames belonging to the source
+        :rtype: list[str]
+        """
+        return []
+
     async def get_config(self) -> dict[str, Any] | list[dict[str, Any]]:
         """
         Return the part of the config, that should be send to the server.
@@ -339,12 +395,26 @@ class Source:
         dictionary, a single message will be send. If it is a list, one
         message will be send for each entry in the list.
 
-        Abstract, needs to be implemented by subclass.
+        By default this is the list of files handled by the source, split into
+        chunks of 1000 filenames. This list is cached internally, so it does
+        not need to be rebuilt when the client reconnects.
+
+        But this can be any other value, as long as the respective source can
+        handle that data.
 
         :return: The part of the config, that should be sended to the server.
         :rtype: dict[str, Any] | list[dict[str, Any]]
         """
-        raise NotImplementedError
+        if not self._index:
+            self._index = []
+            print(f"{self.source_name}: generating index")
+            self._index = await self.get_file_list()
+            print(f"{self.source_name}: done")
+        chunked = zip_longest(*[iter(self._index)] * 1000, fillvalue="")
+        return [
+            {"index": list(filter(lambda x: x != "", chunk))}
+            for chunk in chunked
+        ]
 
     def add_to_config(self, config: dict[str, Any]) -> None:
         """
@@ -353,10 +423,14 @@ class Source:
         This is called on the server, if :py:func:`Source.get_config` returns
         a list.
 
+        In the default configuration, this just adds the ``index`` key of the
+        config to the internal index of the source.
+
         :param config: The part of the config to add.
         :type config: dict[str, Any]
         :rtype: None
         """
+        self._index += config["index"]
 
 
 available_sources: dict[str, Type[Source]] = {}
diff --git a/syng/sources/youtube.py b/syng/sources/youtube.py
index 30766e3..bf93e4d 100644
--- a/syng/sources/youtube.py
+++ b/syng/sources/youtube.py
@@ -52,6 +52,8 @@ class YoutubeSource(Source):
     def __init__(self, config: dict[str, Any]):
         """Create the source."""
         super().__init__(config)
+        self.source_name = "youtube"
+
         self.innertube_client: innertube.InnerTube = innertube.InnerTube(
             client="WEB"
         )
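
Note (not part of the patch): with this change a source mostly just sets
``source_name`` and implements ``get_file_list``; the ``Source`` base class
now derives ``get_entry``, ``search``, ``get_config`` and ``add_to_config``
from that file list. Below is a minimal sketch of such a source. The class
name ``MyFolderSource``, the ``myfolder`` key and the ``dir`` option are
invented for illustration, and the client-side methods (``do_buffer``,
``get_missing_metadata``, ``ensure_playable``) are omitted.

# Illustrative sketch only -- not part of this commit.
import asyncio
import os
from typing import Any

from .source import available_sources
from .source import Source


class MyFolderSource(Source):
    """Example source: indexes ``cdg`` files below a configured directory."""

    def __init__(self, config: dict[str, Any]):
        super().__init__(config)
        # The default get_entry/search/get_config use this name and the
        # index built from get_file_list().
        self.source_name = "myfolder"
        self.dir = config["dir"] if "dir" in config else "."

    async def get_file_list(self) -> list[str]:
        """Return all ``cdg`` files relative to ``dir``."""

        def _scan() -> list[str]:
            found: list[str] = []
            for path, _, files in os.walk(self.dir):
                for file in files:
                    if file.endswith(".cdg"):
                        found.append(os.path.join(path, file)[len(self.dir):])
            return found

        # Walk the directory in a thread so the event loop is not blocked.
        return await asyncio.to_thread(_scan)


available_sources["myfolder"] = MyFolderSource

On the server side the default ``get_entry`` only parses the file name, so an
entry starts out with the 180 second placeholder duration until the playback
client supplies the real metadata via ``get_missing_metadata``.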