Better handling of buffering and start_streaming

This commit is contained in:
Christoph Stahl 2024-11-18 22:04:31 +01:00
parent 220f1e8779
commit 3fb43de576
8 changed files with 84 additions and 89 deletions

View file

@ -59,7 +59,7 @@ def default_config() -> dict[str, Optional[int | str]]:
"last_song": None,
"waiting_room_policy": None,
"key": None,
"mpv_options": "",
"buffer_in_advance": 2,
"show_advanced": False,
}
@ -100,6 +100,8 @@ class State:
- `optional`, if a performer is already in the queue, they have the option
to be put in the waiting room.
- `None`, performers are always added to the queue.
* `buffer_in_advance` (`int`): The number of songs, that are buffered in
advance.
:type config: dict[str, Any]:
"""
@ -124,6 +126,7 @@ class Client:
self.sources = configure_sources(config["sources"])
self.state = State()
self.currentLock = asyncio.Semaphore(0)
self.buffer_in_advance = config["config"]["buffer_in_advance"]
self.player = Player(
f"{config['config']['server']}/{config['config']['room']}", self.quit_callback
)
@ -197,9 +200,9 @@ class Client:
self.state.waiting_room = [Entry(**entry) for entry in data["waiting_room"]]
self.state.recent = [Entry(**entry) for entry in data["recent"]]
for entry in self.state.queue[:2]:
for pos, entry in enumerate(self.state.queue[0 : self.buffer_in_advance]):
logger.info("Buffering: %s", entry.title)
await self.sources[entry.source].buffer(entry)
await self.sources[entry.source].buffer(entry, pos)
async def handle_connect(self) -> None:
"""
@ -351,6 +354,8 @@ class Client:
:rtype: None
"""
if data["success"]:
self.player.start()
logger.info("Registered")
qr_string = f"{self.state.config['server']}/{data['room']}"
self.player.update_qr(qr_string)
@ -434,11 +439,10 @@ class Client:
:rtype: None
"""
engineio.async_client.async_signal_handler()
if self.player is not None:
if self.player.mpv is not None:
self.player.mpv.terminate()
def quit_callback(self) -> None:
self.player.close()
if self.loop is not None:
asyncio.run_coroutine_threadsafe(self.sio.disconnect(), self.loop)
@ -474,23 +478,20 @@ class Client:
try:
await self.sio.connect(self.state.config["server"])
except ConnectionError:
logger.error("Could not connect to server")
return
# this is not supported under windows
if os.name != "nt":
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.signal_handler)
asyncio.get_event_loop().add_signal_handler(signal.SIGTERM, self.signal_handler)
# this is not supported under windows
if os.name != "nt":
asyncio.get_event_loop().add_signal_handler(signal.SIGINT, self.signal_handler)
try:
self.is_running = True
await self.sio.wait()
except asyncio.CancelledError:
pass
except ConnectionError:
logger.error("Could not connect to server")
finally:
self.is_running = False
if self.player is not None:
if self.player.mpv is not None:
self.player.mpv.terminate()

View file

@ -441,7 +441,11 @@ class GeneralConfig(OptionFrame):
self.add_string_option(
"key", "Key for server (if necessary)", config["key"], is_password=True
)
self.add_string_option("mpv_options", "Additional MPV Arguments", config["mpv_options"])
self.add_int_option(
"buffer_in_advance",
"Buffer the next songs in advance",
int(config["buffer_in_advance"]),
)
if not config["show_advanced"]:
for option in [
@ -449,7 +453,7 @@ class GeneralConfig(OptionFrame):
"last_song",
"preview_duration",
"key",
"mpv_options",
"buffer_in_advance",
]:
self.rows[option][0].setVisible(False)
widget_or_layout = self.rows[option][1]
@ -519,7 +523,7 @@ class SyngGui(QMainWindow):
"last_song",
"preview_duration",
"key",
"mpv_options",
"buffer_in_advance",
]:
self.general_config.rows[option][0].setVisible(state)
widget_or_layout = self.general_config.rows[option][1]

View file

