From 13d13908cc236d98bda354e4a779c3dba00e78fa Mon Sep 17 00:00:00 2001
From: Christoph Stahl <christoph.stahl@tu-dortmund.de>
Date: Sun, 27 Nov 2022 14:53:25 +0100
Subject: [PATCH] Code cleanup and typings

---
 pyproject.toml          |  10 ++
 syng/client.py          |  94 ++++++++++--------
 syng/entry.py           |   4 +
 syng/result.py          |   3 +-
 syng/server.py          |  18 +++-
 syng/sources/common.py  |  15 ---
 syng/sources/s3.py      | 156 +++++++++++++----------------
 syng/sources/source.py  |  84 ++++++++++++----
 syng/sources/youtube.py | 214 ++++++++++++++++++++--------------------
 syng/test.py            |  24 -----
 syng/webclientmockup.py |  11 ++-
 11 files changed, 334 insertions(+), 299 deletions(-)
 delete mode 100644 syng/sources/common.py
 delete mode 100644 syng/test.py

diff --git a/pyproject.toml b/pyproject.toml
index 4af5f5f..966dfca 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -31,3 +31,13 @@ build-backend = "poetry.core.masonry.api"
 exclude = [ ".venv" ]
 venvPath = "."
 venv = ".venv"
+
+[[tool.mypy.overrides]]
+module = [
+  "pytube",
+  "minio",
+  "aiocmd",
+  "pyqrcode",
+  "socketio"
+]
+ignore_missing_imports = true
diff --git a/syng/client.py b/syng/client.py
index 5aae08e..3174b0b 100644
--- a/syng/client.py
+++ b/syng/client.py
@@ -5,6 +5,8 @@ from traceback import print_exc
 from json import load
 import logging
 from argparse import ArgumentParser
+from dataclasses import dataclass, field
+from typing import Optional, Any
 
 import socketio
 import pyqrcode
@@ -13,25 +15,41 @@ from .sources import Source, configure_sources
 from .entry import Entry
 
 
-sio = socketio.AsyncClient()
-logger = logging.getLogger(__name__)
+sio: socketio.AsyncClient = socketio.AsyncClient()
+logger: logging.Logger = logging.getLogger(__name__)
 sources: dict[str, Source] = {}
 
 
-currentLock = asyncio.Semaphore(0)
-state = {"current": None, "queue": [], "recent": [], "room": None, "server": "", "secret": ""}
+currentLock: asyncio.Semaphore = asyncio.Semaphore(0)
+
+
+@dataclass
+class State:
+    current_source: Optional[Source] = None
+    queue: list[Entry] = field(default_factory=list)
+    recent: list[Entry] = field(default_factory=list)
+    room: str = ""
+    server: str = ""
+    secret: str = ""
+
+
+state: State = State()
 
 
 @sio.on("skip")
 async def handle_skip():
     logger.info("Skipping current")
-    await state["current"].skip_current(state["queue"][0])
+    await state.current_source.skip_current(state.queue[0])
 
 
 @sio.on("state")
-async def handle_state(data):
-    state["queue"] = [Entry(**entry) for entry in data["queue"]]
-    state["recent"] = [Entry(**entry) for entry in data["recent"]]
+async def handle_state(data: dict[str, Any]):
+    state.queue = [Entry(**entry) for entry in data["queue"]]
+    state.recent = [Entry(**entry) for entry in data["recent"]]
+
+    for entry in state.queue[:2]:
+        logger.warning(f"Buffering: %s", entry.title)
+        await sources[entry.source].buffer(entry)
 
 
 @sio.on("connect")
@@ -40,35 +58,29 @@ async def handle_connect():
     await sio.emit(
         "register-client",
         {
-            "secret": state["secret"],
-            "queue": [entry.to_dict() for entry in state["queue"]],
-            "recent": [entry.to_dict() for entry in state["recent"]],
-            "room": state["room"],
+            "secret": state.secret,
+            "queue": [entry.to_dict() for entry in state.queue],
+            "recent": [entry.to_dict() for entry in state.recent],
+            "room": state.room,
         },
     )
 
 
 @sio.on("buffer")
-async def handle_buffer(data):
-    source = sources[data["source"]]
-    meta_info = await source.buffer(Entry(**data))
+async def handle_buffer(data: dict[str, Any]):
+    source: Source = sources[data["source"]]
+    meta_info: dict[str, Any] = await source.get_missing_metadata(Entry(**data))
     await sio.emit("meta-info", {"uuid": data["uuid"], "meta": meta_info})
 
 
