Some typings, to improve compatibility with pyright

This commit is contained in:
Christoph Stahl 2024-06-18 02:23:19 +02:00
parent 725beab857
commit 60b0fd42c2
13 changed files with 102 additions and 46 deletions

View file

@ -1,12 +1,9 @@
image: python:3-alpine image: python:3-alpine
variables:
MYPYPATH: "stubs/"
mypy: mypy:
stage: test stage: test
script: script:
- pip install mypy types-Pillow types-PyYAML --quiet - pip install .[dev,client] --quiet
- mypy syng --strict - mypy syng --strict
ruff: ruff:

View file

@ -31,15 +31,15 @@ tkcalendar = { version = "^1.6.1", optional = true }
tktimepicker = { version = "^2.0.2", optional = true } tktimepicker = { version = "^2.0.2", optional = true }
platformdirs = { version = "^4.0.0", optional = true } platformdirs = { version = "^4.0.0", optional = true }
packaging = {version = "^23.2", optional = true} packaging = {version = "^23.2", optional = true}
types-pyyaml = {version = "^6.0.12.12", optional = true}
[tool.poetry.group.dev.dependencies] types-pillow = {version = "^10.1.0.2", optional = true}
types-pyyaml = "^6.0.12.12" mypy = {version = "^1.10.0", optional = true}
types-pillow = "^10.1.0.2"
[tool.poetry.extras] [tool.poetry.extras]
client = ["minio", "mutagen", "pillow", "customtkinter", "qrcode", client = ["minio", "mutagen", "pillow", "customtkinter", "qrcode",
"pymediainfo", "pyyaml", "tkcalendar", "tktimepicker", "platformdirs", "pymediainfo", "pyyaml", "tkcalendar", "tktimepicker", "platformdirs",
"packaging"] "packaging"]
dev = ["types-pillow", "types-pillow", "mypy"]
[build-system] [build-system]
requires = ["poetry-core"] requires = ["poetry-core"]
@ -55,6 +55,9 @@ disable = '''too-many-lines,
too-many-ancestors too-many-ancestors
''' '''
[tool.mypy]
mypy_path = "typings"
[[tool.mypy.overrides]] [[tool.mypy.overrides]]
module = [ module = [
"yt_dlp", "yt_dlp",

View file

@ -31,6 +31,7 @@ The config file should be a yaml file in the following style::
waiting_room_policy: .. waiting_room_policy: ..
""" """
import asyncio import asyncio
import datetime import datetime
import logging import logging
@ -46,7 +47,7 @@ from traceback import print_exc
from typing import Any, Optional from typing import Any, Optional
import platformdirs import platformdirs
import qrcode from qrcode.main import QRCode
import socketio import socketio
import engineio import engineio
@ -323,7 +324,7 @@ async def handle_client_registered(data: dict[str, Any]) -> None:
if data["success"]: if data["success"]:
logging.info("Registered") logging.info("Registered")
print(f"Join here: {state.config['server']}/{data['room']}") print(f"Join here: {state.config['server']}/{data['room']}")
qr = qrcode.QRCode(box_size=20, border=2) qr = QRCode(box_size=20, border=2)
qr.add_data(f"{state.config['server']}/{data['room']}") qr.add_data(f"{state.config['server']}/{data['room']}")
qr.make() qr.make()
qr.print_ascii() qr.print_ascii()

View file

@ -4,7 +4,7 @@ from datetime import datetime, date, time
import os import os
import builtins import builtins
from functools import partial from functools import partial
from typing import Any, Optional from typing import TYPE_CHECKING, Any, Optional
import webbrowser import webbrowser
import multiprocessing import multiprocessing
import secrets import secrets
@ -13,7 +13,7 @@ import string
from PIL import ImageTk from PIL import ImageTk
from yaml import dump, load, Loader, Dumper from yaml import dump, load, Loader, Dumper
import customtkinter import customtkinter
from qrcode import QRCode from qrcode.main import QRCode
from tkcalendar import Calendar from tkcalendar import Calendar
from tktimepicker import AnalogPicker, AnalogThemes, constants from tktimepicker import AnalogPicker, AnalogThemes, constants
import platformdirs import platformdirs
@ -27,6 +27,9 @@ try:
SERVER_AVAILABLE = True SERVER_AVAILABLE = True
except ImportError: except ImportError:
if TYPE_CHECKING:
from .server import main as server_main
SERVER_AVAILABLE = False SERVER_AVAILABLE = False

View file

@ -12,6 +12,7 @@ Starts a async socketio server, and serves the web client::
--root-folder PATH, -r PATH --root-folder PATH, -r PATH
""" """
from __future__ import annotations from __future__ import annotations
import asyncio import asyncio
@ -135,7 +136,9 @@ class State:
recent: list[Entry] recent: list[Entry]
sid: str sid: str
client: Client client: Client
last_seen: datetime.datetime = field(init=False, default_factory=datetime.datetime.now) last_seen: datetime.datetime = field(
init=False, default_factory=datetime.datetime.now
)
clients: dict[str, State] = {} clients: dict[str, State] = {}
@ -158,7 +161,9 @@ async def send_state(state: State, sid: str) -> None:
:rtype: None :rtype: None
""" """
safe_config = {k: v for k, v in state.client.config.items() if k not in ["secret", "key"]} safe_config = {
k: v for k, v in state.client.config.items() if k not in ["secret", "key"]
}
await sio.emit( await sio.emit(
"state", "state",
@ -209,13 +214,18 @@ async def handle_waiting_room_append(sid: str, data: dict[str, Any]) -> None:
if entry is None: if entry is None:
await sio.emit( await sio.emit(
"msg", "msg",
{"msg": f"Unable to add to the waiting room: {data['ident']}. Maybe try again?"}, {
"msg": f"Unable to add to the waiting room: {data['ident']}. Maybe try again?"
},
room=sid, room=sid,
) )
return return
if "uid" not in data or ( if "uid" not in data or (
(data["uid"] is not None and len(list(state.queue.find_by_uid(data["uid"]))) == 0) (
data["uid"] is not None
and len(list(state.queue.find_by_uid(data["uid"]))) == 0
)
or (data["uid"] is None and state.queue.find_by_name(data["performer"]) is None) or (data["uid"] is None and state.queue.find_by_name(data["performer"]) is None)
): ):
await append_to_queue(room, entry, sid) await append_to_queue(room, entry, sid)
@ -232,7 +242,9 @@ async def handle_waiting_room_append(sid: str, data: dict[str, Any]) -> None:
) )
async def append_to_queue(room: str, entry: Entry, report_to: Optional[str] = None) -> None: async def append_to_queue(
room: str, entry: Entry, report_to: Optional[str] = None
) -> None:
""" """
Append a song to the queue for a given session. Append a song to the queue for a given session.
@ -256,7 +268,10 @@ async def append_to_queue(room: str, entry: Entry, report_to: Optional[str] = No
start_time = first_song.started_at start_time = first_song.started_at
start_time = state.queue.fold( start_time = state.queue.fold(
lambda item, time: time + item.duration + state.client.config["preview_duration"] + 1, lambda item, time: time
+ item.duration
+ state.client.config["preview_duration"]
+ 1,
start_time, start_time,
) )
@ -541,7 +556,11 @@ async def handle_waiting_room_to_queue(sid: str, data: dict[str, Any]) -> None:
if is_admin: if is_admin:
entry = next( entry = next(
(wr_entry for wr_entry in state.waiting_room if str(wr_entry.uuid) == data["uuid"]), (
wr_entry
for wr_entry in state.waiting_room
if str(wr_entry.uuid) == data["uuid"]
),
None, None,
) )
if entry is not None: if entry is not None:
@ -673,7 +692,9 @@ async def handle_register_client(sid: str, data: dict[str, Any]) -> None:
""" """
def gen_id(length: int = 4) -> str: def gen_id(length: int = 4) -> str:
client_id = "".join([random.choice(string.ascii_letters) for _ in range(length)]) client_id = "".join(
[random.choice(string.ascii_letters) for _ in range(length)]
)
if client_id in clients: if client_id in clients:
client_id = gen_id(length + 1) client_id = gen_id(length + 1)
return client_id return client_id
@ -685,7 +706,8 @@ async def handle_register_client(sid: str, data: dict[str, Any]) -> None:
if ( if (
"key" not in data["config"] "key" not in data["config"]
or hashlib.sha256(data["config"]["key"].encode()).hexdigest() not in keys or hashlib.sha256(data["config"]["key"].encode()).hexdigest()
not in keys
): ):
await sio.emit( await sio.emit(
"client-registered", "client-registered",
@ -695,7 +717,9 @@ async def handle_register_client(sid: str, data: dict[str, Any]) -> None:
return return
room: str = ( room: str = (
data["config"]["room"] if "room" in data["config"] and data["config"]["room"] else gen_id() data["config"]["room"]
if "room" in data["config"] and data["config"]["room"]
else gen_id()
) )
async with sio.session(sid) as session: async with sio.session(sid) as session:
session["room"] = room session["room"] = room
@ -711,11 +735,15 @@ async def handle_register_client(sid: str, data: dict[str, Any]) -> None:
config=DEFAULT_CONFIG | data["config"], config=DEFAULT_CONFIG | data["config"],
) )
await sio.enter_room(sid, room) await sio.enter_room(sid, room)
await sio.emit("client-registered", {"success": True, "room": room}, room=sid) await sio.emit(
"client-registered", {"success": True, "room": room}, room=sid
)
await send_state(clients[room], sid) await send_state(clients[room], sid)
else: else:
logger.warning("Got wrong secret for %s", room) logger.warning("Got wrong secret for %s", room)
await sio.emit("client-registered", {"success": False, "room": room}, room=sid) await sio.emit(
"client-registered", {"success": False, "room": room}, room=sid
)
else: else:
logger.info("Registerd new client %s", room) logger.info("Registerd new client %s", room)
initial_entries = [Entry(**entry) for entry in data["queue"]] initial_entries = [Entry(**entry) for entry in data["queue"]]
@ -806,7 +834,9 @@ async def handle_config_chunk(sid: str, data: dict[str, Any]) -> None:
return return
if data["source"] not in state.client.sources: if data["source"] not in state.client.sources:
state.client.sources[data["source"]] = available_sources[data["source"]](data["config"]) state.client.sources[data["source"]] = available_sources[data["source"]](
data["config"]
)
else: else:
state.client.sources[data["source"]].add_to_config(data["config"]) state.client.sources[data["source"]].add_to_config(data["config"])
@ -835,7 +865,9 @@ async def handle_config(sid: str, data: dict[str, Any]) -> None:
if sid != state.sid: if sid != state.sid:
return return
state.client.sources[data["source"]] = available_sources[data["source"]](data["config"]) state.client.sources[data["source"]] = available_sources[data["source"]](
data["config"]
)
@sio.on("register-web") @sio.on("register-web")
@ -1020,10 +1052,17 @@ async def handle_search(sid: str, data: dict[str, Any]) -> None:
query = data["query"] query = data["query"]
results_list = await asyncio.gather( results_list = await asyncio.gather(
*[state.client.sources[source].search(query) for source in state.client.sources_prio] *[
state.client.sources[source].search(query)
for source in state.client.sources_prio
]
) )
results = [search_result for source_result in results_list for search_result in source_result] results = [
search_result
for source_result in results_list
for search_result in source_result
]
await sio.emit( await sio.emit(
"search-results", "search-results",
{"results": results}, {"results": results},
@ -1104,7 +1143,9 @@ def main() -> None:
app["root_folder"] = args.root_folder app["root_folder"] = args.root_folder
app.add_routes([web.static("/assets/", os.path.join(app["root_folder"], "assets/"))]) app.add_routes(
[web.static("/assets/", os.path.join(app["root_folder"], "assets/"))]
)
app.router.add_route("*", "/", root_handler) app.router.add_route("*", "/", root_handler)
app.router.add_route("*", "/{room}", root_handler) app.router.add_route("*", "/{room}", root_handler)

View file

@ -1,13 +1,16 @@
"""Module for an abstract filebased Source.""" """Module for an abstract filebased Source."""
import asyncio import asyncio
import os import os
from typing import Any, Optional from typing import TYPE_CHECKING, Any, Optional
try: try:
from pymediainfo import MediaInfo from pymediainfo import MediaInfo
PYMEDIAINFO_AVAILABLE = True PYMEDIAINFO_AVAILABLE = True
except ImportError: except ImportError:
if TYPE_CHECKING:
from pymediainfo import MediaInfo
PYMEDIAINFO_AVAILABLE = False PYMEDIAINFO_AVAILABLE = False
from .source import Source from .source import Source
@ -35,7 +38,7 @@ class FileBasedSource(Source):
self.extensions: list[str] = config["extensions"] if "extensions" in config else ["mp3+cdg"] self.extensions: list[str] = config["extensions"] if "extensions" in config else ["mp3+cdg"]
self.extra_mpv_arguments = ["--scale=oversample"] self.extra_mpv_arguments = ["--scale=oversample"]
def has_correct_extension(self, path: str) -> bool: def has_correct_extension(self, path: Optional[str]) -> bool:
"""Check if a `path` has a correct extension. """Check if a `path` has a correct extension.
For A+B type extensions (like mp3+cdg) only the latter halve is checked For A+B type extensions (like mp3+cdg) only the latter halve is checked
@ -43,7 +46,9 @@ class FileBasedSource(Source):
:return: True iff path has correct extension. :return: True iff path has correct extension.
:rtype: bool :rtype: bool
""" """
return os.path.splitext(path)[1][1:] in [ext.split("+")[-1] for ext in self.extensions] return path is not None and os.path.splitext(path)[1][1:] in [
ext.split("+")[-1] for ext in self.extensions
]
def get_video_audio_split(self, path: str) -> tuple[str, Optional[str]]: def get_video_audio_split(self, path: str) -> tuple[str, Optional[str]]:
extension_of_path = os.path.splitext(path)[1][1:] extension_of_path = os.path.splitext(path)[1][1:]

View file

@ -7,13 +7,15 @@ Adds it to the ``available_sources`` with the name ``s3``
import asyncio import asyncio
import os import os
from json import dump, load from json import dump, load
from typing import Any, Optional, Tuple, cast from typing import TYPE_CHECKING, Any, Optional, Tuple, cast
try: try:
from minio import Minio from minio import Minio
MINIO_AVAILABE = True MINIO_AVAILABE = True
except ImportError: except ImportError:
if TYPE_CHECKING:
from minio import Minio
MINIO_AVAILABE = False MINIO_AVAILABE = False
from ..entry import Entry from ..entry import Entry
@ -87,8 +89,8 @@ class S3Source(FileBasedSource):
file_list = [ file_list = [
obj.object_name obj.object_name
for obj in self.minio.list_objects(self.bucket, recursive=True) for obj in self.minio.list_objects(self.bucket, recursive=True)
if self.has_correct_extension(obj.object_name) if obj.object_name is not None
] ] # and self.has_correct_extension(obj.object_name)
if self.index_file is not None and not os.path.isfile(self.index_file): if self.index_file is not None and not os.path.isfile(self.index_file):
with open(self.index_file, "w", encoding="utf8") as index_file_handle: with open(self.index_file, "w", encoding="utf8") as index_file_handle:
dump(file_list, index_file_handle) dump(file_list, index_file_handle)

View file

@ -299,7 +299,7 @@ class YoutubeSource(Source):
:return: The location of the video file and ``None``. :return: The location of the video file and ``None``.
:rtype: Tuple[str, Optional[str]] :rtype: Tuple[str, Optional[str]]
""" """
info = await asyncio.to_thread(self._yt_dlp.extract_info, entry.ident) info: Any = await asyncio.to_thread(self._yt_dlp.extract_info, entry.ident)
combined_path = info["requested_downloads"][0]["filepath"] combined_path = info["requested_downloads"][0]["filepath"]
return combined_path, None return combined_path, None

8
typings/qrcode/main.pyi Normal file
View file

@ -0,0 +1,8 @@
from PIL import Image
class QRCode:
def __init__(self, box_size: int, border: int) -> None: ...
def add_data(self, string: str) -> None: ...
def make(self) -> None: ...
def print_ascii(self) -> None: ...
def make_image(self) -> Image.Image: ...

View file

@ -1,15 +1,11 @@
from typing import Any from typing import Any, Awaitable
from typing import Callable from typing import Callable
from typing import Optional from typing import Optional
from typing import TypeVar from typing import TypeVar, TypeAlias
Handler = TypeVar( Handler: TypeAlias = Callable[[str], Awaitable[Any]]
"Handler", DictHandler: TypeAlias = Callable[[str, dict[str, Any]], Awaitable[Any]]
bound=Callable[[str, dict[str, Any]], Any] | Callable[[str], Any], ClientHandler = TypeVar("ClientHandler", bound=Callable[[dict[str, Any]], Any] | Callable[[], Any])
)
ClientHandler = TypeVar(
"ClientHandler", bound=Callable[[dict[str, Any]], Any] | Callable[[], Any]
)
class _session_context_manager: class _session_context_manager:
async def __aenter__(self) -> dict[str, Any]: ... async def __aenter__(self) -> dict[str, Any]: ...
@ -30,7 +26,7 @@ class AsyncServer:
room: Optional[str] = None, room: Optional[str] = None,
) -> None: ... ) -> None: ...
def session(self, sid: str) -> _session_context_manager: ... def session(self, sid: str) -> _session_context_manager: ...
def on(self, event: str) -> Callable[[Handler], Handler]: ... def on(self, event: str) -> Callable[[Handler | DictHandler], Handler | DictHandler]: ...
async def enter_room(self, sid: str, room: str) -> None: ... async def enter_room(self, sid: str, room: str) -> None: ...
async def leave_room(self, sid: str, room: str) -> None: ... async def leave_room(self, sid: str, room: str) -> None: ...
def attach(self, app: Any) -> None: ... def attach(self, app: Any) -> None: ...