mypy strict

This commit is contained in:
Christoph Stahl 2022-11-27 16:22:07 +01:00
parent 13d13908cc
commit 3760793ed9
12 changed files with 139 additions and 80 deletions

3
aiocmd.pyi Normal file
View file

@ -0,0 +1,3 @@
class aiocmd:
class PromptToolkitCmd:
async def run(self) -> None: ...

6
mutagen.pyi Normal file
View file

@ -0,0 +1,6 @@
class Info:
length: int
class File:
def __init__(self, filename: str): ...
info: Info

33
socketio.pyi Normal file
View file

@ -0,0 +1,33 @@
from typing import Any, Optional, Awaitable, Callable, TypeVar
Handler = TypeVar("Handler", bound=Callable[[str, dict[str, Any]], Any])
ClientHandler = TypeVar("ClientHandler", bound=Callable[[dict[str, Any]], Any])
class _session_context_manager:
async def __aenter__(self) -> dict[str, Any]: ...
async def __aexit__(self, *args: list[Any]) -> None: ...
class AsyncServer:
def __init__(
self, cors_allowed_origins: str, logger: bool, engineio_logger: bool
): ...
async def emit(
self,
message: str,
body: Optional[dict[str, Any]] = None,
room: Optional[str] = None,
) -> None: ...
def session(self, sid: str) -> _session_context_manager: ...
def on(self, event: str) -> Callable[[Handler], Handler]: ...
def enter_room(self, sid: str, room: str) -> None: ...
def leave_room(self, sid: str, room: str) -> None: ...
def attach(self, app: Any) -> None: ...
class AsyncClient:
def on(self, event: str) -> Callable[[ClientHandler], ClientHandler]: ...
async def wait(self) -> None: ...
async def connect(self, server: str) -> None: ...
async def disconnect(self) -> None: ...
async def emit(
self, message: str, data: Optional[dict[str, Any]] = None
) -> None: ...

View file

