syng/syng/sources/s3.py

157 lines
5.4 KiB
Python

"""
Construct the S3 source.
Adds it to the ``available_sources`` with the name ``s3``
"""
import asyncio
import os
from json import load
from json import dump
from typing import Any
from typing import cast
from typing import Optional
from typing import Tuple
from minio import Minio
from .filebased import FileBasedSource
from ..entry import Entry
from .source import available_sources
class S3Source(FileBasedSource):
    """A source for playing songs from a s3 compatible storage.

    Config options are:
        - ``endpoint``, ``access_key``, ``secret_key``, ``secure``: These
          will simply be forwarded to the ``minio`` client.
        - ``bucket``: The bucket that is listed and downloaded from.
        - ``tmp_dir``: The folder, where temporary files are stored. Default
          is ``/tmp/syng``
        - ``index_file``: If the file does not exist, saves the paths of
          files from the s3 instance to this file. If it exists, loads
          the list of files from this file.
    """

    source_name = "s3"
    config_schema = FileBasedSource.config_schema | {
        "endpoint": (str, "Endpoint of the s3", ""),
        "access_key": (str, "Access Key of the s3", ""),
        "secret_key": (str, "Secret Key of the s3", ""),
        "secure": (bool, "Use SSL", True),
        "bucket": (str, "Bucket of the s3", ""),
        "tmp_dir": (str, "Folder for\ntemporary download", "/tmp/syng"),
        "index_file": (str, "Index file", "s3-index"),
    }

    def __init__(self, config: dict[str, Any]):
        """
        Create the source.

        The ``minio`` client, the bucket name and the temporary directory are
        only set up, if ``endpoint``, ``access_key`` and ``secret_key`` are all
        present in the configuration.

        :param config: The configuration of the source, see the class
            documentation for the recognized keys.
        :type config: dict[str, Any]
        """
        super().__init__(config)

        # NOTE(review): the nesting of ``bucket`` and ``tmp_dir`` below the
        # credential check is assumed; confirm against the intended layout.
        has_credentials = all(
            key in config for key in ("endpoint", "access_key", "secret_key")
        )
        if has_credentials:
            self.minio: Minio = Minio(
                config["endpoint"],
                access_key=config["access_key"],
                secret_key=config["secret_key"],
                secure=config.get("secure", True),
            )
            self.bucket: str = config["bucket"]
            self.tmp_dir: str = config.get("tmp_dir", "/tmp/syng")

        # ``None`` disables reading and writing of an index file.
        self.index_file: Optional[str] = config.get("index_file")
        self.extra_mpv_arguments = ["--scale=oversample"]

    async def get_file_list(self) -> list[str]:
        """
        Return the list of files on the s3 instance, according to the extensions.

        If an index file exists, this will be read instead.

        As a side effect, an index file is generated, if configured.

        :return: see above
        :rtype: list[str]
        """

        def _get_file_list() -> list[str]:
            # Prefer the cached index over querying the s3 instance.
            if self.index_file is not None and os.path.isfile(self.index_file):
                with open(self.index_file, "r", encoding="utf8") as index_file_handle:
                    return cast(list[str], load(index_file_handle))

            file_list = [
                obj.object_name
                for obj in self.minio.list_objects(self.bucket, recursive=True)
                if self.has_correct_extension(obj.object_name)
            ]

            # Reaching this point means no usable index file was found above,
            # so it is (re)created whenever an index file is configured.
            if self.index_file is not None:
                with open(self.index_file, "w", encoding="utf8") as index_file_handle:
                    dump(file_list, index_file_handle)
            return file_list

        # Listing the bucket and the file access are blocking, so they are
        # moved off the event loop.
        return await asyncio.to_thread(_get_file_list)

    async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]:
        """
        Return the duration for the music file.

        The entry is made playable first, then the duration of the file stored
        as ``video`` for this entry is determined.

        :param entry: The entry to get the duration for
        :type entry: Entry
        :return: A dictionary containing the duration in seconds in the
          ``duration`` key.
        :rtype: dict[str, Any]
        """
        await self.ensure_playable(entry)

        file_name: str = self.downloaded_files[entry.ident].video
        duration = await self.get_duration(file_name)
        return {"duration": duration}

    async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
        """
        Download the file from the s3.

        If it is a ``cdg`` file, the accompanying ``mp3`` file is also downloaded

        :param entry: The entry to download
        :type entry: Entry
        :return: A tuple with the location of the main file. If the file is a
          ``cdg`` file, the second position is the location of the ``mp3``
          file, otherwise None.
        :rtype: Tuple[str, Optional[str]]
        """
        video_path, audio_path = self.get_video_audio_split(entry.ident)
        video_dl_path: str = os.path.join(self.tmp_dir, video_path)
        os.makedirs(os.path.dirname(video_dl_path), exist_ok=True)

        # The blocking minio downloads run in worker threads.
        downloads = [
            asyncio.to_thread(
                self.minio.fget_object, self.bucket, entry.ident, video_dl_path
            )
        ]

        audio_dl_path: Optional[str] = None
        if audio_path is not None:
            audio_dl_path = os.path.join(self.tmp_dir, audio_path)
            downloads.append(
                asyncio.to_thread(
                    self.minio.fget_object, self.bucket, audio_path, audio_dl_path
                )
            )

        # Wait for all downloads together, so that a failure of one of them
        # does not leave the other one behind as an un-awaited task.
        await asyncio.gather(*downloads)
        return video_dl_path, audio_dl_path
# Add the class to ``available_sources`` under the key ``s3``.
available_sources["s3"] = S3Source