import datetime import os from dataclasses import dataclass from io import BytesIO from typing import Any, Literal from configparser import ConfigParser from argparse import ArgumentParser import requests import tqdm from mutagen.flac import FLAC, Picture from PIL import Image import musicbrainzngs as mb @dataclass class Track: album: "Album" title: str nr: int id: str dest_path: str @dataclass class ImageLink: small: str | None thumbnail: str | None large: str | None back: str | None @classmethod def from_dict( cls, data: dict[ Literal["small"] | Literal["thumbnail"] | Literal["large"] | Literal["back"], str | None, ], ) -> "ImageLink": return ImageLink(**data) @dataclass class Album: artist: str title: str tracks: list[Track] | None maximum_bit_depth: int maximum_sampling_rate: int id: str image: ImageLink release_date: datetime.date cover_data: bytes | None dest_path: str @classmethod def from_dict(cls, data: dict[Any, Any]) -> "Album": release_date = datetime.date.fromtimestamp(data["released_at"]) image = ImageLink.from_dict(data["image"]) image_content = None if image.large is not None: # Fetch image print(f"Fetching image from {image.large}") image_content = requests.get(image.large).content album_path = f"{data['artist']['name']} - {data['title']}".replace( "/", "-" ).replace("\\", "-") album = cls( artist=data["artist"]["name"], title=data["title"], tracks=None, maximum_bit_depth=data["maximum_bit_depth"], maximum_sampling_rate=data["maximum_sampling_rate"], id=data["id"], image=ImageLink.from_dict(data["image"]), release_date=release_date, cover_data=image_content, dest_path=album_path, ) if "tracks" in data and "track_ids" in data: tracks = [ Track( nr=tnr + 1, id=tid, title=track["title"], album=album, dest_path=f"{album.dest_path}/{album.artist} - {album.title} - {tnr + 1:02d} {track['title']}.flac", ) for tnr, (tid, track) in enumerate( zip(data["track_ids"], data["tracks"]["items"]) ) ] album.tracks = tracks return album class Qobuz: def __init__(self, api_base, quality: int) -> None: self.api_base = api_base self.quality = quality def get_download_url(self, track_id: str) -> str: url = f"{self.api_base}/download-music" result = requests.get( url, params={"track_id": track_id, "quality": self.quality} ) if result.status_code == 200: result_json = result.json() if result_json["success"]: return result_json["data"]["url"] raise RuntimeError(f"Failed to get download URL, result: {result}") def download_track(self, track: Track) -> None: download_url = self.get_download_url(track.id) dest_path = track.dest_path response = requests.get(download_url, stream=True) if response.status_code == 200: total_size = int(response.headers.get("content-length", 0)) with tqdm.tqdm( total=total_size, unit="B", unit_scale=True, desc=track.title ) as pbar: with open(dest_path, "wb") as file: for chunk in response.iter_content(chunk_size=8192): file.write(chunk) pbar.update(len(chunk)) # Set metadata using mutagen audio = FLAC(dest_path) audio["title"] = track.title audio["artist"] = track.album.artist audio["tracknumber"] = str(track.nr) audio["album"] = track.album.title audio["release_date"] = track.album.release_date.isoformat() if track.album.cover_data is not None: picture = Picture() pil_image = Image.open(BytesIO(track.album.cover_data)) picture.type = 3 # Front cover picture.data = track.album.cover_data picture.mime = pil_image.get_format_mimetype() picture.desc = "Cover" picture.width = pil_image.width picture.height = pil_image.height picture.depth = len(pil_image.getbands()) * 8 audio.add_picture(picture) audio.save() print(f"Downloaded {dest_path} successfully.") else: raise RuntimeError("Failed to download track") def get_album(self, album_id: str) -> Album: url = f"{self.api_base}/get-album" result = requests.get(url, params={"album_id": album_id}) if result.status_code == 200: data = result.json()["data"] return Album.from_dict(data) raise RuntimeError(f"Failed to get album, result: {result}") def download_album(self, album: Album) -> None: artist = album.artist album_title = album.title tracks = album.tracks if album.tracks is not None else [] if os.path.exists(album.dest_path): skip = input( f"Destination {album.dest_path} already exists, skip download? [Y/n] " ) if skip.strip().lower() != "n": print("Skipping download.") return os.makedirs(album.dest_path, exist_ok=True) print(f"Downloading album: {artist} - {album_title}") for nr, track in enumerate(tracks): print(f"Downloading track #{nr + 1:02d} {track.title}...") if os.path.exists(track.dest_path): skip = input( f"File {track.dest_path} already exists, skip download? [Y/n] " ) if skip.strip().lower() != "n": print("Skipping download.") continue self.download_track(track) def search_album(self, query: str) -> Album: print(f'Searching for "{query}"') url = f"{self.api_base}/get-music" result = requests.get(url, params={"q": query, "offset": 0}) if result.status_code == 200: first_album_id = result.json()["data"]["albums"]["items"][0]["id"] return self.get_album(first_album_id) raise RuntimeError("No results") def search_and_download(self, query: str) -> None: album = self.search_album(query) self.download_album(album) def get_config() -> ConfigParser: config = ConfigParser() if os.path.exists("config.ini"): config_file = "config.ini" else: config_file = os.path.expanduser("~/.config/qobuz_dl_remote/config.ini") if not os.path.exists(config_file): raise FileNotFoundError(f"Config file not found: {config_file}") config.read(config_file) return config def main() -> None: config = get_config() base_url = config.get("qobuz", "api_base") if not base_url: print("No API base URL configured, please set api_base under [qobuz].") quality = config.getint("qobuz", "quality", fallback=27) parser = ArgumentParser(description="Qobuz Remote Downloader") parser.add_argument( "--type", "-t", type=str, choices=["album", "discography"], default="album", help="Type of search to perform", ) parser.add_argument( "--additional-releases", "-a", action="append", default=[], help="Include additional releases in discography search (may be applied multiple times)", ) parser.add_argument( "query", type=str, help="Search query for the album to download" ) parser.add_argument("--beet", "-b", action="store_true", help="Import into Beet") args = parser.parse_args() query = args.query qb = Qobuz(api_base=base_url, quality=quality) match args.type: case "album": qb.search_and_download(query) case "discography": mb.set_useragent( "Qobuz-dl-remote", version="0.1.0", contact="christoph.stahl@tu-dortmund.de", ) result = mb.search_artists(query=query, limit=1) artist_id = result["artist-list"][0]["id"] artist_name = result["artist-list"][0]["name"] artist_with_releases = mb.get_artist_by_id( artist_id, includes=["release-groups"] ) all_releases = artist_with_releases["artist"]["release-group-list"] album_releases = filter( lambda entry: entry["primary-type"] in ["Album", *args.additional_releases], all_releases, ) album_name_years = {} for entry in album_releases: year = ( entry["first-release-date"][:4] if "first-release-date" in entry else None ) if entry["title"] not in album_name_years: album_name_years[entry["title"]] = [year] else: album_name_years[entry["title"]].append(year) album_names = [ f"{name} ({year})" if len(years) > 1 else name for name, years in album_name_years.items() for year in years ] for album in album_names: print(f"Found album: {artist_name} - {album}") qb.search_and_download(f"{artist_name} {album}") case _: print("Invalid type specified, use 'album' or 'discography'.") return if __name__ == "__main__": main()