@ -8,6 +8,7 @@ import os
from .entry import Entry
class Player:
def __init__(self, qr_string: str, quit_callback: Callable[[], None]) -> None:
locale.setlocale(locale.LC_ALL, "C")
@ -16,23 +17,24 @@ class Player:
if getattr(sys, "frozen", False) and hasattr(sys, "_MEIPASS"):
self.base_dir = getattr(sys, "_MEIPASS")
self.closing = False
self.mpv = mpv.MPV(ytdl=True, input_default_bindings=True, input_vo_keyboard=True, osc=True)
self.mpv.title = "Syng - Player"
self.mpv.keep_open = "yes"
self.mpv: Optional[mpv.MPV] = None
self.qr_overlay: Optional[mpv.ImageOverlay] = None
self.update_qr(
qr_string,
)
self.mpv.play(
f"{self.base_dir}/background.png",
)
self.default_options = {
"scale": "bilinear",
}
self.quit_callback = quit_callback
def start(self) -> None:
self.mpv = mpv.MPV(ytdl=True, input_default_bindings=True, input_vo_keyboard=True, osc=True)
self.mpv.title = "Syng - Player"
self.mpv.keep_open = "yes"
self.mpv.play(
f"{self.base_dir}/background.png",
)
self.mpv.observe_property("osd-width", self.osd_size_handler)
self.mpv.observe_property("osd-height", self.osd_size_handler)
self.mpv.register_event_callback(self.event_callback)
@ -40,12 +42,9 @@ class Player:
def event_callback(self, event: mpv.MpvEvent) -> None:
e = event.as_dict()
if e["event"] == b"shutdown":
self.closing = True
self.quit_callback()
def close(self):
if not self.closing:
self.mpv.terminate()
if not self.closing:
self.closing = True
self.quit_callback()
def update_qr(self, qr_string: str) -> None:
qr = QRCode(box_size=5, border=1)
@ -54,6 +53,9 @@ class Player:
self.qr = qr.make_image().convert("RGBA")
def osd_size_handler(self, attribute: str, value: int) -> None:
if self.mpv is None:
print("MPV is not initialized", file=sys.stderr)
return
if self.qr_overlay:
self.mpv.remove_overlay(self.qr_overlay.overlay_id)
@ -66,6 +68,10 @@ class Player:
self.qr_overlay = self.mpv.create_image_overlay(self.qr, pos=(x_pos, y_pos))
async def queue_next(self, entry: Entry) -> None:
if self.mpv is None:
print("MPV is not initialized", file=sys.stderr)
return
loop = asyncio.get_running_loop()
frame = sys._getframe()
@ -91,6 +97,10 @@ class Player:
self.quit_callback()
def play_image(self, image: str, duration: int, sub_file: Optional[str] = None) -> None:
if self.mpv is None:
print("MPV is not initialized", file=sys.stderr)
return
for property, value in self.default_options.items():
self.mpv[property] = value
self.mpv.image_display_duration = duration
@ -107,6 +117,10 @@ class Player:
audio: Optional[str] = None,
override_options: Optional[dict[str, str]] = None,
) -> None:
if self.mpv is None:
print("MPV is not initialized", file=sys.stderr)
return
if override_options is None:
override_options = {}
for property, value in self.default_options.items():
@ -130,6 +144,10 @@ class Player:
self.quit_callback()
def skip_current(self) -> None:
if self.mpv is None:
print("MPV is not initialized", file=sys.stderr)
return
self.mpv.image_display_duration = 0
self.mpv.play(
f"{self.base_dir}/background.png",

View file

@ -39,7 +39,6 @@ class FileBasedSource(Source):
super().__init__(config)
self.extensions: list[str] = config["extensions"] if "extensions" in config else ["mp3+cdg"]
self.extra_mpv_arguments = ["--scale=oversample"]
self.extra_mpv_options = {"scale": "oversample"}
def has_correct_extension(self, path: Optional[str]) -> bool:

View file

@ -30,7 +30,6 @@ class FilesSource(FileBasedSource):
super().__init__(config)
self.dir = config["dir"] if "dir" in config else "."
self.extra_mpv_arguments = ["--scale=oversample"]
async def get_file_list(self) -> list[str]:
"""Collect all files in ``dir``, that have the correct filename extension"""
@ -60,7 +59,7 @@ class FilesSource(FileBasedSource):
return {"duration": duration}
async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
async def do_buffer(self, entry: Entry, pos: int) -> Tuple[str, Optional[str]]:
"""
No buffering needs to be done, since the files are already on disk.

View file

@ -77,7 +77,6 @@ class S3Source(FileBasedSource):
self.tmp_dir: str = config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng"
self.index_file: Optional[str] = config["index_file"] if "index_file" in config else None
self.extra_mpv_arguments = ["--scale=oversample"]
def load_file_list_from_server(self) -> list[str]:
"""
@ -164,7 +163,7 @@ class S3Source(FileBasedSource):
return {"duration": duration}
async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
async def do_buffer(self, entry: Entry, pos: int) -> Tuple[str, Optional[str]]:
"""
Download the file from the s3.

View file

@ -80,7 +80,6 @@ class Source(ABC):
attribute.
Source specific tasks will be forwarded to the respective source, like:
- Playing the audio/video
- Buffering the audio/video
- Searching for a query
- Getting an entry from an identifier
@ -93,7 +92,7 @@ class Source(ABC):
``get_entry``, ``search``, ``add_to_config``
Specific client methods:
``buffer``, ``do_buffer``, ``play``, ``skip_current``, ``ensure_playable``,
``buffer``, ``do_buffer``, ``skip_current``, ``ensure_playable``,
``get_missing_metadata``, ``get_config``
Each source has a reference to all files, that are currently queued to
@ -104,7 +103,7 @@ class Source(ABC):
:py:attr:`Entry.ident` to :py:class:`DLFilesEntry`.
- ``player``, the reference to the ``mpv`` process, if it has
started
- ``extra_mpv_arguments``, list of arguments added to the mpv
- ``extra_mpv_options``, dictionary of arguments added to the mpv
instance, can be overwritten by a subclass
- ``source_name``, the string used to identify the source
"""
@ -128,7 +127,6 @@ class Source(ABC):
self.downloaded_files: defaultdict[str, DLFilesEntry] = defaultdict(DLFilesEntry)
self._masterlock: asyncio.Lock = asyncio.Lock()
self._index: list[str] = config["index"] if "index" in config else []
self.extra_mpv_arguments: list[str] = []
self.extra_mpv_options: dict[str, str] = {}
self._skip_next = False
@ -192,7 +190,7 @@ class Source(ABC):
return results
@abstractmethod
async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
async def do_buffer(self, entry: Entry, pos: int) -> Tuple[str, Optional[str]]:
"""
Source specific part of buffering.
@ -205,11 +203,13 @@ class Source(ABC):
:param entry: The entry to buffer
:type entry: Entry
:param pos: The position in the queue, the entry is at.
:type pos: int
:returns: A Tuple of the locations for the video and the audio file.
:rtype: Tuple[str, Optional[str]]
"""
async def buffer(self, entry: Entry) -> None:
async def buffer(self, entry: Entry, pos: int) -> None:
"""
Buffer all necessary files for the entry.
@ -223,6 +223,8 @@ class Source(ABC):
:param entry: The entry to buffer
:type entry: Entry
:param pos: The position in the queue, the entry is at.
:type pos: int
:rtype: None
"""
async with self._masterlock:
@ -231,7 +233,7 @@ class Source(ABC):
self.downloaded_files[entry.ident].buffering = True
try:
buffer_task = asyncio.create_task(self.do_buffer(entry))
buffer_task = asyncio.create_task(self.do_buffer(entry, pos))
self.downloaded_files[entry.ident].buffer_task = buffer_task
video, audio = await buffer_task
@ -245,39 +247,6 @@ class Source(ABC):
self.downloaded_files[entry.ident].ready.set()
async def play(self, entry: Entry, player: Player, mpv_options: str) -> None:
"""
Play the entry.
This waits until buffering is complete and starts
playing the entry.
:param entry: The entry to play
:type entry: Entry
:param mpv_options: Extra options for the mpv player
:type mpv_options: str
:rtype: None
"""
await self.ensure_playable(entry)
if self.downloaded_files[entry.ident].failed:
del self.downloaded_files[entry.ident]
return
async with self._masterlock:
if self._skip_next:
self._skip_next = False
entry.skip = True
return
await player.play(
self.downloaded_files[entry.ident].video, self.downloaded_files[entry.ident].audio
)
if self._skip_next:
self._skip_next = False
entry.skip = True
async def skip_current(self, entry: Entry) -> None:
"""
Skips first song in the queue.
@ -308,7 +277,7 @@ class Source(ABC):
:type entry: Entry
:rtype: None
"""
await self.buffer(entry)
await self.buffer(entry, 0)
dlfilesentry = self.downloaded_files[entry.ident]
await dlfilesentry.ready.wait()
return dlfilesentry.video, dlfilesentry.audio

View file

@ -233,25 +233,22 @@ class YoutubeSource(Source):
"""
return {"channels": self.channels}
async def play(self, entry: Entry, player: Player, mpv_options: str) -> None:
async def ensure_playable(self, entry: Entry) -> tuple[str, Optional[str]]:
"""
Play the given entry.
Ensure that the entry is playable.
If ``start_streaming`` is set and buffering is not yet done, starts
immediatly and forwards the url to ``mpv``.
If the entry is not yet downloaded, download it.
If start_streaming is set, start streaming immediatly.
Otherwise wait for buffering and start playing.
:param entry: The entry to play.
:param entry: The entry to download.
:type entry: Entry
:param mpv_options: The options to pass to ``mpv``.
:type mpv_options: str
:rtype: None
"""
if self.start_streaming and not self.downloaded_files[entry.ident].complete:
await player.play(entry.ident)
else:
await super().play(entry, player, mpv_options)
return (entry.ident, None)
return await super().ensure_playable(entry)
async def get_entry(
self,
@ -376,7 +373,7 @@ class YoutubeSource(Source):
}
return {}
async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
async def do_buffer(self, entry: Entry, pos: int) -> Tuple[str, Optional[str]]:
"""
Download the video.
@ -387,11 +384,20 @@ class YoutubeSource(Source):
location exists, the return value for the audio part will always be
``None``.
If pos is 0 and start_streaming is set, no buffering is done, instead the
youtube url is returned.
:param entry: The entry to download.
:type entry: Entry
:param pos: The position in the video to start buffering.
:type pos: int
:return: The location of the video file and ``None``.
:rtype: Tuple[str, Optional[str]]
"""
if pos == 0 and self.start_streaming:
return entry.ident, None
info: Any = await asyncio.to_thread(self._yt_dlp.extract_info, entry.ident)
combined_path = info["requested_downloads"][0]["filepath"]
return combined_path, None