Unified files and s3 source

This commit is contained in:
Christoph Stahl 2022-12-11 17:31:39 +01:00
parent c3926b05ef
commit 0dbac0aeee
5 changed files with 134 additions and 194 deletions

View file

@ -197,6 +197,9 @@ async def handle_append(sid: str, data: dict[str, Any]) -> None:
source_obj = state.config.sources[data["source"]]
entry = await source_obj.get_entry(data["performer"], data["ident"])
if entry is None:
await sio.emit("mst", {"msg": f"Unable to append {data['ident']}"})
return
first_song = state.queue.try_peek()
if first_song is None or first_song.started_at is None:

View file

@ -1,7 +1,6 @@
"""Module for the files Source"""
"""Module for the files Source."""
import asyncio
import os
from itertools import zip_longest
from typing import Any
from typing import Optional
from typing import Tuple
@ -17,80 +16,32 @@ from .source import Source
class FilesSource(Source):
"""A source for indexing and playing songs from a local folder.
Config options are:
-``dir``, directory to index and serve from.
"""
def __init__(self, config: dict[str, Any]):
"""Initialize the file module."""
super().__init__(config)
self.source_name = "files"
self.dir = config["dir"] if "dir" in config else "."
self.index: list[str] = config["index"] if "index" in config else []
self.extra_mpv_arguments = ["--scale=oversample"]
async def get_entry(self, performer: str, ident: str) -> Entry:
"""
Extract the information for an Entry from the file name.
async def get_file_list(self) -> list[str]:
"""Collect all ``cdg`` files in ``dir``."""
Since the server does not have access to the actual file, only to the
file name, ``duration`` can not be set. It will be approximated with
180 seconds. When added to the queue, the server will ask the client
for additional metadata, like this.
def _get_file_list() -> list[str]:
file_list = []
for path, dir, files in os.walk(self.dir):
for file in files:
if file.endswith(".cdg"):
file_list.append(
os.path.join(path, file)[len(self.dir):]
)
return file_list
:param performer: The person singing.
:type performer: str
:param ident: A path to a ``cdg`` file.
:type ident: str
:return: An entry with the data.
:rtype: Entry
"""
res: Optional[Result] = Result.from_filename(ident, "files")
if res is not None:
return Entry(
ident=ident,
source="files",
duration=180,
album=res.album,
title=res.title,
artist=res.artist,
performer=performer,
)
raise RuntimeError(f"Could not parse {ident}")
async def get_config(self) -> list[dict[str, Any]]:
"""
Return the list of ``cdg`` files in the configured directory.
The list is chunked in 1000 files per entry and inside the dictionary
with key ``index``. The filenames are all relative to the configured
``dir``, so you don't expose parts of your configuration.
:return: see above
:rtype: list[dict[str, Any]]
"""
def _get_config() -> list[dict[str, Any]]:
if not self.index:
self.index = []
print(f"files: indexing {self.dir}")
for path, dir, files in os.walk(self.dir):
for file in files:
if file.endswith(".cdg"):
self.index.append(
os.path.join(path, file)[len(self.dir) :]
)
print("files: indexing done")
chunked = zip_longest(*[iter(self.index)] * 1000, fillvalue="")
return [
{"index": list(filter(lambda x: x != "", chunk))}
for chunk in chunked
]
return await asyncio.to_thread(_get_config)
def add_to_config(self, config: dict[str, Any]) -> None:
"""Add the chunk of the index list to the internal index list."""
self.index += config["index"]
return await asyncio.to_thread(_get_file_list)
async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]:
"""
@ -119,31 +70,10 @@ class FilesSource(Source):
We just return the cdg file name and the inferred mp3 file name
"""
video_file_name: str = os.path.join(self.dir, entry.ident)
audio_file_name: str = os.path.join(self.dir, entry.ident[:-3] + "mp3")
return video_file_name, audio_file_name
async def search(self, query: str) -> list[Result]:
"""
Search the internal index list for the query.
:param query: The query to search for
:type query: str
:return: A list of Results, that need to contain all the words from
the ``query``
:rtype: list[Result]
"""
print("searching files")
filtered: list[str] = self.filter_data_by_query(query, self.index)
results: list[Result] = []
for filename in filtered:
result: Optional[Result] = Result.from_filename(filename, "files")
if result is None:
continue
results.append(result)
return results
available_sources["files"] = FilesSource

View file

