Implemented restricted mode and client side search querying.

Also lots of documentation
This commit is contained in:
Christoph Stahl 2024-09-18 23:59:29 +02:00
parent bcb8843b35
commit da9ef35ba4
9 changed files with 445 additions and 73 deletions

View file

@ -29,6 +29,7 @@ The config file should be a yaml file in the following style::
secret: ... secret: ...
last_song: ... last_song: ...
waiting_room_policy: .. waiting_room_policy: ..
key: ..
""" """
@ -69,6 +70,12 @@ currentLock: asyncio.Semaphore = asyncio.Semaphore(0)
def default_config() -> dict[str, Optional[int | str]]: def default_config() -> dict[str, Optional[int | str]]:
"""
Return a default configuration for the client.
:returns: A dictionary with the default configuration.
:rtype: dict[str, Optional[int | str]]
"""
return { return {
"server": "http://localhost:8080", "server": "http://localhost:8080",
"room": "ABCD", "room": "ABCD",
@ -76,6 +83,7 @@ def default_config() -> dict[str, Optional[int | str]]:
"secret": None, "secret": None,
"last_song": None, "last_song": None,
"waiting_room_policy": None, "waiting_room_policy": None,
"key": None,
} }
@ -102,7 +110,8 @@ class State:
* `secret` (`str`): The passcode of the room. If a playback client reconnects to * `secret` (`str`): The passcode of the room. If a playback client reconnects to
a room, this must be identical. Also, if a webclient wants to have a room, this must be identical. Also, if a webclient wants to have
admin privileges, this must be included. admin privileges, this must be included.
* `key` (`Optional[str]`) An optional key, if registration on the server is limited. * `key` (`Optional[str]`) An optional key, if registration or functionality on the server
is limited.
* `preview_duration` (`Optional[int]`): The duration in seconds the * `preview_duration` (`Optional[int]`): The duration in seconds the
playback client shows a preview for the next song. This is accounted for playback client shows a preview for the next song. This is accounted for
in the calculation of the ETA for songs later in the queue. in the calculation of the ETA for songs later in the queue.
@ -131,6 +140,15 @@ state: State = State()
@sio.on("update_config") @sio.on("update_config")
async def handle_update_config(data: dict[str, Any]) -> None: async def handle_update_config(data: dict[str, Any]) -> None:
"""
Handle the "update_config" message.
Currently, this function is untested and should be considered dangerous.
:param data: A dictionary with the new configuration.
:type data: dict[str, Any]
:rtype: None
"""
state.config = default_config() | data state.config = default_config() | data
@ -300,6 +318,32 @@ async def handle_play(data: dict[str, Any]) -> None:
await sio.emit("pop-then-get-next") await sio.emit("pop-then-get-next")
@sio.on("search")
async def handle_search(data: dict[str, Any]) -> None:
"""
Handle the "search" message.
This handles client side search requests. It sends a search request to all
configured :py:class:`syng.sources.source.Source` and collects the results.
The results are then send back to the server in a "search-results" message,
including the `sid` of the corresponding webclient.
:param data: A dictionary with the `query` and `sid` entry.
:type data: dict[str, Any]
:rtype: None
"""
query = data["query"]
sid = data["sid"]
results_list = await asyncio.gather(*[source.search(query) for source in sources.values()])
results = [
search_result.to_dict() for source_result in results_list for search_result in source_result
]
await sio.emit("search-results", {"results": results, "sid": sid})
@sio.on("client-registered") @sio.on("client-registered")
async def handle_client_registered(data: dict[str, Any]) -> None: async def handle_client_registered(data: dict[str, Any]) -> None:
""" """
@ -374,6 +418,14 @@ async def handle_request_config(data: dict[str, Any]) -> None:
def signal_handler() -> None: def signal_handler() -> None:
"""
Signal handler for the client.
This function is called when the client receives a signal to terminate. It
will disconnect from the server and kill the current player.
:rtype: None
"""
engineio.async_client.async_signal_handler() engineio.async_client.async_signal_handler()
if state.current_source is not None: if state.current_source is not None:
if state.current_source.player is not None: if state.current_source.player is not None:
@ -424,10 +476,31 @@ async def start_client(config: dict[str, Any]) -> None:
def create_async_and_start_client(config: dict[str, Any]) -> None: def create_async_and_start_client(config: dict[str, Any]) -> None:
"""
Create an asyncio event loop and start the client.
:param config: Config options for the client
:type config: dict[str, Any]
:rtype: None
"""
asyncio.run(start_client(config)) asyncio.run(start_client(config))
def run_client(args: Namespace) -> None: def run_client(args: Namespace) -> None:
"""
Run the client with the given arguments.
Namespace contains the following attributes:
- room: The room code to connect to
- secret: The secret to connect to the room
- config_file: The path to the configuration file
- key: The key to connect to the server
- server: The url of the server to connect to
:param args: The arguments from the command line
:type args: Namespace
:rtype: None
"""
try: try:
with open(args.config_file, encoding="utf8") as file: with open(args.config_file, encoding="utf8") as file:
config = load(file, Loader=Loader) config = load(file, Loader=Loader)
@ -452,7 +525,8 @@ def main() -> None:
"""Entry point for the syng-client script.""" """Entry point for the syng-client script."""
print( print(
f"Starting the client with {argv[0]} is deprecated. Please use `syng client` to start the client", f"Starting the client with {argv[0]} is deprecated. "
"Please use `syng client` to start the client",
file=stderr, file=stderr,
) )
parser: ArgumentParser = ArgumentParser() parser: ArgumentParser = ArgumentParser()

