Parallel Searching

This commit is contained in:
Christoph Stahl 2022-11-29 22:54:14 +01:00
parent 593ee0caa6
commit d12d67c4e6
5 changed files with 61 additions and 50 deletions

View file

@ -102,6 +102,8 @@ async def preview(entry: Entry) -> None:
"--sub-file=-", "--sub-file=-",
"--fullscreen", "--fullscreen",
stdin=asyncio.subprocess.PIPE, stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
) )
await process.communicate(subtitle.encode()) await process.communicate(subtitle.encode())

View file

@ -415,16 +415,22 @@ async def handle_search(sid: str, data: dict[str, str]) -> None:
query = data["query"] query = data["query"]
result_futures = [] result_futures = []
for source in state.config.sources_prio: results_list = await asyncio.gather(
loop = asyncio.get_running_loop() *[
search_future = loop.create_future() state.config.sources[source].search(query)
loop.create_task(state.config.sources[source].search(search_future, query)) for source in state.config.sources_prio
result_futures.append(search_future) ]
)
# for source in state.config.sources_prio:
# loop = asyncio.get_running_loop()
# search_future = loop.create_future()
# loop.create_task(state.config.sources[source].search(search_future, query))
# result_futures.append(search_future)
results = [ results = [
search_result search_result
for result_future in result_futures for source_result in results_list
for search_result in await result_future for search_result in source_result
] ]
await sio.emit( await sio.emit(
"search-results", "search-results",

View file

@ -1,5 +1,4 @@
from json import load, dump # from json import load, dump
from time import sleep, perf_counter
from itertools import zip_longest from itertools import zip_longest
import asyncio import asyncio
import os import os
@ -49,15 +48,16 @@ class S3Source(Source):
async def get_config(self) -> dict[str, Any] | list[dict[str, Any]]: async def get_config(self) -> dict[str, Any] | list[dict[str, Any]]:
def _get_config() -> dict[str, Any] | list[dict[str, Any]]: def _get_config() -> dict[str, Any] | list[dict[str, Any]]:
if not self.index: if not self.index:
print(f"Indexing {self.bucket}") print(f"s3: Indexing '{self.bucket}'")
# self.index = [ self.index = [
# obj.object_name obj.object_name
# for obj in self.minio.list_objects(self.bucket, recursive=True) for obj in self.minio.list_objects(self.bucket, recursive=True)
# ] ]
print("s3: Indexing done")
# with open("s3_files", "w") as f: # with open("s3_files", "w") as f:
# dump(self.index, f) # dump(self.index, f)
with open("s3_files", "r") as f: # with open("s3_files", "r") as f:
self.index = [item for item in load(f) if item.endswith(".cdg")] # self.index = [item for item in load(f) if item.endswith(".cdg")]
chunked = zip_longest(*[iter(self.index)] * 1000, fillvalue="") chunked = zip_longest(*[iter(self.index)] * 1000, fillvalue="")
return [ return [
@ -69,10 +69,7 @@ class S3Source(Source):
def add_to_config(self, config: dict[str, Any]) -> None: def add_to_config(self, config: dict[str, Any]) -> None:
self.index += config["index"] self.index += config["index"]
async def search( async def search(self, query: str) -> list[Result]:
self, result_future: asyncio.Future[list[Result]], query: str
) -> None:
print("searching s3")
filtered: list[str] = self.filter_data_by_query(query, self.index) filtered: list[str] = self.filter_data_by_query(query, self.index)
results: list[Result] = [] results: list[Result] = []
for filename in filtered: for filename in filtered:
@ -80,7 +77,7 @@ class S3Source(Source):
if result is None: if result is None:
continue continue
results.append(result) results.append(result)
result_future.set_result(results) return results
async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]: async def get_missing_metadata(self, entry: Entry) -> dict[str, Any]:
def mutagen_wrapped(file: str) -> int: def mutagen_wrapped(file: str) -> int:
@ -109,12 +106,12 @@ class S3Source(Source):
target_file_mp3: str = target_file_cdg[:-3] + "mp3" target_file_mp3: str = target_file_cdg[:-3] + "mp3"
os.makedirs(os.path.dirname(target_file_cdg), exist_ok=True) os.makedirs(os.path.dirname(target_file_cdg), exist_ok=True)
video_task: asyncio.Task[None] = asyncio.create_task( video_task: asyncio.Task[Any] = asyncio.create_task(
asyncio.to_thread( asyncio.to_thread(
self.minio.fget_object, self.bucket, entry.id, target_file_cdg self.minio.fget_object, self.bucket, entry.id, target_file_cdg
) )
) )
audio_task: asyncio.Task[None] = asyncio.create_task( audio_task: asyncio.Task[Any] = asyncio.create_task(
asyncio.to_thread( asyncio.to_thread(
self.minio.fget_object, self.bucket, ident_mp3, target_file_mp3 self.minio.fget_object, self.bucket, ident_mp3, target_file_mp3
) )

View file

@ -42,15 +42,17 @@ class Source:
[f"--audio-file={audio}"] if audio else [] [f"--audio-file={audio}"] if audio else []
) )
mpv_process = asyncio.create_subprocess_exec("mpv", *args) mpv_process = asyncio.create_subprocess_exec(
"mpv",
*args,
stdout=asyncio.subprocess.PIPE,
)
return await mpv_process return await mpv_process
async def get_entry(self, performer: str, ident: str) -> Entry: async def get_entry(self, performer: str, ident: str) -> Entry:
raise NotImplementedError raise NotImplementedError
async def search( async def search(self, query: str) -> list[Result]:
self, result_future: asyncio.Future[list[Result]], query: str
) -> None:
raise NotImplementedError raise NotImplementedError
async def doBuffer(self, entry: Entry) -> Tuple[str, Optional[str]]: async def doBuffer(self, entry: Entry) -> Tuple[str, Optional[str]]:

View file

@ -63,21 +63,22 @@ class YoutubeSource(Source):
return 1 - (hits / len(queries)) return 1 - (hits / len(queries))
async def search( async def search(self, query: str) -> list[Result]:
self, result_future: asyncio.Future[list[Result]], query: str
) -> None:
def _search(result_future: asyncio.Future[list[Result]], query: str) -> None:
results: list[YouTube] = [] results: list[YouTube] = []
for channel in self.channels: results_lists: list[list[YouTube]] = await asyncio.gather(
results += self._channel_search(query, channel) *[
search_results: Optional[list[YouTube]] = Search(query + " karaoke").results asyncio.to_thread(self._channel_search, query, channel)
if search_results is not None: for channel in self.channels
results += search_results ],
asyncio.to_thread(self._yt_search, query),
)
results = [
search_result for yt_result in results_lists for search_result in yt_result
]
results.sort(key=partial(self._contains_index, query)) results.sort(key=partial(self._contains_index, query))
result_future.set_result( return [
[
Result( Result(
id=result.watch_url, id=result.watch_url,
source="youtube", source="youtube",
@ -87,9 +88,12 @@ class YoutubeSource(Source):
) )
for result in results for result in results
] ]
)
await asyncio.to_thread(_search, result_future, query) def _yt_search(self, query: str) -> list[YouTube]:
results = Search(f"{query} karaoke").results
if results is not None:
return results
return []
def _channel_search(self, query: str, channel: str) -> list[YouTube]: def _channel_search(self, query: str, channel: str) -> list[YouTube]:
browse_id: str = Channel(f"https://www.youtube.com{channel}").channel_id browse_id: str = Channel(f"https://www.youtube.com{channel}").channel_id