From 3c447261b19038ca0729731e6bcc64c78b9589e8 Mon Sep 17 00:00:00 2001 From: Christoph Stahl Date: Mon, 30 Oct 2023 17:36:58 +0100 Subject: [PATCH] Included s3_video in s3. Fixed bug due to newer socketio version. --- syng/server.py | 8 +- syng/sources/__init__.py | 1 - syng/sources/s3.py | 85 +++++++++++++++------ syng/sources/s3_video.py | 155 --------------------------------------- 4 files changed, 66 insertions(+), 183 deletions(-) delete mode 100644 syng/sources/s3_video.py diff --git a/syng/server.py b/syng/server.py index 1550992..8286af2 100644 --- a/syng/server.py +++ b/syng/server.py @@ -591,7 +591,7 @@ async def handle_register_client(sid: str, data: dict[str, Any]) -> None: sources_prio=old_state.config.sources_prio, **data["config"], ) - sio.enter_room(sid, room) + await sio.enter_room(sid, room) await sio.emit( "client-registered", {"success": True, "room": room}, room=sid ) @@ -617,7 +617,7 @@ async def handle_register_client(sid: str, data: dict[str, Any]) -> None: sid=sid, config=Config(sources={}, sources_prio=[], **data["config"]), ) - sio.enter_room(sid, room) + await sio.enter_room(sid, room) await sio.emit( "client-registered", {"success": True, "room": room}, room=sid ) @@ -745,7 +745,7 @@ async def handle_register_web(sid: str, data: dict[str, Any]) -> bool: if data["room"] in clients: async with sio.session(sid) as session: session["room"] = data["room"] - sio.enter_room(sid, session["room"]) + await sio.enter_room(sid, session["room"]) state = clients[session["room"]] await send_state(state, sid) return True @@ -882,7 +882,7 @@ async def handle_disconnect(sid: str) -> None: """ async with sio.session(sid) as session: if "room" in session: - sio.leave_room(sid, session["room"]) + await sio.leave_room(sid, session["room"]) @sio.on("search") diff --git a/syng/sources/__init__.py b/syng/sources/__init__.py index a99f6f4..71f7a05 100644 --- a/syng/sources/__init__.py +++ b/syng/sources/__init__.py @@ -10,7 +10,6 @@ from .source import available_sources as available_sources from .source import Source as Source from .youtube import YoutubeSource from .s3 import S3Source -from .s3_video import S3VideoSource from .files import FilesSource diff --git a/syng/sources/s3.py b/syng/sources/s3.py index bf708b2..a7f6e88 100644 --- a/syng/sources/s3.py +++ b/syng/sources/s3.py @@ -28,9 +28,11 @@ class S3Source(Source): will simply be forwarded to the ``minio`` client. - ``tmp_dir``: The folder, where temporary files are stored. Default is ``/tmp/syng`` - - ``index_file``: If the file does not exist, saves the list of - ``cdg``-files from the s3 instance to this file. If it exists, loads + - ``index_file``: If the file does not exist, saves the paths of + files from the s3 instance to this file. If it exists, loads the list of files from this file. + - ``extensions``: List of filename extensions. Index only files with these one + of these extensions (Default: ["cdg"]) """ def __init__(self, config: dict[str, Any]): @@ -47,14 +49,19 @@ class S3Source(Source): config["endpoint"], access_key=config["access_key"], secret_key=config["secret_key"], - secure=(config["secure"] - if "secure" in config else True), + secure=(config["secure"] if "secure" in config else True), ) self.bucket: str = config["bucket"] self.tmp_dir: str = ( config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng" ) + self.extensions = ( + [f".{ext}" for ext in config["extensions"]] + if "extensions" in config + else [".cdg"] + ) + self.index_file: Optional[str] = ( config["index_file"] if "index_file" in config else None ) @@ -62,7 +69,11 @@ class S3Source(Source): async def get_file_list(self) -> list[str]: """ - Return the list of ``cdg`` files on the s3 instance. + Return the list of files on the s3 instance, according to the extensions. + + If an index file exists, this will be read instead. + + As a side effect, an index file is generated, if configured. :return: see above :rtype: list[str] @@ -78,7 +89,7 @@ class S3Source(Source): file_list = [ obj.object_name for obj in self.minio.list_objects(self.bucket, recursive=True) - if obj.object_name.endswith(".cdg") + if os.path.splitext(obj.object_name)[1] in self.extensions ] if self.index_file is not None and not os.path.isfile( self.index_file @@ -93,7 +104,7 @@ class S3Source(Source): async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]: """ - Return the duration for the mp3 file. + Return the duration for the music file. :param entry: The entry with the associated mp3 file :type entry: Entry @@ -123,40 +134,68 @@ class S3Source(Source): async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]: """ - Download the ``cdg`` and the ``mp3`` file from the s3. + Download the file from the s3. + + If it is a ``cdg`` file, the accompaning ``mp3`` file is also downloaded :param entry: The entry to download :type entry: Entry - :return: A tuple with the location of the ``cdg`` and the ``mp3`` file. + :return: A tuple with the location of the main file. If the file a ``cdg`` file, + the second position is the location of the ``mp3`` file, otherwise None + . :rtype: Tuple[str, Optional[str]] """ - cdg_filename: str = os.path.basename(entry.ident) + + if os.path.splitext(entry.ident)[1] == ".cdg": + cdg_filename: str = os.path.basename(entry.ident) + path_to_file: str = os.path.dirname(entry.ident) + + cdg_path: str = os.path.join(path_to_file, cdg_filename) + target_file_cdg: str = os.path.join(self.tmp_dir, cdg_path) + + ident_mp3: str = entry.ident[:-3] + "mp3" + target_file_mp3: str = target_file_cdg[:-3] + "mp3" + os.makedirs(os.path.dirname(target_file_cdg), exist_ok=True) + + video_task: asyncio.Task[Any] = asyncio.create_task( + asyncio.to_thread( + self.minio.fget_object, + self.bucket, + entry.ident, + target_file_cdg, + ) + ) + audio_task: asyncio.Task[Any] = asyncio.create_task( + asyncio.to_thread( + self.minio.fget_object, + self.bucket, + ident_mp3, + target_file_mp3, + ) + ) + + await video_task + await audio_task + return target_file_cdg, target_file_mp3 + video_filename: str = os.path.basename(entry.ident) path_to_file: str = os.path.dirname(entry.ident) - cdg_path: str = os.path.join(path_to_file, cdg_filename) - target_file_cdg: str = os.path.join(self.tmp_dir, cdg_path) + video_path: str = os.path.join(path_to_file, video_filename) + target_file_video: str = os.path.join(self.tmp_dir, video_path) - ident_mp3: str = entry.ident[:-3] + "mp3" - target_file_mp3: str = target_file_cdg[:-3] + "mp3" - os.makedirs(os.path.dirname(target_file_cdg), exist_ok=True) + os.makedirs(os.path.dirname(target_file_video), exist_ok=True) video_task: asyncio.Task[Any] = asyncio.create_task( asyncio.to_thread( self.minio.fget_object, self.bucket, entry.ident, - target_file_cdg, - ) - ) - audio_task: asyncio.Task[Any] = asyncio.create_task( - asyncio.to_thread( - self.minio.fget_object, self.bucket, ident_mp3, target_file_mp3 + target_file_video, ) ) await video_task - await audio_task - return target_file_cdg, target_file_mp3 + return target_file_video, None available_sources["s3"] = S3Source diff --git a/syng/sources/s3_video.py b/syng/sources/s3_video.py deleted file mode 100644 index a0401a5..0000000 --- a/syng/sources/s3_video.py +++ /dev/null @@ -1,155 +0,0 @@ -""" -Construct the S3Video source. - -Adds it to the ``available_sources`` with the name ``s3-video`` -""" -import asyncio -import os -from json import load -from json import dump -from typing import Any -from typing import cast -from typing import Optional -from typing import Tuple - -import mutagen -from minio import Minio - -from ..entry import Entry -from .source import available_sources -from .source import Source - - -class S3VideoSource(Source): - """A source for playing videos from a s3 compatible storage. - - Config options are: - - ``endpoint``, ``access_key``, ``secret_key``, ``secure``, ``bucket``: These - will simply be forwarded to the ``minio`` client. - - ``tmp_dir``: The folder, where temporary files are stored. Default - is ``/tmp/syng`` - - ``index_file``: If the file does not exist, saves the list of - ``cdg``-files from the s3 instance to this file. If it exists, loads - the list of files from this file. - """ - - def __init__(self, config: dict[str, Any]): - """Create the source.""" - super().__init__(config) - self.source_name = "s3-video" - - if ( - "endpoint" in config - and "access_key" in config - and "secret_key" in config - ): - self.minio: Minio = Minio( - config["endpoint"], - access_key=config["access_key"], - secret_key=config["secret_key"], - secure=(config["secure"] - if "secure" in config else True), - ) - self.bucket: str = config["bucket"] - self.tmp_dir: str = ( - config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng" - ) - - self.index_file: Optional[str] = ( - config["index_file"] if "index_file" in config else None - ) - self.extra_mpv_arguments = ["--scale=oversample"] - - async def get_file_list(self) -> list[str]: - """ - Return the list of ``mp4`` and ``webm`` files on the s3 instance. - - :return: see above - :rtype: list[str] - """ - - def _get_file_list() -> list[str]: - if self.index_file is not None and os.path.isfile(self.index_file): - with open( - self.index_file, "r", encoding="utf8" - ) as index_file_handle: - return cast(list[str], load(index_file_handle)) - - file_list = [ - obj.object_name - for obj in self.minio.list_objects(self.bucket, recursive=True) - if obj.object_name.endswith(".mp4") or obj.object_name.endswith(".webm") - ] - if self.index_file is not None and not os.path.isfile( - self.index_file - ): - with open( - self.index_file, "w", encoding="utf8" - ) as index_file_handle: - dump(file_list, index_file_handle) - - return file_list - - return await asyncio.to_thread(_get_file_list) - - async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]: - """ - Return the duration for the mp3 file. - - :param entry: The entry with the associated mp3 file - :type entry: Entry - :return: A dictionary containing the duration in seconds in the - ``duration`` key. - :rtype: dict[str, Any] - """ - - def mutagen_wrapped(file: str) -> int: - meta_infos = mutagen.File(file).info - return int(meta_infos.length) - - await self.ensure_playable(entry) - - audio_file_name: Optional[str] = self.downloaded_files[ - entry.ident - ].audio - - if audio_file_name is None: - duration: int = 180 - else: - duration = await asyncio.to_thread( - mutagen_wrapped, audio_file_name - ) - - return {"duration": int(duration)} - - async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]: - """ - Download the ``mp4`` or ``webm`` file from the s3. - - :param entry: The entry to download - :type entry: Entry - :return: A tuple with the location of the ``mp4`` or ``webm`` file. - :rtype: Tuple[str, Optional[str]] - """ - video_filename: str = os.path.basename(entry.ident) - path_to_file: str = os.path.dirname(entry.ident) - - video_path: str = os.path.join(path_to_file, video_filename) - target_file_video: str = os.path.join(self.tmp_dir, video_path) - - os.makedirs(os.path.dirname(target_file_video), exist_ok=True) - - video_task: asyncio.Task[Any] = asyncio.create_task( - asyncio.to_thread( - self.minio.fget_object, - self.bucket, - entry.ident, - target_file_video, - ) - ) - - await video_task - return target_file_video, None - - -available_sources["s3-video"] = S3VideoSource