@ -5,10 +5,10 @@ Adds it to the ``available_sources`` with the name ``s3``
"""
import asyncio
import os
from itertools import zip_longest
from json import load
from json import dump
from typing import Any
from typing import cast
from typing import Optional
from typing import Tuple
@ -16,7 +16,6 @@ import mutagen
from minio import Minio
from ..entry import Entry
from ..result import Result
from .source import available_sources
from .source import Source
@ -37,6 +36,7 @@ class S3Source(Source):
def __init__(self, config: dict[str, Any]):
"""Create the source."""
super().__init__(config)
self.source_name = "s3"
if (
"endpoint" in config
@ -53,110 +53,41 @@ class S3Source(Source):
config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng"
)
self.index: list[str] = config["index"] if "index" in config else []
self.index_file: Optional[str] = (
config["index_file"] if "index_file" in config else None
)
self.extra_mpv_arguments = ["--scale=oversample"]
async def get_entry(self, performer: str, ident: str) -> Entry:
"""
Create an :py:class:`syng.entry.Entry` for the identifier.
The identifier should be a ``cdg`` filepath on the s3 server.
Initially the duration for the generated entry will be set to 180
seconds, so the server will ask the client for that missing
metadata.
:param performer: The person singing.
:type performer: str
:param ident: A path to a ``cdg`` file.
:type ident: str
:return: An entry with the data.
:rtype: Entry
"""
res: Optional[Result] = Result.from_filename(ident, "s3")
if res is not None:
return Entry(
ident=ident,
source="s3",
duration=180,
album=res.album,
title=res.title,
artist=res.artist,
performer=performer,
)
raise RuntimeError(f"Could not parse {ident}")
async def get_config(self) -> dict[str, Any] | list[dict[str, Any]]:
async def get_file_list(self) -> list[str]:
"""
Return the list of ``cdg`` files on the s3 instance.
The list is chunked in 1000 files per entry and inside the dictionary
with key ``index``.
:return: see above
:rtype: list[dict[str, Any]]
:rtype: list[str]
"""
def _get_config() -> dict[str, Any] | list[dict[str, Any]]:
if not self.index:
if self.index_file is not None and os.path.isfile(
self.index_file
):
with open(
self.index_file, "r", encoding="utf8"
) as index_file_handle:
self.index = load(index_file_handle)
else:
print(f"s3: Indexing '{self.bucket}'")
self.index = [
obj.object_name
for obj in self.minio.list_objects(
self.bucket, recursive=True
)
if obj.object_name.endswith(".cdg")
]
print("s3: Indexing done")
if self.index_file is not None and not os.path.isfile(
self.index_file
):
with open(
self.index_file, "w", encoding="utf8"
) as index_file_handle:
dump(self.index, index_file_handle)
def _get_file_list() -> list[str]:
if self.index_file is not None and os.path.isfile(self.index_file):
with open(
self.index_file, "r", encoding="utf8"
) as index_file_handle:
return cast(list[str], load(index_file_handle))
chunked = zip_longest(*[iter(self.index)] * 1000, fillvalue="")
return [
{"index": list(filter(lambda x: x != "", chunk))}
for chunk in chunked
file_list = [
obj.object_name
for obj in self.minio.list_objects(self.bucket, recursive=True)
if obj.object_name.endswith(".cdg")
]
if self.index_file is not None and not os.path.isfile(
self.index_file
):
with open(
self.index_file, "w", encoding="utf8"
) as index_file_handle:
dump(file_list, index_file_handle)
return file_list
return await asyncio.to_thread(_get_config)
def add_to_config(self, config: dict[str, Any]) -> None:
"""Add the chunk of the index list to the internal index list."""
self.index += config["index"]
async def search(self, query: str) -> list[Result]:
"""
Search the internal index list for the query.
:param query: The query to search for
:type query: str
:return: A list of Results, that need to contain all the words from
the ``query``
:rtype: list[Result]
"""
filtered: list[str] = self.filter_data_by_query(query, self.index)
results: list[Result] = []
for filename in filtered:
result: Optional[Result] = Result.from_filename(filename, "s3")
if result is None:
continue
results.append(result)
return results
return await asyncio.to_thread(_get_file_list)
async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]:
"""

View file

