From 6b130dac0014333ce0392673d78d5a0162cb2439 Mon Sep 17 00:00:00 2001 From: Christoph Stahl Date: Sat, 9 Aug 2025 22:41:54 +0200 Subject: [PATCH] Added script --- qobuz-dl-remote.py | 140 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 140 insertions(+) create mode 100644 qobuz-dl-remote.py diff --git a/qobuz-dl-remote.py b/qobuz-dl-remote.py new file mode 100644 index 0000000..77aa8cc --- /dev/null +++ b/qobuz-dl-remote.py @@ -0,0 +1,140 @@ +from typing import Any, Literal +import requests +import tqdm +import sys +from dataclasses import dataclass + +QOBUZ_API_BASE = "https://us.qobuz.squid.wtf/api" +QUALITY = 27 + + +@dataclass +class Track: + title: str + id: str + + +@dataclass +class ImageLink: + small: str | None + thumbnail: str | None + large: str | None + back: str | None + + @classmethod + def from_dict( + cls, + data: dict[ + Literal["small"] + | Literal["thumbnail"] + | Literal["large"] + | Literal["back"], + str | None, + ], + ) -> "ImageLink": + return ImageLink(**data) + + +@dataclass +class Album: + artist: str + title: str + tracks: list[Track] | None + maximum_bit_depth: int + maximum_sampling_rate: int + id: str + image: ImageLink + + @classmethod + def from_dict(cls, data: dict[Any, Any]) -> "Album": + tracks = None + if "tracks" in data and "track_ids" in data: + tracks = [ + Track(id=tid, title=track["title"]) + for tid, track in zip(data["track_ids"], data["tracks"]["items"]) + ] + return cls( + artist=data["artist"]["name"], + title=data["title"], + tracks=tracks, + maximum_bit_depth=data["maximum_bit_depth"], + maximum_sampling_rate=data["maximum_sampling_rate"], + id=data["id"], + image=ImageLink.from_dict(data["image"]), + ) + + +class Qobuz: + def __init__(self, api_base=QOBUZ_API_BASE, quality: int = QUALITY) -> None: + self.api_base = api_base + self.quality = quality + + def get_download_url(self, track_id: str) -> str: + url = f"{QOBUZ_API_BASE}/download-music" + result = requests.get( + url, params={"track_id": track_id, "quality": self.quality} + ) + if result.status_code == 200: + result_json = result.json() + if result_json["success"]: + return result_json["data"]["url"] + raise RuntimeError(f"Failed to get download URL, result: {result}") + + def download_track(self, track_id: str, filename: str) -> None: + download_url = self.get_download_url(track_id) + response = requests.get(download_url, stream=True) + if response.status_code == 200: + total_size = int(response.headers.get("content-length", 0)) + with tqdm.tqdm( + total=total_size, unit="B", unit_scale=True, desc=filename + ) as pbar: + with open(filename, "wb") as file: + for chunk in response.iter_content(chunk_size=8192): + file.write(chunk) + pbar.update(len(chunk)) + print(f"Downloaded {filename}") + else: + raise RuntimeError("Failed to download track") + + def get_album(self, album_id: str) -> Album: + url = f"{self.api_base}/get-album" + result = requests.get(url, params={"album_id": album_id}) + if result.status_code == 200: + data = result.json()["data"] + return Album.from_dict(data) + raise RuntimeError(f"Failed to get album, result: {result}") + + def download_album(self, album: Album) -> None: + artist = album.artist + album_title = album.title + tracks = album.tracks if album.tracks is not None else [] + print(f"Downloading album: {artist} - {album_title}") + for nr, track in enumerate(tracks): + print(f"Downloading track #{nr + 1:02d} {track.title}...") + filename = f"{artist} - {album_title} - {nr + 1:02d} {track.title}.flac" + self.download_track(track.id, filename) + + def search_album(self, query: str) -> Album: + print(f'Searching for "{query}"') + url = f"{self.api_base}/get-music" + result = requests.get(url, params={"q": query, "offset": 0}) + if result.status_code == 200: + first_album_id = result.json()["data"]["albums"]["items"][0]["id"] + return self.get_album(first_album_id) + raise RuntimeError("No results") + + def search_and_download(self, query: str) -> None: + album = self.search_album(query) + self.download_album(album) + + +def main() -> None: + # trackid = sys.argv[1] + # filename = sys.argv[2] + query = sys.argv[1] + qb = Qobuz() + print(qb.search_and_download(query)) + + +if __name__ == "__main__": + main()