@ -37,13 +37,14 @@ state: State = State()
@sio.on("skip")
async def handle_skip():
async def handle_skip(_: dict[str, Any]) -> None:
logger.info("Skipping current")
await state.current_source.skip_current(state.queue[0])
if state.current_source is not None:
await state.current_source.skip_current(state.queue[0])
@sio.on("state")
async def handle_state(data: dict[str, Any]):
async def handle_state(data: dict[str, Any]) -> None:
state.queue = [Entry(**entry) for entry in data["queue"]]
state.recent = [Entry(**entry) for entry in data["recent"]]
@ -53,7 +54,7 @@ async def handle_state(data: dict[str, Any]):
@sio.on("connect")
async def handle_connect():
async def handle_connect(_: dict[str, Any]) -> None:
logging.info("Connected to server")
await sio.emit(
"register-client",
@ -67,14 +68,14 @@ async def handle_connect():
@sio.on("buffer")
async def handle_buffer(data: dict[str, Any]):
async def handle_buffer(data: dict[str, Any]) -> None:
source: Source = sources[data["source"]]
meta_info: dict[str, Any] = await source.get_missing_metadata(Entry(**data))
await sio.emit("meta-info", {"uuid": data["uuid"], "meta": meta_info})
@sio.on("play")
async def handle_play(data: dict[str, Any]):
async def handle_play(data: dict[str, Any]) -> None:
entry: Entry = Entry(**data)
print(
f"Playing: {entry.artist} - {entry.title} [{entry.album}] ({entry.source}) for {entry.performer}"
@ -89,7 +90,7 @@ async def handle_play(data: dict[str, Any]):
@sio.on("client-registered")
async def handle_register(data: dict[str, Any]):
async def handle_register(data: dict[str, Any]) -> None:
if data["success"]:
logging.info("Registered")
print(f"Join here: {state.server}/{data['room']}")
@ -104,7 +105,7 @@ async def handle_register(data: dict[str, Any]):
@sio.on("request-config")
async def handle_request_config(data: dict[str, Any]):
async def handle_request_config(data: dict[str, Any]) -> None:
if data["source"] in sources:
config: dict[str, Any] | list[dict[str, Any]] = await sources[
data["source"]
@ -125,7 +126,7 @@ async def handle_request_config(data: dict[str, Any]):
await sio.emit("config", {"source": data["source"], "config": config})
async def aiomain():
async def aiomain() -> None:
parser: ArgumentParser = ArgumentParser()
parser.add_argument("--room", "-r")
@ -155,7 +156,7 @@ async def aiomain():
await sio.wait()
def main():
def main() -> None:
asyncio.run(aiomain())

View file

@ -1,7 +1,7 @@
from __future__ import annotations
from dataclasses import dataclass, field
from uuid import uuid4, UUID
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, Any
if TYPE_CHECKING:
from .sources import Source
@ -23,7 +23,7 @@ class Entry:
async def from_source(performer: str, ident: str, source: Source) -> Entry:
return await source.get_entry(performer, ident)
def to_dict(self) -> dict:
def to_dict(self) -> dict[str, Any]:
return {
"uuid": str(self.uuid),
"id": self.id,
@ -36,8 +36,8 @@ class Entry:
}
@staticmethod
def from_dict(entry_dict):
def from_dict(entry_dict: dict[str, Any]) -> Entry:
return Entry(**entry_dict)
def update(self, **kwargs):
def update(self, **kwargs: Any) -> None:
self.__dict__.update(kwargs)

View file

@ -1,18 +1,18 @@
from __future__ import annotations
from dataclasses import dataclass
from typing import Optional
from typing import Optional, Any
import os.path
@dataclass
class Result:
id: str | int
id: str
source: str
title: str
artist: str
album: str
def to_dict(self) -> dict:
def to_dict(self) -> dict[str, Any]:
return {
"id": self.id,
"source": self.source,
@ -22,7 +22,7 @@ class Result:
}
@staticmethod
def from_filename(filename, source) -> Optional[Result]:
def from_filename(filename: str, source: str) -> Optional[Result]:
try:
splitfile = os.path.basename(filename[:-4]).split(" - ")
ident = filename

View file

@ -1,6 +1,6 @@
from __future__ import annotations
from collections import deque
from typing import Any
from typing import Any, Callable
import asyncio
from dataclasses import dataclass
import string
@ -20,7 +20,7 @@ app = web.Application()
sio.attach(app)
async def root_handler(request):
async def root_handler(request: Any) -> Any:
return web.FileResponse("syng/static/index.html")
@ -34,8 +34,8 @@ logger = logging.getLogger(__name__)
class Queue:
def __init__(self, *args, **kwargs):
self._queue = deque(*args, **kwargs)
def __init__(self, initial_entries: list[Entry]):
self._queue = deque(initial_entries)
self.num_of_entries_sem = asyncio.Semaphore(len(self._queue))
self.readlock = asyncio.Lock()
@ -60,7 +60,9 @@ class Queue:
def to_dict(self) -> list[dict[str, Any]]:
return [item.to_dict() for item in self._queue]
def update(self, locator, updater):
def update(
self, locator: Callable[[Entry], Any], updater: Callable[[Entry], None]
) -> None:
for item in self._queue:
if locator(item):
updater(item)
@ -80,7 +82,7 @@ clients: dict[str, State] = {}
@sio.on("get-state")
async def handle_state(sid, data: dict[str, Any] = {}):
async def handle_state(sid: str, data: dict[str, Any] = {}) -> None:
async with sio.session(sid) as session:
room = session["room"]
state = clients[room]
@ -96,7 +98,7 @@ async def handle_state(sid, data: dict[str, Any] = {}):
@sio.on("append")
async def handle_append(sid, data: dict[str, Any]):
async def handle_append(sid: str, data: dict[str, Any]) -> None:
async with sio.session(sid) as session:
room = session["room"]
state = clients[room]
@ -121,7 +123,7 @@ async def handle_append(sid, data: dict[str, Any]):
@sio.on("meta-info")
async def handle_meta_info(sid, data):
async def handle_meta_info(sid: str, data: dict[str, Any]) -> None:
async with sio.session(sid) as session:
room = session["room"]
state = clients[room]
@ -142,7 +144,7 @@ async def handle_meta_info(sid, data):
@sio.on("get-first")
async def handle_get_first(sid, data={}):
async def handle_get_first(sid: str, data: dict[str, Any] = {}) -> None:
async with sio.session(sid) as session:
room = session["room"]
state = clients[room]
@ -153,7 +155,7 @@ async def handle_get_first(sid, data={}):
@sio.on("pop-then-get-next")
async def handle_pop_then_get_next(sid, data={}):
async def handle_pop_then_get_next(sid: str, data: dict[str, Any] = {}) -> None:
async with sio.session(sid) as session:
room = session["room"]
state = clients[room]
@ -173,7 +175,7 @@ async def handle_pop_then_get_next(sid, data={}):
await sio.emit("play", current.to_dict(), room=sid)
def gen_id(length=4) -> str:
def gen_id(length: int = 4) -> str:
client_id = "".join([random.choice(string.ascii_letters) for _ in range(length)])
if client_id in clients:
client_id = gen_id(length + 1)
@ -181,13 +183,13 @@ def gen_id(length=4) -> str:
@sio.on("register-client")
async def handle_register_client(sid, data: dict[str, Any]):
room = data["room"] if "room" in data and data["room"] else gen_id()
async def handle_register_client(sid: str, data: dict[str, Any]) -> None:
room: str = data["room"] if "room" in data and data["room"] else gen_id()
async with sio.session(sid) as session:
session["room"] = room
if room in clients:
old_state = clients[room]
old_state: State = clients[room]
if data["secret"] == old_state.secret:
logger.info("Got new client connection for %s", room)
old_state.sid = sid
@ -220,7 +222,7 @@ async def handle_register_client(sid, data: dict[str, Any]):
@sio.on("sources")
async def handle_sources(sid, data):
async def handle_sources(sid: str, data: dict[str, Any]) -> None:
"""
Get the list of sources the client wants to use.
Update internal list of sources, remove unused
@ -243,7 +245,7 @@ async def handle_sources(sid, data):
@sio.on("config-chunk")
async def handle_config_chung(sid, data):
async def handle_config_chung(sid: str, data: dict[str, Any]) -> None:
async with sio.session(sid) as session:
room = session["room"]
state = clients[room]
@ -258,7 +260,7 @@ async def handle_config_chung(sid, data):
@sio.on("config")
async def handle_config(sid, data):
async def handle_config(sid: str, data: dict[str, Any]) -> None:
async with sio.session(sid) as session:
room = session["room"]
state = clients[room]
@ -268,7 +270,7 @@ async def handle_config(sid, data):
@sio.on("register-web")
async def handle_register_web(sid, data):
async def handle_register_web(sid: str, data: dict[str, Any]) -> bool:
if data["room"] in clients:
async with sio.session(sid) as session:
session["room"] = data["room"]
@ -283,12 +285,11 @@ async def handle_register_web(sid, data):
room=sid,
)
return True
else:
return False
return False
@sio.on("register-admin")
async def handle_register_admin(sid, data: dict[str, str]):
async def handle_register_admin(sid: str, data: dict[str, str]) -> None:
async with sio.session(sid) as session:
room = session["room"]
state = clients[room]
@ -300,7 +301,7 @@ async def handle_register_admin(sid, data: dict[str, str]):
@sio.on("get-config")
async def handle_get_config(sid, data):
async def handle_get_config(sid: str, data: dict[str, Any]) -> None:
async with sio.session(sid) as session:
room = session["room"]
is_admin = session["admin"]
@ -314,7 +315,7 @@ async def handle_get_config(sid, data):
@sio.on("skip")
async def handle_skip(sid, data={}):
async def handle_skip(sid: str, data: dict[str, Any] = {}) -> None:
async with sio.session(sid) as session:
room = session["room"]
is_admin = session["admin"]
@ -324,13 +325,13 @@ async def handle_skip(sid, data={}):
@sio.on("disconnect")
async def handle_disconnect(sid, data={}):
async def handle_disconnect(sid: str, data: dict[str, Any] = {}) -> None:
async with sio.session(sid) as session:
sio.leave_room(sid, session["room"])
@sio.on("search")
async def handle_search(sid, data: dict[str, str]):
async def handle_search(sid: str, data: dict[str, str]) -> None:
async with sio.session(sid) as session:
room = session["room"]
state = clients[room]
@ -348,7 +349,11 @@ async def handle_search(sid, data: dict[str, str]):
for result_future in result_futures
for search_result in await result_future
]
await sio.emit("search-results", [result.to_dict() for result in results], room=sid)
await sio.emit(
"search-results",
{"results": [result.to_dict() for result in results]},
room=sid,
)
def main() -> None:

View file

@ -1,9 +1,11 @@
from .source import Source, available_sources
from typing import Any
from .source import Source as Source, available_sources as available_sources
from .youtube import YoutubeSource
from .s3 import S3Source
def configure_sources(configs: dict) -> dict[str, Source]:
def configure_sources(configs: dict[str, Any]) -> dict[str, Source]:
configured_sources = {}
for source, config in configs.items():
if source in available_sources:

View file

@ -3,7 +3,7 @@ from time import sleep, perf_counter
from itertools import zip_longest
import asyncio
import os
from typing import Tuple, Optional
from typing import Tuple, Optional, Any
from minio import Minio
@ -15,7 +15,7 @@ from ..entry import Entry
class S3Source(Source):
def __init__(self, config):
def __init__(self, config: dict[str, Any]):
super().__init__(config)
if "endpoint" in config and "access_key" in config and "secret_key" in config:
@ -46,8 +46,8 @@ class S3Source(Source):
)
raise RuntimeError(f"Could not parse {ident}")
async def get_config(self) -> dict | list[dict]:
def _get_config() -> dict | list[dict]:
async def get_config(self) -> dict[str, Any] | list[dict[str, Any]]:
def _get_config() -> dict[str, Any] | list[dict[str, Any]]:
if not self.index:
print(f"Indexing {self.bucket}")
# self.index = [
@ -66,10 +66,12 @@ class S3Source(Source):
return await asyncio.to_thread(_get_config)
def add_to_config(self, config: dict) -> None:
def add_to_config(self, config: dict[str, Any]) -> None:
self.index += config["index"]
async def search(self, result_future: asyncio.Future, query: str) -> None:
async def search(
self, result_future: asyncio.Future[list[Result]], query: str
) -> None:
print("searching s3")
filtered: list[str] = self.filter_data_by_query(query, self.index)
results: list[Result] = []
@ -80,7 +82,7 @@ class S3Source(Source):
results.append(result)
result_future.set_result(results)
async def get_missing_metadata(self, entry: Entry) -> dict:
async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]:
def mutagen_wrapped(file: str) -> int:
meta_infos = mutagen.File(file).info
return int(meta_infos.length)
@ -107,12 +109,12 @@ class S3Source(Source):
target_file_mp3: str = target_file_cdg[:-3] + "mp3"
os.makedirs(os.path.dirname(target_file_cdg), exist_ok=True)
video_task: asyncio.Task = asyncio.create_task(
video_task: asyncio.Task[None] = asyncio.create_task(
asyncio.to_thread(
self.minio.fget_object, self.bucket, entry.id, target_file_cdg
)
)
audio_task: asyncio.Task = asyncio.create_task(
audio_task: asyncio.Task[None] = asyncio.create_task(
asyncio.to_thread(
self.minio.fget_object, self.bucket, ident_mp3, target_file_mp3
)

View file

@ -7,6 +7,7 @@ from collections import defaultdict
from dataclasses import dataclass, field
from ..entry import Entry
from ..result import Result
@dataclass
@ -30,7 +31,7 @@ class Source:
@staticmethod
async def play_mpv(
video: str, audio: str | None, /, *options
video: str, audio: str | None, /, *options: str
) -> asyncio.subprocess.Process:
args = ["--fullscreen", *options, video] + (
[f"--audio-file={audio}"] if audio else []
@ -42,13 +43,15 @@ class Source:
async def get_entry(self, performer: str, ident: str) -> Entry:
raise NotImplementedError
async def search(self, result_future: asyncio.Future, query: str) -> None:
async def search(
self, result_future: asyncio.Future[list[Result]], query: str
) -> None:
raise NotImplementedError
async def doBuffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
raise NotImplementedError
async def buffer(self, entry: Entry):
async def buffer(self, entry: Entry) -> None:
async with self.masterlock:
if self.downloaded_files[entry.id].buffering:
print(f"already buffering {entry.title}")
@ -75,7 +78,7 @@ class Source:
if self.player is not None:
self.player.kill()
async def ensure_playable(self, entry: Entry):
async def ensure_playable(self, entry: Entry) -> None:
await self.buffer(entry)
await self.downloaded_files[entry.id].ready.wait()

View file

@ -21,7 +21,7 @@ class YoutubeSource(Source):
config["start_streaming"] if "start_streaming" in config else False
)
async def get_config(self) -> dict | list[dict]:
async def get_config(self) -> dict[str, Any] | list[dict[str, Any]]:
return {"channels": self.channels}
async def play(self, entry: Entry) -> None:
@ -63,8 +63,10 @@ class YoutubeSource(Source):
return 1 - (hits / len(queries))
async def search(self, result_future: asyncio.Future, query: str) -> None:
def _search(result_future: asyncio.Future, query: str) -> None:
async def search(
self, result_future: asyncio.Future[list[Result]], query: str
) -> None:
def _search(result_future: asyncio.Future[list[Result]], query: str) -> None:
results: list[YouTube] = []
for channel in self.channels:
results += self._channel_search(query, channel)
@ -99,12 +101,14 @@ class YoutubeSource(Source):
"params": "EgZzZWFyY2g%3D",
}
data.update(self.innertube_client.base_data)
results: dict = self.innertube_client._call_api(
results: dict[str, Any] = self.innertube_client._call_api(
endpoint, self.innertube_client.base_params, data
)
items: list = results["contents"]["twoColumnBrowseResultsRenderer"]["tabs"][-1][
"expandableTabRenderer"
]["content"]["sectionListRenderer"]["contents"]
items: list[dict[str, Any]] = results["contents"][
"twoColumnBrowseResultsRenderer"
]["tabs"][-1]["expandableTabRenderer"]["content"]["sectionListRenderer"][
"contents"
]
list_of_videos: list[YouTube] = []
for item in items:

View file

@ -12,15 +12,15 @@ state: dict[str, Any] = {}
@sio.on("search-results")
async def handle_search_results(data):
for raw_item in data:
async def handle_search_results(data: dict[str, Any]) -> None:
for raw_item in data["results"]:
item = Result(**raw_item)
print(f"{item.artist} - {item.title} [{item.album}]")
print(f"{item.source}: {item.id}")
@sio.on("state")
async def handle_state(data):
async def handle_state(data: dict[str, Any]) -> None:
print("New Queue")
for raw_item in data["queue"]:
item = Entry(**raw_item)
@ -32,13 +32,13 @@ async def handle_state(data):
@sio.on("connect")
async def handle_connect():
async def handle_connect(_: dict[str, Any]) -> None:
print("Connected")
await sio.emit("register-web", {"room": state["room"]})
@sio.on("register-admin")
async def handle_register_admin(data):
async def handle_register_admin(data: dict[str, Any]) -> None:
if data["success"]:
print("Logged in")
else:
@ -48,10 +48,10 @@ async def handle_register_admin(data):
class SyngShell(aiocmd.PromptToolkitCmd):
prompt = "syng> "
def do_exit(self):
def do_exit(self) -> bool:
return True
async def do_stuff(self):
async def do_stuff(self) -> None:
await sio.emit(
"append",
{
@ -61,27 +61,27 @@ class SyngShell(aiocmd.PromptToolkitCmd):
},
)
async def do_search(self, query):
async def do_search(self, query: str) -> None:
await sio.emit("search", {"query": query})
async def do_append(self, source, ident):
async def do_append(self, source: str, ident: str) -> None:
await sio.emit("append", {"performer": "Hammy", "source": source, "id": ident})
async def do_admin(self, data):
async def do_admin(self, data: str) -> None:
await sio.emit("register-admin", {"secret": data})
async def do_connect(self, server, room):
async def do_connect(self, server: str, room: str) -> None:
state["room"] = room
await sio.connect(server)
async def do_skip(self):
async def do_skip(self) -> None:
await sio.emit("skip")
async def do_queue(self):
async def do_queue(self) -> None:
await sio.emit("get-state")
def main():
def main() -> None:
asyncio.run(SyngShell().run())