@ -13,6 +13,7 @@ import shlex
from collections import defaultdict
from dataclasses import dataclass
from dataclasses import field
from itertools import zip_longest
from traceback import print_exc
from typing import Any
from typing import Optional
@ -68,9 +69,9 @@ class Source:
"""Parentclass for all sources.
A new source should subclass this, and at least implement
:py:func:`Source.get_entry`, :py:func:`Source.search` and
:py:func:`Source.do_buffer`. The sources will be shared between the server
and the playback client.
:py:func:`Source.do_buffer`, :py:func:`Source.get_entry` and
:py:func:`Source.get_file_list`, and set the ``source_name``
attribute.
Source specific tasks will be forwarded to the respective source, like:
- Playing the audio/video
@ -79,6 +80,16 @@ class Source:
- Getting an entry from an identifier
- Handling the skipping of currently played song
Some methods of a source will be called by the server and some will be
called by the playback client.
Specific server methods:
``get_entry``, ``search``, ``add_to_config``
Specific client methods:
``buffer``, ``do_buffer``, ``play``, ``skip_current``, ``ensure_playable``,
``get_missing_metadata``, ``get_config``
Each source has a reference to all files that are currently queued to
download via the :py:attr:`Source.downloaded_files` attribute and a
reference to a ``mpv`` process playing songs for that specific source
@ -89,24 +100,27 @@ class Source:
started
- ``extra_mpv_arguments``, list of arguments added to the mpv
instance, can be overwritten by a subclass
- ``source_name``, the string used to identify the source
"""
def __init__(self, _: dict[str, Any]):
def __init__(self, config: dict[str, Any]):
"""
Create and initialize a new source.
You should never try to instantiate the Source class directly, rather
you should instantiate a subclass.
:param _: Specific configuration for a Soure, ignored in the base
class
:type _: dict[str, Any]
:param config: Specific configuration for a source. See the respective
source for documentation.
:type config: dict[str, Any]
"""
self.source_name: str = ""
self.downloaded_files: defaultdict[str, DLFilesEntry] = defaultdict(
DLFilesEntry
)
self._masterlock: asyncio.Lock = asyncio.Lock()
self.player: Optional[asyncio.subprocess.Process] = None
self._index: list[str] = config["index"] if "index" in config else []
self.extra_mpv_arguments: list[str] = []
self._skip_next = False
@ -137,33 +151,63 @@ class Source:
)
return await mpv_process
async def get_entry(self, performer: str, ident: str) -> Entry:
async def get_entry(self, performer: str, ident: str) -> Optional[Entry]:
"""
Create an :py:class:`syng.entry.Entry` from a given identifier.
Abstract, needs to be implemented by subclass.
By default, this confirms that the ident is a valid entry (i.e. part
of the indexed list), and builds an Entry by parsing the file name.
Since the server does not have access to the actual file, only to the
file name, ``duration`` can not be set. It will be approximated with
180 seconds. When added to the queue, the server will ask the client
for additional metadata, like this.
:param performer: The performer of the song
:type performer: str
:param ident: Unique identifier of the song.
:type ident: str
:returns: New entry for the identifier.
:rtype: Entry
:returns: New entry for the identifier, or None, if the ident is
invalid.
:rtype: Optional[Entry]
"""
raise NotImplementedError
if ident not in self._index:
return None
res: Optional[Result] = Result.from_filename(ident, self.source_name)
if res is not None:
return Entry(
ident=ident,
source=self.source_name,
duration=180,
album=res.album,
title=res.title,
artist=res.artist,
performer=performer,
)
return None
async def search(self, query: str) -> list[Result]:
"""
Search the songs from the source for a query.
Abstract, needs to be implemented by subclass.
By default, this searches in the internal index.
:param query: The query to search for
:type query: str
:returns: A list of Results containing the query.
:rtype: list[Result]
"""
raise NotImplementedError
filtered: list[str] = self.filter_data_by_query(query, self._index)
results: list[Result] = []
for filename in filtered:
result: Optional[Result] = Result.from_filename(
filename, self.source_name
)
if result is None:
continue
results.append(result)
return results
async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
"""
@ -331,6 +375,18 @@ class Source:
if contains_all_words(splitquery, element)
]
async def get_file_list(self) -> list[str]:
"""
Gather a list of all files belonging to the source.
This list will be sent to the server. When the server searches, this
list will be searched.
:return: List of filenames belonging to the source
:rtype: list[str]
"""
return []
async def get_config(self) -> dict[str, Any] | list[dict[str, Any]]:
"""
Return the part of the config, that should be send to the server.
@ -339,12 +395,26 @@ class Source:
dictionary, a single message will be send. If it is a list, one message
will be send for each entry in the list.
Abstract, needs to be implemented by subclass.
By default this is the list of files handled by the source, split into
chunks of 1000 filenames. This list is cached internally, so it does
not need to be rebuilt when the client reconnects.
But this can be any other value, as long as the respective source can
handle that data.
:return: The part of the config that should be sent to the server.
:rtype: dict[str, Any] | list[dict[str, Any]]
"""
raise NotImplementedError
if not self._index:
self._index = []
print(f"{self.source_name}: generating index")
self._index = await self.get_file_list()
print(f"{self.source_name}: done")
chunked = zip_longest(*[iter(self._index)] * 1000, fillvalue="")
return [
{"index": list(filter(lambda x: x != "", chunk))}
for chunk in chunked
]
def add_to_config(self, config: dict[str, Any]) -> None:
"""
@ -353,10 +423,14 @@ class Source:
This is called on the server, if :py:func:`Source.get_config` returns a
list.
In the default configuration, this just adds the index key of the
config to the index attribute of the source
:param config: The part of the config to add.
:type config: dict[str, Any]
:rtype: None
"""
self._index += config["index"]
available_sources: dict[str, Type[Source]] = {}

View file

@ -52,6 +52,8 @@ class YoutubeSource(Source):
def __init__(self, config: dict[str, Any]):
"""Create the source."""
super().__init__(config)
self.source_name = "youtube"
self.innertube_client: innertube.InnerTube = innertube.InnerTube(
client="WEB"
)