-async def buffer_and_report(entry):
-    meta_info = await sources[entry.source].buffer(entry)
-    await sio.emit("meta-info", {"uuid": entry.uuid, "meta": meta_info})
-
-
 @sio.on("play")
-async def handle_play(data):
-    entry = Entry(**data)
+async def handle_play(data: dict[str, Any]):
+    entry: Entry = Entry(**data)
     print(
         f"Playing: {entry.artist} - {entry.title} [{entry.album}] ({entry.source}) for {entry.performer}"
     )
     try:
-        state["current"] = sources[entry.source]
-        asyncio.create_task(buffer_and_report(entry))
+        state.current_source = sources[entry.source]
         await sources[entry.source].play(entry)
     except Exception:
         print_exc()
@@ -77,16 +89,14 @@ async def handle_play(data):
 
 
 @sio.on("client-registered")
-async def handle_register(data):
+async def handle_register(data: dict[str, Any]):
     if data["success"]:
         logging.info("Registered")
-        print(f"Join here: {state['server']}/{data['room']}")
-        print(
-            pyqrcode.create(f"{state['server']}/{data['room']}").terminal(quiet_zone=1)
-        )
-        state["room"] = data["room"]
+        print(f"Join here: {state.server}/{data['room']}")
+        print(pyqrcode.create(f"{state.server}/{data['room']}").terminal(quiet_zone=1))
+        state.room = data["room"]
         await sio.emit("sources", {"sources": list(sources.keys())})
-        if state["current"] is None:
+        if state.current_source is None:
             await sio.emit("get-first")
     else:
         logging.warning("Registration failed")
@@ -94,11 +104,13 @@ async def handle_register(data):
 
 
 @sio.on("request-config")
-async def handle_request_config(data):
+async def handle_request_config(data: dict[str, Any]):
     if data["source"] in sources:
-        config = await sources[data["source"]].get_config()
+        config: dict[str, Any] | list[dict[str, Any]] = await sources[
+            data["source"]
+        ].get_config()
         if isinstance(config, list):
-            num_chunks = len(config)
+            num_chunks: int = len(config)
             for current, chunk in enumerate(config):
                 await sio.emit(
                     "config-chunk",
@@ -114,7 +126,7 @@ async def handle_request_config(data):
 
 
 async def aiomain():
-    parser = ArgumentParser()
+    parser: ArgumentParser = ArgumentParser()
 
     parser.add_argument("--room", "-r")
     parser.add_argument("--secret", "-s")
@@ -127,15 +139,17 @@ async def aiomain():
         source_config = load(file)
     sources.update(configure_sources(source_config))
     if args.room:
-        state["room"] = args.room
+        state.room = args.room
 
     if args.secret:
-        state["secret"] = args.secret
+        state.secret = args.secret
     else:
-        state["secret"] = ''.join(secrets.choice(string.ascii_letters + string.digits) for _ in range(8))
-        print(f"Generated secret: {state['secret']}")
+        state.secret = "".join(
+            secrets.choice(string.ascii_letters + string.digits) for _ in range(8)
+        )
+        print(f"Generated secret: {state.secret}")
 
-    state["server"] = args.server
+    state.server = args.server
 
     await sio.connect(args.server)
     await sio.wait()
diff --git a/syng/entry.py b/syng/entry.py
index e06bc20..b713a18 100644
--- a/syng/entry.py
+++ b/syng/entry.py
@@ -1,6 +1,10 @@
 from __future__ import annotations
 from dataclasses import dataclass, field
 from uuid import uuid4, UUID
+from typing import TYPE_CHECKING
+
+if TYPE_CHECKING:
+    from .sources import Source
 
 
 @dataclass
diff --git a/syng/result.py b/syng/result.py
index 84f2e79..bb6d639 100644
--- a/syng/result.py
+++ b/syng/result.py
@@ -1,5 +1,6 @@
 from __future__ import annotations
 from dataclasses import dataclass
+from typing import Optional
 import os.path
 
 
@@ -21,7 +22,7 @@ class Result:
         }
 
     @staticmethod
-    def from_filename(filename, source) -> Result | None:
+    def from_filename(filename, source) -> Optional[Result]:
         try:
             splitfile = os.path.basename(filename[:-4]).split(" - ")
             ident = filename
