diff --git a/syng/sources/youtube.py b/syng/sources/youtube.py index b7938ea..549c0f5 100644 --- a/syng/sources/youtube.py +++ b/syng/sources/youtube.py @@ -16,6 +16,7 @@ from urllib.parse import urlencode from typing import Any, Optional, Tuple from yt_dlp import YoutubeDL +from yt_dlp.utils import DownloadError from ..entry import Entry from ..result import Result @@ -27,29 +28,46 @@ class YouTube: A minimal compatibility layer for the YouTube object of pytube, implemented via yt-dlp """ - __cache__: dict[ - str, Any - ] = {} # TODO: this may grow fast... but atm it fixed youtubes anti bot measures + __cache__: dict[str, Any] = ( + {} + ) # TODO: this may grow fast... but atm it fixed youtubes anti bot measures def __init__(self, url: Optional[str] = None): if url is not None: if url in YouTube.__cache__: self._infos = YouTube.__cache__[url] else: - self._infos = YoutubeDL({"quiet": True}).extract_info(url, download=False) + try: + self._infos = YoutubeDL({"quiet": True}).extract_info( + url, download=False + ) + except DownloadError: + self.length = 300 + self._title = None + self._author = None + self.watch_url = url + return if self._infos is None: raise RuntimeError(f'Extraction not possible for "{url}"') self.length = self._infos["duration"] - self.title = self._infos["title"] - self.author = self._infos["channel"] + self._title = self._infos["title"] + self._author = self._infos["channel"] self.watch_url = url else: self.length = 0 - self.title = "" + self._title = "" self.channel = "" - self.author = "" + self._author = "" self.watch_url = "" + @property + def title(self) -> str: + return "" if self._title is None else self._title + + @property + def author(self) -> str: + return "" if self._author is None else self._author + @classmethod def from_result(cls, search_result: dict[str, Any]) -> YouTube: """ @@ -77,9 +95,7 @@ class Search: else: if channel[0] == "/": channel = channel[1:] - query_url = ( - f"https://www.youtube.com/{channel}/search?{urlencode({'query': query, 'sp':sp})}" - ) + query_url = f"https://www.youtube.com/{channel}/search?{urlencode({'query': query, 'sp':sp})}" results = YoutubeDL( { @@ -93,7 +109,9 @@ class Search: ) self.results = [] if results is not None: - filtered_entries = filter(lambda entry: "short" not in entry["url"], results["entries"]) + filtered_entries = filter( + lambda entry: "short" not in entry["url"], results["entries"] + ) for r in filtered_entries: try: @@ -142,7 +160,8 @@ class YoutubeSource(Source): config["start_streaming"] if "start_streaming" in config else False ) self.formatstring = ( - f"bestvideo[height<={self.max_res}]+" f"bestaudio/best[height<={self.max_res}]" + f"bestvideo[height<={self.max_res}]+" + f"bestaudio/best[height<={self.max_res}]" ) self._yt_dlp = YoutubeDL( params={ @@ -212,8 +231,8 @@ class YoutubeSource(Source): source="youtube", album="YouTube", duration=length, - title=yt_song.title, - artist=yt_song.author, + title=yt_song._title, + artist=yt_song._author, performer=performer, ) @@ -250,10 +269,15 @@ class YoutubeSource(Source): results: list[YouTube] = [] results_lists: list[list[YouTube]] = await asyncio.gather( - *[asyncio.to_thread(self._channel_search, query, channel) for channel in self.channels], + *[ + asyncio.to_thread(self._channel_search, query, channel) + for channel in self.channels + ], asyncio.to_thread(self._yt_search, query), ) - results = [search_result for yt_result in results_lists for search_result in yt_result] + results = [ + search_result for yt_result in results_lists for search_result in yt_result + ] results.sort(key=partial(_contains_index, query)) @@ -283,6 +307,21 @@ class YoutubeSource(Source): """ return Search(f"{query} karaoke", channel).results + async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]: + """ + Video metadata should be read on the client to avoid banning + the server. + """ + if entry.title is None or entry.artist is None: + print(f"Looking up {entry.ident}") + youtube_video: YouTube = await asyncio.to_thread(YouTube, entry.ident) + return { + "duration": youtube_video.length, + "artist": youtube_video.author, + "title": youtube_video.title, + } + return {} + async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]: """ Download the video.