missing metadata for youtube

This commit is contained in:
Christoph Stahl 2024-07-09 20:49:24 +02:00
parent b46d5175cd
commit 705169a1f7

View file

@ -16,6 +16,7 @@ from urllib.parse import urlencode
from typing import Any, Optional, Tuple from typing import Any, Optional, Tuple
from yt_dlp import YoutubeDL from yt_dlp import YoutubeDL
from yt_dlp.utils import DownloadError
from ..entry import Entry from ..entry import Entry
from ..result import Result from ..result import Result
@ -27,29 +28,46 @@ class YouTube:
A minimal compatibility layer for the YouTube object of pytube, implemented via yt-dlp A minimal compatibility layer for the YouTube object of pytube, implemented via yt-dlp
""" """
__cache__: dict[ __cache__: dict[str, Any] = (
str, Any {}
] = {} # TODO: this may grow fast... but atm it fixed youtubes anti bot measures ) # TODO: this may grow fast... but atm it fixed youtubes anti bot measures
def __init__(self, url: Optional[str] = None): def __init__(self, url: Optional[str] = None):
if url is not None: if url is not None:
if url in YouTube.__cache__: if url in YouTube.__cache__:
self._infos = YouTube.__cache__[url] self._infos = YouTube.__cache__[url]
else: else:
self._infos = YoutubeDL({"quiet": True}).extract_info(url, download=False) try:
self._infos = YoutubeDL({"quiet": True}).extract_info(
url, download=False
)
except DownloadError:
self.length = 300
self._title = None
self._author = None
self.watch_url = url
return
if self._infos is None: if self._infos is None:
raise RuntimeError(f'Extraction not possible for "{url}"') raise RuntimeError(f'Extraction not possible for "{url}"')
self.length = self._infos["duration"] self.length = self._infos["duration"]
self.title = self._infos["title"] self._title = self._infos["title"]
self.author = self._infos["channel"] self._author = self._infos["channel"]
self.watch_url = url self.watch_url = url
else: else:
self.length = 0 self.length = 0
self.title = "" self._title = ""
self.channel = "" self.channel = ""
self.author = "" self._author = ""
self.watch_url = "" self.watch_url = ""
@property
def title(self) -> str:
    """Return the video title, or an empty string when no title is stored."""
    stored_title = self._title
    if stored_title is None:
        return ""
    return stored_title
@property
def author(self) -> str:
    """Return the video author, or an empty string when no author is stored."""
    stored_author = self._author
    if stored_author is None:
        return ""
    return stored_author
@classmethod @classmethod
def from_result(cls, search_result: dict[str, Any]) -> YouTube: def from_result(cls, search_result: dict[str, Any]) -> YouTube:
""" """
@ -77,9 +95,7 @@ class Search:
else: else:
if channel[0] == "/": if channel[0] == "/":
channel = channel[1:] channel = channel[1:]
query_url = ( query_url = f"https://www.youtube.com/{channel}/search?{urlencode({'query': query, 'sp':sp})}"
f"https://www.youtube.com/{channel}/search?{urlencode({'query': query, 'sp':sp})}"
)
results = YoutubeDL( results = YoutubeDL(
{ {
@ -93,7 +109,9 @@ class Search:
) )
self.results = [] self.results = []
if results is not None: if results is not None:
filtered_entries = filter(lambda entry: "short" not in entry["url"], results["entries"]) filtered_entries = filter(
lambda entry: "short" not in entry["url"], results["entries"]
)
for r in filtered_entries: for r in filtered_entries:
try: try:
@ -142,7 +160,8 @@ class YoutubeSource(Source):
config["start_streaming"] if "start_streaming" in config else False config["start_streaming"] if "start_streaming" in config else False
) )
self.formatstring = ( self.formatstring = (
f"bestvideo[height<={self.max_res}]+" f"bestaudio/best[height<={self.max_res}]" f"bestvideo[height<={self.max_res}]+"
f"bestaudio/best[height<={self.max_res}]"
) )
self._yt_dlp = YoutubeDL( self._yt_dlp = YoutubeDL(
params={ params={
@ -212,8 +231,8 @@ class YoutubeSource(Source):
source="youtube", source="youtube",
album="YouTube", album="YouTube",
duration=length, duration=length,
title=yt_song.title, title=yt_song._title,
artist=yt_song.author, artist=yt_song._author,
performer=performer, performer=performer,
) )
@ -250,10 +269,15 @@ class YoutubeSource(Source):
results: list[YouTube] = [] results: list[YouTube] = []
results_lists: list[list[YouTube]] = await asyncio.gather( results_lists: list[list[YouTube]] = await asyncio.gather(
*[asyncio.to_thread(self._channel_search, query, channel) for channel in self.channels], *[
asyncio.to_thread(self._channel_search, query, channel)
for channel in self.channels
],
asyncio.to_thread(self._yt_search, query), asyncio.to_thread(self._yt_search, query),
) )
results = [search_result for yt_result in results_lists for search_result in yt_result] results = [
search_result for yt_result in results_lists for search_result in yt_result
]
results.sort(key=partial(_contains_index, query)) results.sort(key=partial(_contains_index, query))
@ -283,6 +307,21 @@ class YoutubeSource(Source):
""" """
return Search(f"{query} karaoke", channel).results return Search(f"{query} karaoke", channel).results
async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]:
    """
    Fetch metadata for an entry that lacks a title or an artist.

    Video metadata should be read on the client to avoid banning
    the server.

    :param entry: The entry whose ``title`` and ``artist`` are checked and
        whose ``ident`` is passed to ``YouTube`` for the lookup.
    :returns: A dict with the keys ``duration``, ``artist`` and ``title``,
        or an empty dict when both title and artist are already set.
    """
    already_complete = entry.title is not None and entry.artist is not None
    if already_complete:
        return {}

    print(f"Looking up {entry.ident}")
    # Constructing YouTube blocks on the extraction, so run it in a thread.
    video: YouTube = await asyncio.to_thread(YouTube, entry.ident)
    metadata: dict[str, Any] = {
        "duration": video.length,
        "artist": video.author,
        "title": video.title,
    }
    return metadata
async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]: async def do_buffer(self, entry: Entry) -> Tuple[str, Optional[str]]:
""" """
Download the video. Download the video.