Included s3_video in s3.

Fixed bug due to newer socketio version.
This commit is contained in:
Christoph Stahl 2023-10-30 17:36:58 +01:00
parent a7c941ca90
commit 3c447261b1
4 changed files with 66 additions and 183 deletions

View file

@ -591,7 +591,7 @@ async def handle_register_client(sid: str, data: dict[str, Any]) -> None:
sources_prio=old_state.config.sources_prio, sources_prio=old_state.config.sources_prio,
**data["config"], **data["config"],
) )
sio.enter_room(sid, room) await sio.enter_room(sid, room)
await sio.emit( await sio.emit(
"client-registered", {"success": True, "room": room}, room=sid "client-registered", {"success": True, "room": room}, room=sid
) )
@ -617,7 +617,7 @@ async def handle_register_client(sid: str, data: dict[str, Any]) -> None:
sid=sid, sid=sid,
config=Config(sources={}, sources_prio=[], **data["config"]), config=Config(sources={}, sources_prio=[], **data["config"]),
) )
sio.enter_room(sid, room) await sio.enter_room(sid, room)
await sio.emit( await sio.emit(
"client-registered", {"success": True, "room": room}, room=sid "client-registered", {"success": True, "room": room}, room=sid
) )
@ -745,7 +745,7 @@ async def handle_register_web(sid: str, data: dict[str, Any]) -> bool:
if data["room"] in clients: if data["room"] in clients:
async with sio.session(sid) as session: async with sio.session(sid) as session:
session["room"] = data["room"] session["room"] = data["room"]
sio.enter_room(sid, session["room"]) await sio.enter_room(sid, session["room"])
state = clients[session["room"]] state = clients[session["room"]]
await send_state(state, sid) await send_state(state, sid)
return True return True
@ -882,7 +882,7 @@ async def handle_disconnect(sid: str) -> None:
""" """
async with sio.session(sid) as session: async with sio.session(sid) as session:
if "room" in session: if "room" in session:
sio.leave_room(sid, session["room"]) await sio.leave_room(sid, session["room"])
@sio.on("search") @sio.on("search")

View file

@ -10,7 +10,6 @@ from .source import available_sources as available_sources
from .source import Source as Source from .source import Source as Source
from .youtube import YoutubeSource from .youtube import YoutubeSource
from .s3 import S3Source from .s3 import S3Source
from .s3_video import S3VideoSource
from .files import FilesSource from .files import FilesSource

View file