diff --git a/syng/server.py b/syng/server.py
index d38e967..136a18b 100644
--- a/syng/server.py
+++ b/syng/server.py
@@ -33,9 +33,6 @@ logging.basicConfig(level=logging.INFO)
 logger = logging.getLogger(__name__)
 
 
-clients = {}
-
-
 class Queue:
     def __init__(self, *args, **kwargs):
         self._queue = deque(*args, **kwargs)
@@ -79,6 +76,9 @@ class State:
     sid: str
 
 
+clients: dict[str, State] = {}
+
+
 @sio.on("get-state")
 async def handle_state(sid, data: dict[str, Any] = {}):
     async with sio.session(sid) as session:
@@ -204,8 +204,18 @@ async def handle_register_client(sid, data: dict[str, Any]):
         logger.info("Registerd new client %s", room)
         initial_entries = [Entry(**entry) for entry in data["queue"]]
         initial_recent = [Entry(**entry) for entry in data["recent"]]
-        clients[room] = State(data["secret"], {}, [], Queue(initial_entries), initial_recent, sid)
+        clients[room] = State(
+            data["secret"], {}, [], Queue(initial_entries), initial_recent, sid
+        )
         sio.enter_room(sid, room)
+        await sio.emit(
+            "state",
+            {
+                "queue": clients[room].queue.to_dict(),
+                "recent": [entry.to_dict() for entry in clients[room].recent],
+            },
+            room=sid,
+        )
         await sio.emit("client-registered", {"success": True, "room": room}, room=sid)
 
 
diff --git a/syng/sources/common.py b/syng/sources/common.py
deleted file mode 100644
index 8b917dc..0000000
--- a/syng/sources/common.py
+++ /dev/null
@@ -1,15 +0,0 @@
-from __future__ import annotations
-import asyncio
-
-
-async def play_mpv(
-        video: str, audio: str | None, options: list[str] = list()
-) -> asyncio.subprocess.Process:
-    args = ["--fullscreen", *options, video] + ([f"--audio-file={audio}"] if audio else [])
-
-    mpv_process = asyncio.create_subprocess_exec("mpv", *args)
-    return await mpv_process
-
-
-def kill_mpv(mpv: asyncio.subprocess.Process):
-    mpv.terminate()
diff --git a/syng/sources/s3.py b/syng/sources/s3.py
index d9d07a1..525c592 100644
--- a/syng/sources/s3.py
+++ b/syng/sources/s3.py
@@ -1,43 +1,42 @@
 from json import load, dump
 from time import sleep, perf_counter
 from itertools import zip_longest
-from threading import Event, Lock
-from asyncio import Future
+import asyncio
 import os
+from typing import Tuple, Optional
 
 from minio import Minio
 
 import mutagen
 
-from .source import Source, async_in_thread, available_sources
-from .common import play_mpv, kill_mpv
+from .source import Source, available_sources
 from ..result import Result
 from ..entry import Entry
 
 
 class S3Source(Source):
     def __init__(self, config):
-        super().__init__()
+        super().__init__(config)
 
         if "endpoint" in config and "access_key" in config and "secret_key" in config:
