From 3fb43de576655591b5c1cdefce34b8c497a23f04 Mon Sep 17 00:00:00 2001 From: Christoph Stahl Date: Mon, 18 Nov 2024 22:04:31 +0100 Subject: [PATCH] Better handling of buffering and start_streaming --- syng/client.py | 29 +++++++++++----------- syng/gui.py | 10 +++++--- syng/player_libmpv.py | 44 +++++++++++++++++++++++---------- syng/sources/filebased.py | 1 - syng/sources/files.py | 3 +-- syng/sources/s3.py | 3 +-- syng/sources/source.py | 51 ++++++++------------------------------- syng/sources/youtube.py | 32 ++++++++++++++---------- 8 files changed, 84 insertions(+), 89 deletions(-) diff --git a/syng/client.py b/syng/client.py index 6320426..7f66cc3 100644 --- a/syng/client.py +++ b/syng/client.py @@ -59,7 +59,7 @@ def default_config() -> dict[str, Optional[int | str]]: "last_song": None, "waiting_room_policy": None, "key": None, - "mpv_options": "", + "buffer_in_advance": 2, "show_advanced": False, } @@ -100,6 +100,8 @@ class State: - `optional`, if a performer is already in the queue, they have the option to be put in the waiting room. - `None`, performers are always added to the queue. + * `buffer_in_advance` (`int`): The number of songs, that are buffered in + advance. :type config: dict[str, Any]: """ @@ -124,6 +126,7 @@ class Client: self.sources = configure_sources(config["sources"]) self.state = State() self.currentLock = asyncio.Semaphore(0) + self.buffer_in_advance = config["config"]["buffer_in_advance"] self.player = Player( f"{config['config']['server']}/{config['config']['room']}", self.quit_callback ) @@ -197,9 +200,9 @@ class Client: self.state.waiting_room = [Entry(**entry) for entry in data["waiting_room"]] self.state.recent = [Entry(**entry) for entry in data["recent"]] - for entry in self.state.queue[:2]: + for pos, entry in enumerate(self.state.queue[0 : self.buffer_in_advance]): logger.info("Buffering: %s", entry.title) - await self.sources[entry.source].buffer(entry) + await self.sources[entry.source].buffer(entry, pos) async def handle_connect(self) -> None: """ @@ -351,6 +354,8 @@ class Client: :rtype: None """ if data["success"]: + self.player.start() + logger.info("Registered") qr_string = f"{self.state.config['server']}/{data['room']}" self.player.update_qr(qr_string) @@ -434,11 +439,10 @@ class Client: :rtype: None """ engineio.async_client.async_signal_handler() - if self.player is not None: + if self.player.mpv is not None: self.player.mpv.terminate() def quit_callback(self) -> None: - self.player.close() if self.loop is not None: asyncio.run_coroutine_threadsafe(self.sio.disconnect(), self.loop) @@ -474,23 +478,20 @@ class Client: try: await self.sio.connect(self.state.config["server"]) - except ConnectionError: - logger.error("Could not connect to server") - return - # this is not supported under windows - if os.name != "nt": - asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.signal_handler) - asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.signal_handler) + # this is not supported under windows + if os.name != "nt": + asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.signal_handler) - try: self.is_running = True await self.sio.wait() except asyncio.CancelledError: pass + except ConnectionError: + logger.error("Could not connect to server") finally: self.is_running = False - if self.player is not None: + if self.player.mpv is not None: self.player.mpv.terminate() diff --git a/syng/gui.py b/syng/gui.py index 64ede42..f1329c3 100644 --- a/syng/gui.py +++ b/syng/gui.py @@ -441,7 +441,11 @@ class GeneralConfig(OptionFrame): self.add_string_option( "key", "Key for server (if necessary)", config["key"], is_password=True ) - self.add_string_option("mpv_options", "Additional MPV Arguments", config["mpv_options"]) + self.add_int_option( + "buffer_in_advance", + "Buffer the next songs in advance", + int(config["buffer_in_advance"]), + ) if not config["show_advanced"]: for option in [ @@ -449,7 +453,7 @@ class GeneralConfig(OptionFrame): "last_song", "preview_duration", "key", - "mpv_options", + "buffer_in_advance", ]: self.rows[option][0].setVisible(False) widget_or_layout = self.rows[option][1] @@ -519,7 +523,7 @@ class SyngGui(QMainWindow): "last_song", "preview_duration", "key", - "mpv_options", + "buffer_in_advance", ]: self.general_config.rows[option][0].setVisible(state) widget_or_layout = self.general_config.rows[option][1] diff --git a/syng/player_libmpv.py b/syng/player_libmpv.py index 87838d0..193ddd6 100644 --- a/syng/player_libmpv.py +++ b/syng/player_libmpv.py @@ -8,6 +8,7 @@ import os from .entry import Entry + class Player: def __init__(self, qr_string: str, quit_callback: Callable[[], None]) -> None: locale.setlocale(locale.LC_ALL, "C") @@ -16,23 +17,24 @@ class Player: if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"): self.base_dir = getattr(sys, "_MEIPASS") self.closing = False - self.mpv = mpv.MPV(ytdl=True, input_default_bindings=True, input_vo_keyboard=True, osc=True) - self.mpv.title = "Syng - Player" - self.mpv.keep_open = "yes" + self.mpv: Optional[mpv.MPV] = None self.qr_overlay: Optional[mpv.ImageOverlay] = None self.update_qr( qr_string, ) - self.mpv.play( - f"{self.base_dir}/background.png", - ) - self.default_options = { "scale": "bilinear", } self.quit_callback = quit_callback + def start(self) -> None: + self.mpv = mpv.MPV(ytdl=True, input_default_bindings=True, input_vo_keyboard=True, osc=True) + self.mpv.title = "Syng - Player" + self.mpv.keep_open = "yes" + self.mpv.play( + f"{self.base_dir}/background.png", + ) self.mpv.observe_property("osd-width", self.osd_size_handler) self.mpv.observe_property("osd-height", self.osd_size_handler) self.mpv.register_event_callback(self.event_callback) @@ -40,12 +42,9 @@ class Player: def event_callback(self, event: mpv.MpvEvent) -> None: e = event.as_dict() if e["event"] == b"shutdown": - self.closing = True - self.quit_callback() - - def close(self): - if not self.closing: - self.mpv.terminate() + if not self.closing: + self.closing = True + self.quit_callback() def update_qr(self, qr_string: str) -> None: qr = QRCode(box_size=5, border=1) @@ -54,6 +53,9 @@ class Player: self.qr = qr.make_image().convert("RGBA") def osd_size_handler(self, attribute: str, value: int) -> None: + if self.mpv is None: + print("MPV is not initialized", file=sys.stderr) + return if self.qr_overlay: self.mpv.remove_overlay(self.qr_overlay.overlay_id) @@ -66,6 +68,10 @@ class Player: self.qr_overlay = self.mpv.create_image_overlay(self.qr, pos=(x_pos, y_pos)) async def queue_next(self, entry: Entry) -> None: + if self.mpv is None: + print("MPV is not initialized", file=sys.stderr) + return + loop = asyncio.get_running_loop() frame = sys._getframe() @@ -91,6 +97,10 @@ class Player: self.quit_callback() def play_image(self, image: str, duration: int, sub_file: Optional[str] = None) -> None: + if self.mpv is None: + print("MPV is not initialized", file=sys.stderr) + return + for property, value in self.default_options.items(): self.mpv[property] = value self.mpv.image_display_duration = duration @@ -107,6 +117,10 @@ class Player: audio: Optional[str] = None, override_options: Optional[dict[str, str]] = None, ) -> None: + if self.mpv is None: + print("MPV is not initialized", file=sys.stderr) + return + if override_options is None: override_options = {} for property, value in self.default_options.items(): @@ -130,6 +144,10 @@ class Player: self.quit_callback() def skip_current(self) -> None: + if self.mpv is None: + print("MPV is not initialized", file=sys.stderr) + return + self.mpv.image_display_duration = 0 self.mpv.play( f"{self.base_dir}/background.png", diff --git a/syng/sources/filebased.py b/syng/sources/filebased.py index b31ef22..efb51d5 100644 --- a/syng/sources/filebased.py +++ b/syng/sources/filebased.py @@ -39,7 +39,6 @@ class FileBasedSource(Source): super().__init__(config) self.extensions: list[str] = config["extensions"] if "extensions" in config else ["mp3+cdg"] - self.extra_mpv_arguments = ["--scale=oversample"] self.extra_mpv_options = {"scale": "oversample"} def has_correct_extension(self, path: Optional[str]) -> bool: diff --git a/syng/sources/files.py b/syng/sources/files.py index dbfd3ed..c0cfe39 100644 --- a/syng/sources/files.py +++ b/syng/sources/files.py @@ -30,7 +30,6 @@ class FilesSource(FileBasedSource): super().__init__(config) self.dir = config["dir"] if "dir" in config else "." - self.extra_mpv_arguments = ["--scale=oversample"] async def get_file_list(self) -> list[str]: """Collect all files in ``dir``, that have the correct filename extension""" @@ -60,7 +59,7 @@ class FilesSource(FileBasedSource): return {"duration": duration} - async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]: + async def do_buffer(self, entry: Entry, pos: int) -> Tuple[str, Optional[str]]: """ No buffering needs to be done, since the files are already on disk. diff --git a/syng/sources/s3.py b/syng/sources/s3.py index b3be9bd..4d29aff 100644 --- a/syng/sources/s3.py +++ b/syng/sources/s3.py @@ -77,7 +77,6 @@ class S3Source(FileBasedSource): self.tmp_dir: str = config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng" self.index_file: Optional[str] = config["index_file"] if "index_file" in config else None - self.extra_mpv_arguments = ["--scale=oversample"] def load_file_list_from_server(self) -> list[str]: """ @@ -164,7 +163,7 @@ class S3Source(FileBasedSource): return {"duration": duration} - async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]: + async def do_buffer(self, entry: Entry, pos: int) -> Tuple[str, Optional[str]]: """ Download the file from the s3. diff --git a/syng/sources/source.py b/syng/sources/source.py index fe73db0..a2b3202 100644 --- a/syng/sources/source.py +++ b/syng/sources/source.py @@ -80,7 +80,6 @@ class Source(ABC): attribute. Source specific tasks will be forwarded to the respective source, like: - - Playing the audio/video - Buffering the audio/video - Searching for a query - Getting an entry from an identifier @@ -93,7 +92,7 @@ class Source(ABC): ``get_entry``, ``search``, ``add_to_config`` Specific client methods: - ``buffer``, ``do_buffer``, ``play``, ``skip_current``, ``ensure_playable``, + ``buffer``, ``do_buffer``, ``skip_current``, ``ensure_playable``, ``get_missing_metadata``, ``get_config`` Each source has a reference to all files, that are currently queued to @@ -104,7 +103,7 @@ class Source(ABC): :py:attr:`Entry.ident` to :py:class:`DLFilesEntry`. - ``player``, the reference to the ``mpv`` process, if it has started - - ``extra_mpv_arguments``, list of arguments added to the mpv + - ``extra_mpv_options``, dictionary of arguments added to the mpv instance, can be overwritten by a subclass - ``source_name``, the string used to identify the source """ @@ -128,7 +127,6 @@ class Source(ABC): self.downloaded_files: defaultdict[str, DLFilesEntry] = defaultdict(DLFilesEntry) self._masterlock: asyncio.Lock = asyncio.Lock() self._index: list[str] = config["index"] if "index" in config else [] - self.extra_mpv_arguments: list[str] = [] self.extra_mpv_options: dict[str, str] = {} self._skip_next = False @@ -192,7 +190,7 @@ class Source(ABC): return results @abstractmethod - async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]: + async def do_buffer(self, entry: Entry, pos: int) -> Tuple[str, Optional[str]]: """ Source specific part of buffering. @@ -205,11 +203,13 @@ class Source(ABC): :param entry: The entry to buffer :type entry: Entry + :param pos: The position in the queue, the entry is at. + :type pos: int :returns: A Tuple of the locations for the video and the audio file. :rtype: Tuple[str, Optional[str]] """ - async def buffer(self, entry: Entry) -> None: + async def buffer(self, entry: Entry, pos: int) -> None: """ Buffer all necessary files for the entry. @@ -223,6 +223,8 @@ class Source(ABC): :param entry: The entry to buffer :type entry: Entry + :param pos: The position in the queue, the entry is at. + :type pos: int :rtype: None """ async with self._masterlock: @@ -231,7 +233,7 @@ class Source(ABC): self.downloaded_files[entry.ident].buffering = True try: - buffer_task = asyncio.create_task(self.do_buffer(entry)) + buffer_task = asyncio.create_task(self.do_buffer(entry, pos)) self.downloaded_files[entry.ident].buffer_task = buffer_task video, audio = await buffer_task @@ -245,39 +247,6 @@ class Source(ABC): self.downloaded_files[entry.ident].ready.set() - async def play(self, entry: Entry, player: Player, mpv_options: str) -> None: - """ - Play the entry. - - This waits until buffering is complete and starts - playing the entry. - - :param entry: The entry to play - :type entry: Entry - :param mpv_options: Extra options for the mpv player - :type mpv_options: str - :rtype: None - """ - await self.ensure_playable(entry) - - if self.downloaded_files[entry.ident].failed: - del self.downloaded_files[entry.ident] - return - - async with self._masterlock: - if self._skip_next: - self._skip_next = False - entry.skip = True - return - - await player.play( - self.downloaded_files[entry.ident].video, self.downloaded_files[entry.ident].audio - ) - - if self._skip_next: - self._skip_next = False - entry.skip = True - async def skip_current(self, entry: Entry) -> None: """ Skips first song in the queue. @@ -308,7 +277,7 @@ class Source(ABC): :type entry: Entry :rtype: None """ - await self.buffer(entry) + await self.buffer(entry, 0) dlfilesentry = self.downloaded_files[entry.ident] await dlfilesentry.ready.wait() return dlfilesentry.video, dlfilesentry.audio diff --git a/syng/sources/youtube.py b/syng/sources/youtube.py index 2159572..7287b9a 100644 --- a/syng/sources/youtube.py +++ b/syng/sources/youtube.py @@ -233,25 +233,22 @@ class YoutubeSource(Source): """ return {"channels": self.channels} - async def play(self, entry: Entry, player: Player, mpv_options: str) -> None: + async def ensure_playable(self, entry: Entry) -> tuple[str, Optional[str]]: """ - Play the given entry. + Ensure that the entry is playable. - If ``start_streaming`` is set and buffering is not yet done, starts - immediatly and forwards the url to ``mpv``. + If the entry is not yet downloaded, download it. + If start_streaming is set, start streaming immediatly. - Otherwise wait for buffering and start playing. - - :param entry: The entry to play. + :param entry: The entry to download. :type entry: Entry - :param mpv_options: The options to pass to ``mpv``. - :type mpv_options: str :rtype: None """ + if self.start_streaming and not self.downloaded_files[entry.ident].complete: - await player.play(entry.ident) - else: - await super().play(entry, player, mpv_options) + return (entry.ident, None) + + return await super().ensure_playable(entry) async def get_entry( self, @@ -376,7 +373,7 @@ class YoutubeSource(Source): } return {} - async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]: + async def do_buffer(self, entry: Entry, pos: int) -> Tuple[str, Optional[str]]: """ Download the video. @@ -387,11 +384,20 @@ class YoutubeSource(Source): location exists, the return value for the audio part will always be ``None``. + If pos is 0 and start_streaming is set, no buffering is done, instead the + youtube url is returned. + :param entry: The entry to download. :type entry: Entry + :param pos: The position in the video to start buffering. + :type pos: int :return: The location of the video file and ``None``. :rtype: Tuple[str, Optional[str]] """ + + if pos == 0 and self.start_streaming: + return entry.ident, None + info: Any = await asyncio.to_thread(self._yt_dlp.extract_info, entry.ident) combined_path = info["requested_downloads"][0]["filepath"] return combined_path, None