@ -28,9 +28,11 @@ class S3Source(Source):
will simply be forwarded to the ``minio`` client. will simply be forwarded to the ``minio`` client.
- ``tmp_dir``: The folder, where temporary files are stored. Default - ``tmp_dir``: The folder, where temporary files are stored. Default
is ``/tmp/syng`` is ``/tmp/syng``
- ``index_file``: If the file does not exist, saves the list of - ``index_file``: If the file does not exist, saves the paths of
``cdg``-files from the s3 instance to this file. If it exists, loads files from the s3 instance to this file. If it exists, loads
the list of files from this file. the list of files from this file.
- ``extensions``: List of filename extensions. Index only files with these one
of these extensions (Default: ["cdg"])
""" """
def __init__(self, config: dict[str, Any]): def __init__(self, config: dict[str, Any]):
@ -47,14 +49,19 @@ class S3Source(Source):
config["endpoint"], config["endpoint"],
access_key=config["access_key"], access_key=config["access_key"],
secret_key=config["secret_key"], secret_key=config["secret_key"],
secure=(config["secure"] secure=(config["secure"] if "secure" in config else True),
if "secure" in config else True),
) )
self.bucket: str = config["bucket"] self.bucket: str = config["bucket"]
self.tmp_dir: str = ( self.tmp_dir: str = (
config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng" config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng"
) )
self.extensions = (
[f".{ext}" for ext in config["extensions"]]
if "extensions" in config
else [".cdg"]
)
self.index_file: Optional[str] = ( self.index_file: Optional[str] = (
config["index_file"] if "index_file" in config else None config["index_file"] if "index_file" in config else None
) )
@ -62,7 +69,11 @@ class S3Source(Source):
async def get_file_list(self) -> list[str]: async def get_file_list(self) -> list[str]:
""" """
Return the list of ``cdg`` files on the s3 instance. Return the list of files on the s3 instance, according to the extensions.
If an index file exists, this will be read instead.
As a side effect, an index file is generated, if configured.
:return: see above :return: see above
:rtype: list[str] :rtype: list[str]
@ -78,7 +89,7 @@ class S3Source(Source):
file_list = [ file_list = [
obj.object_name obj.object_name
for obj in self.minio.list_objects(self.bucket, recursive=True) for obj in self.minio.list_objects(self.bucket, recursive=True)
if obj.object_name.endswith(".cdg") if os.path.splitext(obj.object_name)[1] in self.extensions
] ]
if self.index_file is not None and not os.path.isfile( if self.index_file is not None and not os.path.isfile(
self.index_file self.index_file
@ -93,7 +104,7 @@ class S3Source(Source):
async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]: async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]:
""" """
Return the duration for the mp3 file. Return the duration for the music file.
:param entry: The entry with the associated mp3 file :param entry: The entry with the associated mp3 file
:type entry: Entry :type entry: Entry
@ -123,40 +134,68 @@ class S3Source(Source):
async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]: async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
""" """
Download the ``cdg`` and the ``mp3`` file from the s3. Download the file from the s3.
If it is a ``cdg`` file, the accompaning ``mp3`` file is also downloaded
:param entry: The entry to download :param entry: The entry to download
:type entry: Entry :type entry: Entry
:return: A tuple with the location of the ``cdg`` and the ``mp3`` file. :return: A tuple with the location of the main file. If the file a ``cdg`` file,
the second position is the location of the ``mp3`` file, otherwise None
.
:rtype: Tuple[str, Optional[str]] :rtype: Tuple[str, Optional[str]]
""" """
cdg_filename: str = os.path.basename(entry.ident)
if os.path.splitext(entry.ident)[1] == ".cdg":
cdg_filename: str = os.path.basename(entry.ident)
path_to_file: str = os.path.dirname(entry.ident)
cdg_path: str = os.path.join(path_to_file, cdg_filename)
target_file_cdg: str = os.path.join(self.tmp_dir, cdg_path)
ident_mp3: str = entry.ident[:-3] + "mp3"
target_file_mp3: str = target_file_cdg[:-3] + "mp3"
os.makedirs(os.path.dirname(target_file_cdg), exist_ok=True)
video_task: asyncio.Task[Any] = asyncio.create_task(
asyncio.to_thread(
self.minio.fget_object,
self.bucket,
entry.ident,
target_file_cdg,
)
)
audio_task: asyncio.Task[Any] = asyncio.create_task(
asyncio.to_thread(
self.minio.fget_object,
self.bucket,
ident_mp3,
target_file_mp3,
)
)
await video_task
await audio_task
return target_file_cdg, target_file_mp3
video_filename: str = os.path.basename(entry.ident)
path_to_file: str = os.path.dirname(entry.ident) path_to_file: str = os.path.dirname(entry.ident)
cdg_path: str = os.path.join(path_to_file, cdg_filename) video_path: str = os.path.join(path_to_file, video_filename)
target_file_cdg: str = os.path.join(self.tmp_dir, cdg_path) target_file_video: str = os.path.join(self.tmp_dir, video_path)
ident_mp3: str = entry.ident[:-3] + "mp3" os.makedirs(os.path.dirname(target_file_video), exist_ok=True)
target_file_mp3: str = target_file_cdg[:-3] + "mp3"
os.makedirs(os.path.dirname(target_file_cdg), exist_ok=True)
video_task: asyncio.Task[Any] = asyncio.create_task( video_task: asyncio.Task[Any] = asyncio.create_task(
asyncio.to_thread( asyncio.to_thread(
self.minio.fget_object, self.minio.fget_object,
self.bucket, self.bucket,
entry.ident, entry.ident,
target_file_cdg, target_file_video,
)
)
audio_task: asyncio.Task[Any] = asyncio.create_task(
asyncio.to_thread(
self.minio.fget_object, self.bucket, ident_mp3, target_file_mp3
) )
) )
await video_task await video_task
await audio_task return target_file_video, None
return target_file_cdg, target_file_mp3
available_sources["s3"] = S3Source available_sources["s3"] = S3Source