-            self.minio = Minio(
+            self.minio: Minio = Minio(
                 config["endpoint"],
                 access_key=config["access_key"],
                 secret_key=config["secret_key"],
             )
-            self.bucket = config["bucket"]
-            self.tmp_dir = config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng"
+            self.bucket: str = config["bucket"]
+            self.tmp_dir: str = (
+                config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng"
+            )
 
-        self.index = [] if "index" not in config else config["index"]
-        self.downloaded_files = {}
-        self.player = None
-        self.masterlock = Lock()
+        self.index: list[str] = [] if "index" not in config else config["index"]
+        self.extra_mpv_arguments = ["--scale=oversample"]
 
-    async def get_entry(self, performer: str, filename: str) -> Entry:
-        res = Result.from_filename(filename, "s3")
+    async def get_entry(self, performer: str, ident: str) -> Entry:
+        res: Optional[Result] = Result.from_filename(ident, "s3")
         if res is not None:
             return Entry(
-                id=filename,
+                id=ident,
                 source="s3",
                 duration=180,
                 album=res.album,
@@ -45,96 +44,83 @@ class S3Source(Source):
                 artist=res.artist,
                 performer=performer,
             )
-        raise RuntimeError(f"Could not parse {filename}")
+        raise RuntimeError(f"Could not parse {ident}")
 
-    async def play(self, entry) -> None:
-        while not entry.uuid in self.downloaded_files:
-            sleep(0.1)
+    async def get_config(self) -> dict | list[dict]:
+        def _get_config() -> dict | list[dict]:
+            if not self.index:
+                print(f"Indexing {self.bucket}")
+                # self.index = [
+                #     obj.object_name
+                #     for obj in self.minio.list_objects(self.bucket, recursive=True)
+                # ]
+                # with open("s3_files", "w") as f:
+                #     dump(self.index, f)
+                with open("s3_files", "r") as f:
+                    self.index = [item for item in load(f) if item.endswith(".cdg")]
 
-        self.downloaded_files[entry.uuid]["lock"].wait()
+            chunked = zip_longest(*[iter(self.index)] * 1000, fillvalue="")
+            return [
+                {"index": list(filter(lambda x: x != "", chunk))} for chunk in chunked
+            ]
 
-        cdg_file = self.downloaded_files[entry.uuid]["cdg"]
-        mp3_file = self.downloaded_files[entry.uuid]["mp3"]
+        return await asyncio.to_thread(_get_config)
 
-        self.player = await play_mpv(
-            cdg_file, mp3_file, ["--scale=oversample"]
-        )
-
-        await self.player.wait()
-
-    async def skip_current(self, entry) -> None:
-        await self.player.kill()
-
-    @async_in_thread
-    def get_config(self):
-        if not self.index:
-            print("Start indexing")
-            start = perf_counter()
-            # self.index = [
-            #     obj.object_name
-            #     for obj in self.minio.list_objects(self.bucket, recursive=True)
-            # ]
-            # with open("s3_files", "w") as f:
-            #     dump(self.index, f)
-            with open("s3_files", "r") as f:
-                self.index = [item for item in load(f) if item.endswith(".cdg")]
-            print(len(self.index))
-            stop = perf_counter()
-            print(f"Took {stop - start:0.4f} seconds")
-
-        chunked = zip_longest(*[iter(self.index)] * 1000, fillvalue="")
-        return [{"index": list(filter(lambda x: x != "", chunk))} for chunk in chunked]
-
-    def add_to_config(self, config):
+    def add_to_config(self, config: dict) -> None:
         self.index += config["index"]
 
-    @async_in_thread
-    def search(self, result_future: Future, query: str) -> None:
+    async def search(self, result_future: asyncio.Future, query: str) -> None:
         print("searching s3")
-        filtered = self.filter_data_by_query(query, self.index)
-        results = []
+        filtered: list[str] = self.filter_data_by_query(query, self.index)
+        results: list[Result] = []
         for filename in filtered:
-            print(filename)
-            result = Result.from_filename(filename, "s3")
-            print(result)
+            result: Optional[Result] = Result.from_filename(filename, "s3")
             if result is None:
                 continue
             results.append(result)
-        print(results)
         result_future.set_result(results)
 
-    @async_in_thread
-    def buffer(self, entry: Entry) -> dict:
-        with self.masterlock:
-            if entry.uuid in self.downloaded_files:
-                return {}
-            self.downloaded_files[entry.uuid] = {"lock": Event()}
+    async def get_missing_metadata(self, entry: Entry) -> dict:
+        def mutagen_wrapped(file: str) -> int:
+            meta_infos = mutagen.File(file).info
+            return int(meta_infos.length)
 
-        cdg_filename = os.path.basename(entry.id)
-        path_to_file = os.path.dirname(entry.id)
+        await self.ensure_playable(entry)
 
-        cdg_path_with_uuid = os.path.join(path_to_file, f"{entry.uuid}-{cdg_filename}")
-        target_file_cdg = os.path.join(self.tmp_dir, cdg_path_with_uuid)
+        audio_file_name: Optional[str] = self.downloaded_files[entry.id].audio
 
-        ident_mp3 = entry.id[:-3] + "mp3"
-        target_file_mp3 = target_file_cdg[:-3] + "mp3"
+        if audio_file_name is None:
+            duration: int = 180
+        else:
+            duration = await asyncio.to_thread(mutagen_wrapped, audio_file_name)
+
+        return {"duration": int(duration)}
+
+    async def doBuffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
+        cdg_filename: str = os.path.basename(entry.id)
+        path_to_file: str = os.path.dirname(entry.id)
+
+        cdg_path: str = os.path.join(path_to_file, cdg_filename)
+        target_file_cdg: str = os.path.join(self.tmp_dir, cdg_path)
+
+        ident_mp3: str = entry.id[:-3] + "mp3"
+        target_file_mp3: str = target_file_cdg[:-3] + "mp3"
         os.makedirs(os.path.dirname(target_file_cdg), exist_ok=True)
 
-        print(
-            f'self.minio.fget_object("{self.bucket}", "{entry.id}", "{target_file_cdg}")'
+        video_task: asyncio.Task = asyncio.create_task(
+            asyncio.to_thread(
+                self.minio.fget_object, self.bucket, entry.id, target_file_cdg
+            )
+        )
+        audio_task: asyncio.Task = asyncio.create_task(
+            asyncio.to_thread(
+                self.minio.fget_object, self.bucket, ident_mp3, target_file_mp3
+            )
         )
-        self.minio.fget_object(self.bucket, entry.id, target_file_cdg)
-        self.minio.fget_object(self.bucket, ident_mp3, target_file_mp3)
 
-        self.downloaded_files[entry.uuid]["cdg"] = target_file_cdg
-        self.downloaded_files[entry.uuid]["mp3"] = target_file_mp3
-        self.downloaded_files[entry.uuid]["lock"].set()
-
-        meta_infos = mutagen.File(target_file_mp3).info
-
-        print(f"duration is {meta_infos.length}")
-
-        return {"duration": int(meta_infos.length)}
+        await video_task
+        await audio_task
+        return target_file_cdg, target_file_mp3
 
 
 available_sources["s3"] = S3Source
diff --git a/syng/sources/source.py b/syng/sources/source.py
index e7d8039..080f128 100644
--- a/syng/sources/source.py
+++ b/syng/sources/source.py
@@ -1,42 +1,86 @@
 from __future__ import annotations
 import shlex
 import asyncio
-from typing import Callable, Awaitable
+from typing import Tuple, Optional, Type, Any
 import os.path
+from collections import defaultdict
+from dataclasses import dataclass, field
 
 from ..entry import Entry
-from ..result import Result
 
 
-def async_in_thread(func: Callable) -> Awaitable:
-    async def wrapper(*args, **kwargs):
-        loop = asyncio.get_running_loop()
-        return await loop.run_in_executor(None, func, *args, **kwargs)
-
-    return wrapper
+@dataclass
+class DLFilesEntry:
+    ready: asyncio.Event = field(default_factory=asyncio.Event)
+    video: str = ""
+    audio: Optional[str] = None
+    buffering: bool = False
+    complete: bool = False
+    skip: bool = False
 
 
 class Source:
+    def __init__(self, config: dict[str, Any]):
+        self.downloaded_files: defaultdict[str, DLFilesEntry] = defaultdict(
+            DLFilesEntry
+        )
+        self.masterlock: asyncio.Lock = asyncio.Lock()
+        self.player: Optional[asyncio.subprocess.Process] = None
+        self.extra_mpv_arguments: list[str] = []
+
+    @staticmethod
+    async def play_mpv(
+        video: str, audio: str | None, /, *options
+    ) -> asyncio.subprocess.Process:
+        args = ["--fullscreen", *options, video] + (
+            [f"--audio-file={audio}"] if audio else []
+        )
+
+        mpv_process = asyncio.create_subprocess_exec("mpv", *args)
+        return await mpv_process
+
     async def get_entry(self, performer: str, ident: str) -> Entry:
         raise NotImplementedError
 
     async def search(self, result_future: asyncio.Future, query: str) -> None:
         raise NotImplementedError
 
-    async def buffer(self, entry: Entry) -> dict:
-        return {}
-
-    async def play(self, entry: Entry) -> None:
+    async def doBuffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
         raise NotImplementedError
 
+    async def buffer(self, entry: Entry):
+        async with self.masterlock:
+            if self.downloaded_files[entry.id].buffering:
+                print(f"already buffering {entry.title}")
+                return
+            self.downloaded_files[entry.id].buffering = True
+
+        video, audio = await self.doBuffer(entry)
+        self.downloaded_files[entry.id].video = video
+        self.downloaded_files[entry.id].audio = audio
+        self.downloaded_files[entry.id].complete = True
+        self.downloaded_files[entry.id].ready.set()
+        print(f"Buffering done for {entry.title}")
+
+    async def play(self, entry: Entry) -> None:
+        await self.ensure_playable(entry)
+        self.player = await self.play_mpv(
+            self.downloaded_files[entry.id].video,
+            self.downloaded_files[entry.id].audio,
+            *self.extra_mpv_arguments,
+        )
+        await self.player.wait()
+
     async def skip_current(self, entry: Entry) -> None:
-        pass
+        if self.player is not None:
+            self.player.kill()
 
-    async def init_server(self) -> None:
-        pass
+    async def ensure_playable(self, entry: Entry):
+        await self.buffer(entry)
+        await self.downloaded_files[entry.id].ready.wait()
 
-    async def init_client(self) -> None:
-        pass
+    async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]:
+        return {}
 
     def filter_data_by_query(self, query: str, data: list[str]) -> list[str]:
         def contains_all_words(words: list[str], element: str) -> bool:
@@ -48,11 +92,11 @@ class Source:
         splitquery = shlex.split(query)
         return [element for element in data if contains_all_words(splitquery, element)]
 
-    async def get_config(self) -> dict:
+    async def get_config(self) -> dict[str, Any] | list[dict[str, Any]]:
         raise NotImplementedError
 
-    def add_to_config(self, config) -> None:
+    def add_to_config(self, config: dict[str, Any]) -> None:
         pass
 
 
-available_sources = {}
+available_sources: dict[str, Type[Source]] = {}
diff --git a/syng/sources/youtube.py b/syng/sources/youtube.py
index 22f1195..bd40234 100644
--- a/syng/sources/youtube.py
+++ b/syng/sources/youtube.py
@@ -1,131 +1,131 @@
 import asyncio
 import shlex
 from functools import partial
-from threading import Event, Lock
+from typing import Optional, Tuple, Any
 
-from pytube import YouTube, Search, Channel, innertube
+from pytube import YouTube, Search, Channel, innertube, Stream, StreamQuery
 
-from .common import play_mpv, kill_mpv
-from .source import Source, async_in_thread, available_sources
+from .source import Source, available_sources
 from ..entry import Entry
 from ..result import Result
 
 
 class YoutubeSource(Source):
-    def __init__(self, config):
-        super().__init__()
-        self.innertube_client = innertube.InnerTube(client="WEB")
-        self.channels = config["channels"] if "channels" in config else []
-        self.tmp_dir = config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng"
-        self.player: None | asyncio.subprocess.Process = None
-        self.downloaded_files = {}
-        self.masterlock = Lock()
+    def __init__(self, config: dict[str, Any]):
+        super().__init__(config)
+        self.innertube_client: innertube.InnerTube = innertube.InnerTube(client="WEB")
+        self.channels: list[str] = config["channels"] if "channels" in config else []
+        self.tmp_dir: str = config["tmp_dir"] if "tmp_dir" in config else "/tmp/syng"
+        self.max_res: int = config["max_res"] if "max_res" in config else 720
+        self.start_streaming: bool = (
+            config["start_streaming"] if "start_streaming" in config else False
+        )
 
-    async def get_config(self):
+    async def get_config(self) -> dict | list[dict]:
         return {"channels": self.channels}
 
     async def play(self, entry: Entry) -> None:
-
-        if entry.uuid in self.downloaded_files and "video" in self.downloaded_files[entry.uuid]:
-            print("playing locally")
-            video_file = self.downloaded_files[entry.uuid]["video"]
-            audio_file = self.downloaded_files[entry.uuid]["audio"]
-            self.player = await play_mpv(video_file, audio_file)
-        else:
+        if self.start_streaming and not self.downloaded_files[entry.id].complete:
             print("streaming")
-            self.player = await play_mpv(
+            self.player = await self.play_mpv(
                 entry.id,
                 None,
-                [
-                    "--script-opts=ytdl_hook-ytdl_path=yt-dlp,ytdl_hook-exclude='%.pls$'",
-                    "--ytdl-format=bestvideo[height<=720]+bestaudio/best[height<=720]",
-                    "--fullscreen",
-                ],
+                "--script-opts=ytdl_hook-ytdl_path=yt-dlp,ytdl_hook-exclude='%.pls$'",
+                f"--ytdl-format=bestvideo[height<={self.max_res}]+bestaudio/best[height<={self.max_res}]",
+                "--fullscreen",
+            )
+            await self.player.wait()
+        else:
+            await super().play(entry)
+
+    async def get_entry(self, performer: str, ident: str) -> Entry:
+        def _get_entry(performer: str, url: str) -> Entry:
+            yt = YouTube(url)
+            return Entry(
+                id=url,
+                source="youtube",
+                album="YouTube",
+                duration=yt.length,
+                title=yt.title,
+                artist=yt.author,
+                performer=performer,
             )
 
-        await self.player.wait()
+        return await asyncio.to_thread(_get_entry, performer, ident)
 
-    async def skip_current(self, entry) -> None:        
-        await self.player.kill()
-
-    @async_in_thread
-    def get_entry(self, performer: str, url: str) -> Entry:
-        yt = YouTube(url)
-        return Entry(
-            id=url,
-            source="youtube",
-            album="YouTube",
-            duration=yt.length,
-            title=yt.title,
-            artist=yt.author,
-            performer=performer,
-        )
-
-    def _contains_index(self, query, result):
-        compare_string = result.title.lower() + " " + result.author.lower()
-        hits = 0
-        queries = shlex.split(query.lower())
+    def _contains_index(self, query: str, result: YouTube) -> float:
+        compare_string: str = result.title.lower() + " " + result.author.lower()
+        hits: int = 0
+        queries: list[str] = shlex.split(query.lower())
         for word in queries:
             if word in compare_string:
                 hits += 1
 
         return 1 - (hits / len(queries))
 
-    @async_in_thread
-    def search(self, result_future: asyncio.Future, query: str) -> None:
-        results = []
-        for channel in self.channels:
-            results += self._channel_search(query, channel)
-        results += Search(query + " karaoke").results
+    async def search(self, result_future: asyncio.Future, query: str) -> None:
+        def _search(result_future: asyncio.Future, query: str) -> None:
+            results: list[YouTube] = []
+            for channel in self.channels:
+                results += self._channel_search(query, channel)
+            search_results: Optional[list[YouTube]] = Search(query + " karaoke").results
+            if search_results is not None:
+                results += search_results
 
-        results.sort(key=partial(self._contains_index, query))
+            results.sort(key=partial(self._contains_index, query))
 
-        result_future.set_result(
-            [
-                Result(
-                    id=result.watch_url,
-                    source="youtube",
-                    title=result.title,
-                    artist=result.author,
-                    album="YouTube",
-                )
-                for result in results
-            ]
-        )
+            result_future.set_result(
+                [
+                    Result(
+                        id=result.watch_url,
+                        source="youtube",
+                        title=result.title,
+                        artist=result.author,
+                        album="YouTube",
+                    )
+                    for result in results
+                ]
+            )
 
-    def _channel_search(self, query, channel) -> list:
-        browseID = Channel(f"https://www.youtube.com{channel}").channel_id
-        endpoint = f"{self.innertube_client.base_url}/browse"
+        await asyncio.to_thread(_search, result_future, query)
 
-        data = {"query": query, "browseId": browseID, "params": "EgZzZWFyY2g%3D"}
+    def _channel_search(self, query: str, channel: str) -> list[YouTube]:
+        browse_id: str = Channel(f"https://www.youtube.com{channel}").channel_id
+        endpoint: str = f"{self.innertube_client.base_url}/browse"
+
+        data: dict[str, str] = {
+            "query": query,
+            "browseId": browse_id,
+            "params": "EgZzZWFyY2g%3D",
+        }
         data.update(self.innertube_client.base_data)
-        results = self.innertube_client._call_api(
+        results: dict = self.innertube_client._call_api(
             endpoint, self.innertube_client.base_params, data
         )
-        items = results["contents"]["twoColumnBrowseResultsRenderer"]["tabs"][-1][
+        items: list = results["contents"]["twoColumnBrowseResultsRenderer"]["tabs"][-1][
             "expandableTabRenderer"
         ]["content"]["sectionListRenderer"]["contents"]
 
-        list_of_videos = []
+        list_of_videos: list[YouTube] = []
         for item in items:
             try:
                 if (
                     "itemSectionRenderer" in item
                     and "videoRenderer" in item["itemSectionRenderer"]["contents"][0]
                 ):
-                    yt_url = (
+                    yt_url: str = (
                         "https://youtube.com/watch?v="
                         + item["itemSectionRenderer"]["contents"][0]["videoRenderer"][
                             "videoId"
                         ]
                     )
-                    author = item["itemSectionRenderer"]["contents"][0][
+                    author: str = item["itemSectionRenderer"]["contents"][0][
                         "videoRenderer"
                     ]["ownerText"]["runs"][0]["text"]
-                    title = item["itemSectionRenderer"]["contents"][0]["videoRenderer"][
-                        "title"
-                    ]["runs"][0]["text"]
-                    yt = YouTube(yt_url)
+                    title: str = item["itemSectionRenderer"]["contents"][0][
+                        "videoRenderer"
+                    ]["title"]["runs"][0]["text"]
+                    yt: YouTube = YouTube(yt_url)
                     yt.author = author
                     yt.title = title
                     list_of_videos.append(yt)
@@ -134,39 +134,41 @@ class YoutubeSource(Source):
                 pass
         return list_of_videos
 
-    @async_in_thread
-    def buffer(self, entry: Entry) -> dict:
-        print(f"Buffering {entry}")
-        with self.masterlock:
-            if entry.uuid in self.downloaded_files:
-                print(f"Already buffering {entry}")
-                return {}
-            self.downloaded_files[entry.uuid] = {}
+    async def doBuffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
+        yt: YouTube = YouTube(entry.id)
 
-        yt = YouTube(entry.id)
+        streams: StreamQuery = await asyncio.to_thread(lambda: yt.streams)
 
-        streams = yt.streams
-    
-        video_streams = streams.filter(
-                type="video", 
-                custom_filter_functions=[lambda s: int(s.resolution[:-1]) <= 1080]
-                )
-        audio_streams = streams.filter(only_audio=True)
+        video_streams: StreamQuery = streams.filter(
+            type="video",
+            custom_filter_functions=[lambda s: int(s.resolution[:-1]) <= self.max_res],
+        )
+        audio_streams: StreamQuery = streams.filter(only_audio=True)
 
-        best_720_stream = sorted(video_streams, key=lambda s: int(s.resolution[:-1]) + (1 if s.is_progressive else 0))[-1]
-        best_audio_stream = sorted(audio_streams, key=lambda s: int(s.abr[:-4]))[-1]
+        best_video_stream: Stream = sorted(
+            video_streams,
+            key=lambda s: int(s.resolution[:-1]) + (1 if s.is_progressive else 0),
+        )[-1]
+        best_audio_stream: Stream = sorted(
+            audio_streams, key=lambda s: int(s.abr[:-4])
+        )[-1]
 
-        print(best_720_stream)
-        print(best_audio_stream)
+        audio: Optional[str] = (
+            await asyncio.to_thread(
+                best_audio_stream.download,
+                output_path=self.tmp_dir,
+                filename_prefix="audio-",
+            )
+            if best_video_stream.is_adaptive
+            else None
+        )
 
-        if not best_720_stream.is_progressive:
-            self.downloaded_files[entry.uuid]["audio"] = best_audio_stream.download(output_path=self.tmp_dir, filename_prefix=f"{entry.uuid}-audio")
-        else:
-            self.downloaded_files[entry.uuid]["audio"] = None
+        video: str = await asyncio.to_thread(
+            best_video_stream.download,
+            output_path=self.tmp_dir,
+        )
 
-        self.downloaded_files[entry.uuid]["video"] = best_720_stream.download(output_path=self.tmp_dir, filename_prefix=entry.uuid)
-
-        return {}
+        return video, audio
 
 
 available_sources["youtube"] = YoutubeSource
diff --git a/syng/test.py b/syng/test.py
deleted file mode 100644
index 58c3008..0000000
--- a/syng/test.py
+++ /dev/null
@@ -1,24 +0,0 @@
-import socketio
-
-
-def append_yt(url):
-    sio = socketio.Client()
-    sio.connect("http://localhost:8080")
-    sio.emit("append", {"source": "youtube", "id": url, "performer": "test"})
-    sio.disconnect()
-
-
-def skip():
-    sio = socketio.Client()
-    sio.connect("http://localhost:8080")
-    sio.emit("register-admin", {"secret": "admin"})
-    sio.emit("skip")
-    sio.disconnect()
-
-
-def search(query):
-    sio = socketio.Client()
-    sio.on("search-result", print)
-    sio.connect("http://localhost:8080")
-    sio.emit("search", {"query": query})
-    sio.disconnect()
diff --git a/syng/webclientmockup.py b/syng/webclientmockup.py
index 0820865..ba23016 100644
--- a/syng/webclientmockup.py
+++ b/syng/webclientmockup.py
@@ -1,11 +1,14 @@
-import socketio
 import asyncio
+from typing import Any
+
+from aiocmd import aiocmd
+import socketio
+
 from .result import Result
 from .entry import Entry
-from aiocmd import aiocmd
 
-sio = socketio.AsyncClient()
-state = {}
+sio: socketio.AsyncClient = socketio.AsyncClient()
+state: dict[str, Any] = {}
 
 
 @sio.on("search-results")