View file

@ -73,6 +73,15 @@ class Entry:
self.__dict__.update(kwargs) self.__dict__.update(kwargs)
def shares_performer(self, other_performer: str) -> bool: def shares_performer(self, other_performer: str) -> bool:
"""
Check if this entry shares a performer with another entry.
:param other_performer: The performer to check against.
:type other_performer: str
:return: True if the performers intersect, False otherwise.
:rtype: bool
"""
def normalize(performers: str) -> set[str]: def normalize(performers: str) -> set[str]:
return set( return set(
filter( filter(

View file

@ -1,3 +1,20 @@
"""
Main entry point for the application.
This module contains the main entry point for the application. It parses the
command line arguments and runs the appropriate function based on the arguments.
This module also checks if the client and server modules are available and
imports them if they are. If they are not available, the application will not
run the client or server functions.
Client usage: syng client [-h] [--room ROOM] [--secret SECRET] \
[--config-file CONFIG_FILE] [--key KEY] [--server SERVER]
Server usage: syng server [-h] [--host HOST] [--port PORT] [--root-folder ROOT_FOLDER] \
[--registration-keyfile REGISTRATION_KEYFILE] [--private] [--restricted]
GUI usage: syng gui
"""
from typing import TYPE_CHECKING from typing import TYPE_CHECKING
from argparse import ArgumentParser from argparse import ArgumentParser
import os import os
@ -28,6 +45,14 @@ except ImportError:
def main() -> None: def main() -> None:
"""
Main entry point for the application.
This function parses the command line arguments and runs the appropriate
function based on the arguments.
:return: None
"""
parser: ArgumentParser = ArgumentParser() parser: ArgumentParser = ArgumentParser()
sub_parsers = parser.add_subparsers(dest="action") sub_parsers = parser.add_subparsers(dest="action")
@ -53,6 +78,8 @@ def main() -> None:
server_parser.add_argument("--port", "-p", type=int, default=8080) server_parser.add_argument("--port", "-p", type=int, default=8080)
server_parser.add_argument("--root-folder", "-r", default=root_path) server_parser.add_argument("--root-folder", "-r", default=root_path)
server_parser.add_argument("--registration-keyfile", "-k", default=None) server_parser.add_argument("--registration-keyfile", "-k", default=None)
server_parser.add_argument("--private", "-P", action="store_true", default=False)
server_parser.add_argument("--restricted", "-R", action="store_true", default=False)
args = parser.parse_args() args = parser.parse_args()

View file

@ -107,6 +107,14 @@ class Queue:
updater(item) updater(item)
def find_by_name(self, name: str) -> Optional[Entry]: def find_by_name(self, name: str) -> Optional[Entry]:
"""
Find an entry by its performer and return it.
:param name: The name of the performer to search for.
:type name: str
:returns: The entry with the performer or `None` if no such entry exists
:rtype: Optional[Entry]
"""
for item in self._queue: for item in self._queue:
if item.shares_performer(name): if item.shares_performer(name):
return item return item

View file

@ -2,7 +2,6 @@
from __future__ import annotations from __future__ import annotations
from dataclasses import dataclass from dataclasses import dataclass
from typing import Optional
import os.path import os.path
@ -29,19 +28,17 @@ class Result:
artist: str artist: str
album: str album: str
@staticmethod @classmethod
def from_filename(filename: str, source: str) -> Optional[Result]: def from_filename(cls, filename: str, source: str) -> Result:
""" """
Infere most attributes from the filename. Infer most attributes from the filename.
The filename must be in this form:: The filename must be in this form::
{artist} - {title} - {album}.cdg {artist} - {title} - {album}.ext
Although the extension (cdg) is not required If parsing failes, the filename will be used as the title and the
artist and album will be set to "Unknown".
If parsing failes, ``None`` is returned. Otherwise a Result object with
those attributes is created.
:param filename: The filename to parse :param filename: The filename to parse
:type filename: str :type filename: str
@ -50,12 +47,62 @@ class Result:
:return: see above :return: see above
:rtype: Optional[Result] :rtype: Optional[Result]
""" """
basename = os.path.splitext(filename)[0]
try: try:
splitfile = os.path.basename(filename[:-4]).split(" - ") splitfile = os.path.basename(basename).split(" - ")
ident = filename ident = filename
artist = splitfile[0].strip() artist = splitfile[0].strip()
title = splitfile[1].strip() title = splitfile[1].strip()
album = splitfile[2].strip() album = splitfile[2].strip()
return Result(ident, source, title, artist, album) return cls(ident=ident, source=source, title=title, artist=artist, album=album)
except IndexError: except IndexError:
return None return cls(
ident=filename, source=source, title=basename, artist="Unknown", album="Unknown"
)
@classmethod
def from_dict(cls, values: dict[str, str]) -> Result:
"""
Create a Result object from a dictionary.
The dictionary must have the following keys:
- ident (str)
- source (str)
- title (str)
- artist (str)
- album (str)
:param values: The dictionary with the values
:type values: dict[str, str]
:return: The Result object
:rtype: Result
"""
return cls(
ident=values["ident"],
source=values["source"],
title=values["title"],
artist=values["artist"],
album=values["album"],
)
def to_dict(self) -> dict[str, str]:
"""
Convert the Result object to a dictionary.
The dictionary will have the following keys:
- ident (str)
- source (str)
- title (str)
- artist (str)
- album (str)
:return: The dictionary with the values
:rtype: dict[str, str]
"""
return {
"ident": self.ident,
"source": self.source,
"title": self.title,
"artist": self.artist,
"album": self.album,
}

View file

@ -35,6 +35,8 @@ import socketio
from aiohttp import web from aiohttp import web
from profanity_check import predict from profanity_check import predict
from syng.result import Result
from . import jsonencoder from . import jsonencoder
from .entry import Entry from .entry import Entry
from .queue import Queue from .queue import Queue
@ -200,6 +202,16 @@ async def handle_waiting_room_append(sid: str, data: dict[str, Any]) -> None:
""" """
Append a song to the waiting room. Append a song to the waiting room.
This should be called from a web client. Appends the entry, that is encoded
within the data to the waiting room of the room the client is currently
connected to.
:param sid: The session id of the client sending this request
:type sid: str
:param data: A dictionary encoding the entry, that should be added to the
waiting room.
:type data: dict[str, Any]
:rtype: None
""" """
async with sio.session(sid) as session: async with sio.session(sid) as session:
room = session["room"] room = session["room"]
@ -318,6 +330,17 @@ async def handle_show_config(sid: str) -> None:
@sio.on("update_config") @sio.on("update_config")
async def handle_update_config(sid: str, data: dict[str, Any]) -> None: async def handle_update_config(sid: str, data: dict[str, Any]) -> None:
"""
Forwards an updated config from an authorized webclient to the playback client.
This is currently untrested and should be used with caution.
:param sid: The session id of the client sending this request
:type sid: str
:param data: A dictionary encoding the new configuration
:type data: dict[str, Any]
:rtype: None
"""
async with sio.session(sid) as session: async with sio.session(sid) as session:
room = session["room"] room = session["room"]
is_admin = session["admin"] is_admin = session["admin"]
@ -595,6 +618,14 @@ async def add_songs_from_waiting_room(room: str) -> None:
async def discard_first(room: str) -> Entry: async def discard_first(room: str) -> Entry:
""" """
Gets the first element of the queue, handling resulting triggers. Gets the first element of the queue, handling resulting triggers.
This function is used to get the first element of the queue, and handle
the resulting triggers. This includes adding songs from the waiting room,
and updating the state of the room.
:param room: The room to get the first element from.
:type room: str
:rtype: Entry
""" """
state = clients[room] state = clients[room]
@ -644,14 +675,33 @@ async def handle_pop_then_get_next(sid: str) -> None:
await sio.emit("play", current, room=sid) await sio.emit("play", current, room=sid)
def check_registration(key: str) -> bool:
"""
Check if a given key is in the registration keyfile.
This is used to authenticate a client, if the server is in private or
restricted mode.
:param key: The key to check
:type key: str
:return: True if the key is in the registration keyfile, False otherwise
:rtype: bool
"""
with open(app["registration-keyfile"], encoding="utf8") as f:
raw_keys = f.readlines()
keys = [key[:64] for key in raw_keys]
print(keys)
print(key)
return key in keys
@sio.on("register-client") @sio.on("register-client")
async def handle_register_client(sid: str, data: dict[str, Any]) -> None: async def handle_register_client(sid: str, data: dict[str, Any]) -> None:
""" """
Handle the "register-client" message. Handle the "register-client" message.
The data dictionary should have the following keys: The data dictionary should have the following keys:
- `registration_key` (Optional), a key corresponding to those stored
in `app["registration-keyfile"]`
- `room` (Optional), the requested room - `room` (Optional), the requested room
- `config`, an dictionary of initial configurations - `config`, an dictionary of initial configurations
- `queue`, a list of initial entries for the queue. The entries are - `queue`, a list of initial entries for the queue. The entries are
@ -659,6 +709,7 @@ async def handle_register_client(sid: str, data: dict[str, Any]) -> None:
- `recent`, a list of initial entries for the recent list. The entries - `recent`, a list of initial entries for the recent list. The entries
are encoded as a dictionary. are encoded as a dictionary.
- `secret`, the secret of the room - `secret`, the secret of the room
- `key`, a registration key given out by the server administrator
This will register a new playback client to a specific room. If there This will register a new playback client to a specific room. If there
already exists a playback client registered for this room, this already exists a playback client registered for this room, this
@ -697,21 +748,19 @@ async def handle_register_client(sid: str, data: dict[str, Any]) -> None:
client_id = gen_id(length + 1) client_id = gen_id(length + 1)
return client_id return client_id
if not app["public"]: if "key" in data["config"]:
with open(app["registration-keyfile"], encoding="utf8") as f: print(data["config"]["key"])
raw_keys = f.readlines() data["config"]["key"] = hashlib.sha256(data["config"]["key"].encode()).hexdigest()
keys = [key[:64] for key in raw_keys]
if ( if app["type"] == "private" and (
"key" not in data["config"] "key" not in data["config"] or not check_registration(data["config"]["key"])
or hashlib.sha256(data["config"]["key"].encode()).hexdigest() not in keys ):
): await sio.emit(
await sio.emit( "client-registered",
"client-registered", {"success": False, "room": None},
{"success": False, "room": None}, room=sid,
room=sid, )
) return
return
room: str = ( room: str = (
data["config"]["room"] if "room" in data["config"] and data["config"]["room"] else gen_id() data["config"]["room"] if "room" in data["config"] and data["config"]["room"] else gen_id()
@ -1038,11 +1087,65 @@ async def handle_search(sid: str, data: dict[str, Any]) -> None:
state = clients[room] state = clients[room]
query = data["query"] query = data["query"]
results_list = await asyncio.gather( if (
*[state.client.sources[source].search(query) for source in state.client.sources_prio] app["type"] != "restricted"
) or "key" in state.client.config
and check_registration(state.client.config["key"])
):
results_list = await asyncio.gather(
*[state.client.sources[source].search(query) for source in state.client.sources_prio]
)
results = [search_result for source_result in results_list for search_result in source_result] results = [
search_result for source_result in results_list for search_result in source_result
]
await send_search_results(sid, results)
else:
print("Denied")
await sio.emit("search", {"query": query, "sid": sid}, room=state.sid)
@sio.on("search-results")
async def handle_search_results(sid: str, data: dict[str, Any]) -> None:
"""
Handle the "search-results" message.
This message is send by the playback client, once it has received search
results. The results are send to the web client.
The data dictionary should have the following keys:
- `sid`, the session id of the web client (str)
- `results`, a list of search results (list[dict[str, Any]])
:param sid: The session id of the playback client
:type sid: str
:param data: A dictionary with the keys described above
:type data: dict[str, Any]
:rtype: None
"""
async with sio.session(sid) as session:
room = session["room"]
state = clients[room]
if sid != state.sid:
return
web_sid = data["sid"]
results = [Result.from_dict(result) for result in data["results"]]
await send_search_results(web_sid, results)
async def send_search_results(sid: str, results: list[Result]) -> None:
"""
Send search results to a client.
:param sid: The session id of the client to send the results to.
:type sid: str
:param results: The search results to send.
:type results: list[Result]
:rtype: None
"""
await sio.emit( await sio.emit(
"search-results", "search-results",
{"results": results}, {"results": results},
@ -1051,9 +1154,12 @@ async def handle_search(sid: str, data: dict[str, Any]) -> None:
async def cleanup() -> None: async def cleanup() -> None:
"""Clean up the unused playback clients """
Clean up the unused playback clients
This runs every hour, and removes every client, that did not requested a song for four hours This runs every hour, and removes every client, that did not requested a song for four hours.
:rtype: None
""" """
logger.info("Start Cleanup") logger.info("Start Cleanup")
@ -1085,9 +1191,14 @@ async def cleanup() -> None:
async def background_tasks( async def background_tasks(
iapp: web.Application, iapp: web.Application,
) -> AsyncGenerator[None, None]: ) -> AsyncGenerator[None, None]:
"""Create all the background tasks """
Create all the background tasks.
For now, this is only the cleanup task For now, this is only the cleanup task.
:param iapp: The web application
:type iapp: web.Application
:rtype: AsyncGenerator[None, None]
""" """
iapp["repeated_cleanup"] = asyncio.create_task(cleanup()) iapp["repeated_cleanup"] = asyncio.create_task(cleanup())
@ -1099,9 +1210,23 @@ async def background_tasks(
def run_server(args: Namespace) -> None: def run_server(args: Namespace) -> None:
app["public"] = True """
Run the server.
`args` consists of the following attributes:
- `host`, the host to bind to
- `port`, the port to bind to
- `root_folder`, the root folder of the web client
- `registration_keyfile`, the file containing the registration keys
- `private`, if the server is private
- `restricted`, if the server is restricted
:param args: The command line arguments
:type args: Namespace
:rtype: None
"""
app["type"] = "private" if args.private else "restricted" if args.restricted else "public"
if args.registration_keyfile: if args.registration_keyfile:
app["public"] = False
app["registration-keyfile"] = args.registration_keyfile app["registration-keyfile"] = args.registration_keyfile
app["root_folder"] = args.root_folder app["root_folder"] = args.root_folder
@ -1128,7 +1253,8 @@ def main() -> None:
""" """
print( print(
f"Starting the server with {argv[0]} is deprecated. Please use `syng server` to start the server", f"Starting the server with {argv[0]} is deprecated. "
"Please use `syng server` to start the server",
file=stderr, file=stderr,
) )
@ -1138,6 +1264,8 @@ def main() -> None:
parser.add_argument("--port", "-p", type=int, default=8080) parser.add_argument("--port", "-p", type=int, default=8080)
parser.add_argument("--root-folder", "-r", default=root_path) parser.add_argument("--root-folder", "-r", default=root_path)
parser.add_argument("--registration-keyfile", "-k", default=None) parser.add_argument("--registration-keyfile", "-k", default=None)
parser.add_argument("--private", "-P", action="store_true", default=False)
parser.add_argument("--restricted", "-R", action="store_true", default=False)
args = parser.parse_args() args = parser.parse_args()
run_server(args) run_server(args)

View file

@ -17,10 +17,11 @@ from .source import Source
class FileBasedSource(Source): class FileBasedSource(Source):
"""A source for indexing and playing songs from a local folder. """
A abstract source for indexing and playing songs based on files.
Config options are: Config options are:
-``dir``, dirctory to index and server from. -``extensions``, list of filename extensions
""" """
config_schema = Source.config_schema | { config_schema = Source.config_schema | {
@ -39,18 +40,31 @@ class FileBasedSource(Source):
self.extra_mpv_arguments = ["--scale=oversample"] self.extra_mpv_arguments = ["--scale=oversample"]
def has_correct_extension(self, path: Optional[str]) -> bool: def has_correct_extension(self, path: Optional[str]) -> bool:
"""Check if a `path` has a correct extension. """
Check if a `path` has a correct extension.
For A+B type extensions (like mp3+cdg) only the latter halve is checked For A+B type extensions (like mp3+cdg) only the latter halve is checked
:param path: The path to check.
:type path: Optional[str]
:return: True iff path has correct extension. :return: True iff path has correct extension.
:rtype: bool :rtype: bool
""" """
return path is not None and os.path.splitext(path)[1][1:] in [ return path is not None and os.path.splitext(path)[1][1:] in [
ext.split("+")[-1] for ext in self.extensions ext.rsplit("+", maxsplit=1)[-1] for ext in self.extensions
] ]
def get_video_audio_split(self, path: str) -> tuple[str, Optional[str]]: def get_video_audio_split(self, path: str) -> tuple[str, Optional[str]]:
"""
Returns path for audio and video file, if filetype is marked as split.
If the file is not marked as split, the second element of the tuple will be None.
:params: path: The path to the file
:type path: str
:return: Tuple with path to video and audio file
:rtype: tuple[str, Optional[str]]
"""
extension_of_path = os.path.splitext(path)[1][1:] extension_of_path = os.path.splitext(path)[1][1:]
splitted_extensions = [ext.split("+") for ext in self.extensions if "+" in ext] splitted_extensions = [ext.split("+") for ext in self.extensions if "+" in ext]
splitted_extensions_dict = {video: audio for [audio, video] in splitted_extensions} splitted_extensions_dict = {video: audio for [audio, video] in splitted_extensions}
@ -63,6 +77,14 @@ class FileBasedSource(Source):
return (path, None) return (path, None)
async def get_duration(self, path: str) -> int: async def get_duration(self, path: str) -> int:
"""
Return the duration for the file.
:param path: The path to the file
:type path: str
:return: The duration in seconds
:rtype: int
"""
if not PYMEDIAINFO_AVAILABLE: if not PYMEDIAINFO_AVAILABLE:
return 180 return 180

View file

@ -178,18 +178,16 @@ class Source(ABC):
if ident not in self._index: if ident not in self._index:
return None return None
res: Optional[Result] = Result.from_filename(ident, self.source_name) res: Result = Result.from_filename(ident, self.source_name)
if res is not None: return Entry(
return Entry( ident=ident,
ident=ident, source=self.source_name,
source=self.source_name, duration=180,
duration=180, album=res.album,
album=res.album, title=res.title,
title=res.title, artist=res.artist,
artist=res.artist, performer=performer,
performer=performer, )
)
return None
async def search(self, query: str) -> list[Result]: async def search(self, query: str) -> list[Result]:
""" """
@ -205,10 +203,7 @@ class Source(ABC):
filtered: list[str] = self.filter_data_by_query(query, self._index) filtered: list[str] = self.filter_data_by_query(query, self._index)
results: list[Result] = [] results: list[Result] = []
for filename in filtered: for filename in filtered:
result: Optional[Result] = Result.from_filename(filename, self.source_name) results.append(Result.from_filename(filename, self.source_name))
if result is None:
continue
results.append(result)
return results return results
@abstractmethod @abstractmethod

View file

@ -1,8 +1,7 @@
""" """
Construct the YouTube source. Construct the YouTube source.
If available, downloading will be performed via yt-dlp, if not, pytube will be This source uses yt-dlp to search and download videos from YouTube.
used.
Adds it to the ``available_sources`` with the name ``youtube``. Adds it to the ``available_sources`` with the name ``youtube``.
""" """
@ -28,11 +27,20 @@ class YouTube:
A minimal compatibility layer for the YouTube object of pytube, implemented via yt-dlp A minimal compatibility layer for the YouTube object of pytube, implemented via yt-dlp
""" """
__cache__: dict[str, Any] = ( __cache__: dict[
{} str, Any
) # TODO: this may grow fast... but atm it fixed youtubes anti bot measures ] = {} # TODO: this may grow fast... but atm it fixed youtubes anti bot measures
def __init__(self, url: Optional[str] = None): def __init__(self, url: Optional[str] = None):
"""
Construct a YouTube object from a url.
If the url is already in the cache, the object is constructed from the
cache. Otherwise yt-dlp is used to extract the information.
:param url: The url of the video.
:type url: Optional[str]
"""
self._title: Optional[str] self._title: Optional[str]
self._author: Optional[str] self._author: Optional[str]
@ -63,22 +71,37 @@ class YouTube:
@property @property
def title(self) -> str: def title(self) -> str:
"""
The title of the video.
:return: The title of the video.
:rtype: str
"""
if self._title is None: if self._title is None:
return "" return ""
else: return self._title
return self._title
@property @property
def author(self) -> str: def author(self) -> str:
"""
The author of the video.
:return: The author of the video.
:rtype: str
"""
if self._author is None: if self._author is None:
return "" return ""
else: return self._author
return self._author
@classmethod @classmethod
def from_result(cls, search_result: dict[str, Any]) -> YouTube: def from_result(cls, search_result: dict[str, Any]) -> YouTube:
""" """
Construct a YouTube object from yt-dlp results. Construct a YouTube object from yt-dlp search results.
Updates the cache with the url and the metadata.
:param search_result: The search result from yt-dlp.
:type search_result: dict[str, Any]
""" """
url = search_result["url"] url = search_result["url"]
cls.__cache__[url] = { cls.__cache__[url] = {
@ -95,8 +118,21 @@ class Search:
A minimal compatibility layer for the Search object of pytube, implemented via yt-dlp A minimal compatibility layer for the Search object of pytube, implemented via yt-dlp
""" """
# pylint: disable=too-few-public-methods
def __init__(self, query: str, channel: Optional[str] = None): def __init__(self, query: str, channel: Optional[str] = None):
sp = "EgIQAfABAQ==" """
Construct a Search object from a query and an optional channel.
Uses yt-dlp to search for the query.
If no channel is given, the search is done on the whole of YouTube.
:param query: The query to search for.
:type query: str
:param channel: The channel to search in.
:type channel: Optional[str]
"""
sp = "EgIQAfABAQ==" # This is a magic string, that tells youtube to search for videos
if channel is None: if channel is None:
query_url = f"https://youtube.com/results?{urlencode({'search_query': query, 'sp':sp})}" query_url = f"https://youtube.com/results?{urlencode({'search_query': query, 'sp':sp})}"
else: else:
@ -157,7 +193,12 @@ class YoutubeSource(Source):
# pylint: disable=too-many-instance-attributes # pylint: disable=too-many-instance-attributes
def __init__(self, config: dict[str, Any]): def __init__(self, config: dict[str, Any]):
"""Create the source.""" """
Create the YouTube source.
:param config: The configuration for the source.
:type config: dict[str, Any]
"""
super().__init__(config) super().__init__(config)
self.channels: list[str] = config["channels"] if "channels" in config else [] self.channels: list[str] = config["channels"] if "channels" in config else []
@ -227,6 +268,16 @@ class YoutubeSource(Source):
""" """
def _get_entry(performer: str, url: str) -> Optional[Entry]: def _get_entry(performer: str, url: str) -> Optional[Entry]:
"""
Create the entry in a thread.
:param performer: The person singing.
:type performer: str
:param url: A url to a YouTube video.
:type url: str
:return: An entry with the data.
:rtype: Optional[Entry]
"""
yt_song = YouTube(url) yt_song = YouTube(url)
try: try:
length = yt_song.length length = yt_song.length
@ -264,6 +315,17 @@ class YoutubeSource(Source):
""" """
def _contains_index(query: str, result: YouTube) -> float: def _contains_index(query: str, result: YouTube) -> float:
"""
Calculate a score for the result.
The score is the ratio of how many words of the query are in the
title and author of the result.
:param query: The query to search for.
:type query: str
:param result: The result to score.
:type result: YouTube
"""
compare_string: str = result.title.lower() + " " + result.author.lower() compare_string: str = result.title.lower() + " " + result.author.lower()
hits: int = 0 hits: int = 0
queries: list[str] = shlex.split(query.lower()) queries: list[str] = shlex.split(query.lower())