View file

@ -1,155 +0,0 @@
"""
Construct the S3Video source.
Adds it to the ``available_sources`` with the name ``s3-video``
"""
import asyncio
import os
from json import load
from json import dump
from typing import Any
from typing import cast
from typing import Optional
from typing import Tuple
import mutagen
from minio import Minio
from ..entry import Entry
from .source import available_sources
from .source import Source
class S3VideoSource(Source):
"""A source for playing videos from a s3 compatible storage.
Config options are:
- ``endpoint``, ``access_key``, ``secret_key``, ``secure``, ``bucket``: These
will simply be forwarded to the ``minio`` client.
- ``tmp_dir``: The folder, where temporary files are stored. Default
is ``/tmp/syng``
- ``index_file``: If the file does not exist, saves the list of
``cdg``-files from the s3 instance to this file. If it exists, loads
the list of files from this file.
"""
def __init__(self, config: dict[str, Any]):
"""Create the source."""
super().__init__(config)
self.source_name = "s3-video"
if (
"endpoint" in config
and "access_key" in config
and "secret_key" in config
):
self.minio: Minio = Minio(
config["endpoint"],
access_key=config["access_key"],
secret_key=config["secret_key"],
secure=(config["secure"]
if "secure" in config else True),
)
self.bucket: str = config["bucket"]
self.tmp_dir: str = (
config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng"
)
self.index_file: Optional[str] = (
config["index_file"] if "index_file" in config else None
)
self.extra_mpv_arguments = ["--scale=oversample"]
async def get_file_list(self) -> list[str]:
"""
Return the list of ``mp4`` and ``webm`` files on the s3 instance.
:return: see above
:rtype: list[str]
"""
def _get_file_list() -> list[str]:
if self.index_file is not None and os.path.isfile(self.index_file):
with open(
self.index_file, "r", encoding="utf8"
) as index_file_handle:
return cast(list[str], load(index_file_handle))
file_list = [
obj.object_name
for obj in self.minio.list_objects(self.bucket, recursive=True)
if obj.object_name.endswith(".mp4") or obj.object_name.endswith(".webm")
]
if self.index_file is not None and not os.path.isfile(
self.index_file
):
with open(
self.index_file, "w", encoding="utf8"
) as index_file_handle:
dump(file_list, index_file_handle)
return file_list
return await asyncio.to_thread(_get_file_list)
async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]:
"""
Return the duration for the mp3 file.
:param entry: The entry with the associated mp3 file
:type entry: Entry
:return: A dictionary containing the duration in seconds in the
``duration`` key.
:rtype: dict[str, Any]
"""
def mutagen_wrapped(file: str) -> int:
meta_infos = mutagen.File(file).info
return int(meta_infos.length)
await self.ensure_playable(entry)
audio_file_name: Optional[str] = self.downloaded_files[
entry.ident
].audio
if audio_file_name is None:
duration: int = 180
else:
duration = await asyncio.to_thread(
mutagen_wrapped, audio_file_name
)
return {"duration": int(duration)}
async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
"""
Download the ``mp4`` or ``webm`` file from the s3.
:param entry: The entry to download
:type entry: Entry
:return: A tuple with the location of the ``mp4`` or ``webm`` file.
:rtype: Tuple[str, Optional[str]]
"""
video_filename: str = os.path.basename(entry.ident)
path_to_file: str = os.path.dirname(entry.ident)
video_path: str = os.path.join(path_to_file, video_filename)
target_file_video: str = os.path.join(self.tmp_dir, video_path)
os.makedirs(os.path.dirname(target_file_video), exist_ok=True)
video_task: asyncio.Task[Any] = asyncio.create_task(
asyncio.to_thread(
self.minio.fget_object,
self.bucket,
entry.ident,
target_file_video,
)
)
await video_task
return target_file_video, None
available_sources["s3-video"] = S3VideoSource