qobuz-dl-remote/src/qobuz_dl_remote/main.py

311 lines
10 KiB
Python
Executable file

import datetime
import os
from dataclasses import dataclass
from io import BytesIO
from typing import Any, Literal
from configparser import ConfigParser
from argparse import ArgumentParser
import requests
import tqdm
from mutagen.flac import FLAC, Picture
from PIL import Image
import musicbrainzngs as mb
@dataclass
class Track:
    """One track of an album together with the file it is downloaded to."""

    # Album this track belongs to (forward reference; Album is defined later).
    album: "Album"
    # Track title.
    title: str
    # Track number.
    nr: int
    # Track identifier.
    id: str
    # Path of the file the track is written to.
    dest_path: str
@dataclass
class ImageLink:
small: str | None
thumbnail: str | None
large: str | None
back: str | None
@classmethod
def from_dict(
cls,
data: dict[
Literal["small"]
| Literal["thumbnail"]
| Literal["large"]
| Literal["back"],
str | None,
],
) -> "ImageLink":
return ImageLink(**data)
@dataclass
class Album:
    """Album metadata, its cover image bytes and the download destination."""

    artist: str
    title: str
    # None until a track listing has been attached (see from_dict).
    tracks: list[Track] | None
    maximum_bit_depth: int
    maximum_sampling_rate: int
    id: str
    image: ImageLink
    release_date: datetime.date
    # Raw bytes of the large cover image, or None when it is unavailable.
    cover_data: bytes | None
    # Directory the album's tracks are written to.
    dest_path: str

    @staticmethod
    def _fetch_cover(url: str) -> bytes | None:
        """Download the cover image, returning None when it cannot be fetched.

        The cover is optional, so a failed request must not abort the album:
        network errors and non-success HTTP statuses are reported and mapped
        to ``None`` instead of storing an error body as image data.
        """
        try:
            response = requests.get(url, timeout=30)
        except requests.RequestException as err:
            print(f"Could not fetch cover image: {err}")
            return None
        if not response.ok:
            print(f"Could not fetch cover image: HTTP {response.status_code}")
            return None
        return response.content

    @classmethod
    def from_dict(cls, data: dict[Any, Any]) -> "Album":
        """Build an ``Album`` (and its tracks, if present) from an API payload.

        Reads ``released_at``, ``image``, ``artist.name``, ``title``,
        ``maximum_bit_depth``, ``maximum_sampling_rate`` and ``id`` from
        ``data``. When both ``tracks`` and ``track_ids`` are present, the
        entries of ``track_ids`` are paired with ``tracks["items"]`` in order
        and numbered starting at 1.

        Side effect: downloads the large cover image when a URL is given.

        Args:
            data: Decoded album payload.

        Returns:
            The populated album; ``tracks`` stays ``None`` when the payload
            carries no track listing.

        Raises:
            KeyError: If a required key is missing from ``data``.
        """
        # NOTE(review): fromtimestamp() converts using the local timezone.
        release_date = datetime.date.fromtimestamp(data["released_at"])
        image = ImageLink.from_dict(data["image"])

        image_content = None
        if image.large is not None:
            print(f"Fetching image from {image.large}")
            image_content = cls._fetch_cover(image.large)

        artist_name = data["artist"]["name"]
        # Path separators in names would otherwise create nested directories.
        album_path = f"{artist_name} - {data['title']}".replace("/", "-").replace(
            "\\", "-"
        )
        album = cls(
            artist=artist_name,
            title=data["title"],
            tracks=None,
            maximum_bit_depth=data["maximum_bit_depth"],
            maximum_sampling_rate=data["maximum_sampling_rate"],
            id=data["id"],
            image=image,  # reuse the already parsed links
            release_date=release_date,
            cover_data=image_content,
            dest_path=album_path,
        )

        if "tracks" in data and "track_ids" in data:
            tracks = []
            pairs = zip(data["track_ids"], data["tracks"]["items"])
            for number, (tid, track) in enumerate(pairs, start=1):
                file_name = (
                    f"{album.artist} - {album.title} - {number:02d} "
                    f"{track['title']}.flac"
                ).replace("/", "-").replace("\\", "-")
                tracks.append(
                    Track(
                        nr=number,
                        id=tid,
                        title=track["title"],
                        album=album,
                        dest_path=os.path.join(album.dest_path, file_name),
                    )
                )
            album.tracks = tracks
        return album
class Qobuz:
    """Client for a remote download service reachable under ``api_base``.

    The service is queried through three endpoints relative to ``api_base``:
    ``/get-music`` (search), ``/get-album`` (album details) and
    ``/download-music`` (download URL of a track).
    """

    def __init__(self, api_base, quality: int) -> None:
        """Store the service base URL and the quality sent with downloads."""
        self.api_base = api_base
        self.quality = quality

    def get_download_url(self, track_id: str) -> str:
        """Return the download URL the service reports for ``track_id``.

        Raises:
            RuntimeError: If the request does not return HTTP 200 or the
                response's ``success`` flag is falsy.
        """
        url = f"{self.api_base}/download-music"
        result = requests.get(
            url, params={"track_id": track_id, "quality": self.quality}
        )
        if result.status_code == 200:
            result_json = result.json()
            if result_json["success"]:
                return result_json["data"]["url"]
        raise RuntimeError(f"Failed to get download URL, result: {result}")

    def download_track(self, track: Track) -> None:
        """Download ``track`` to ``track.dest_path`` and write its FLAC tags.

        The file is streamed in chunks with a progress bar, then tagged with
        title, artist, track number, album and release date; the album cover
        is embedded when cover data is available.

        Raises:
            RuntimeError: If the download URL cannot be obtained or the
                download request does not return HTTP 200.
        """
        download_url = self.get_download_url(track.id)
        dest_path = track.dest_path

        # The context manager releases the streamed connection on every path.
        with requests.get(download_url, stream=True) as response:
            if response.status_code != 200:
                raise RuntimeError("Failed to download track")
            total_size = int(response.headers.get("content-length", 0))
            try:
                with tqdm.tqdm(
                    total=total_size, unit="B", unit_scale=True, desc=track.title
                ) as pbar, open(dest_path, "wb") as file:
                    for chunk in response.iter_content(chunk_size=8192):
                        file.write(chunk)
                        pbar.update(len(chunk))
            except BaseException:
                # A truncated file would later be offered as "already exists,
                # skip download?", so remove it before propagating the error
                # (including Ctrl-C).
                if os.path.exists(dest_path):
                    os.remove(dest_path)
                raise

        # Set metadata using mutagen
        audio = FLAC(dest_path)
        audio["title"] = track.title
        audio["artist"] = track.album.artist
        audio["tracknumber"] = str(track.nr)
        audio["album"] = track.album.title
        audio["release_date"] = track.album.release_date.isoformat()
        if track.album.cover_data is not None:
            picture = Picture()
            picture.type = 3  # Front cover
            picture.data = track.album.cover_data
            picture.desc = "Cover"
            # Pillow is only used to read the image's format and dimensions.
            with Image.open(BytesIO(track.album.cover_data)) as pil_image:
                picture.mime = pil_image.get_format_mimetype()
                picture.width = pil_image.width
                picture.height = pil_image.height
                picture.depth = len(pil_image.getbands()) * 8
            audio.add_picture(picture)
        audio.save()
        print(f"Downloaded {dest_path} successfully.")

    def get_album(self, album_id: str) -> Album:
        """Fetch the album with ``album_id`` and convert it to an ``Album``.

        Raises:
            RuntimeError: If the request does not return HTTP 200.
        """
        url = f"{self.api_base}/get-album"
        result = requests.get(url, params={"album_id": album_id})
        if result.status_code == 200:
            data = result.json()["data"]
            return Album.from_dict(data)
        raise RuntimeError(f"Failed to get album, result: {result}")

    def download_album(self, album: Album) -> None:
        """Download every track of ``album`` into ``album.dest_path``.

        Asks on stdin before re-using an existing album directory or
        overwriting an existing track file; any answer other than ``n``
        skips the item.
        """
        artist = album.artist
        album_title = album.title
        tracks = album.tracks if album.tracks is not None else []
        if os.path.exists(album.dest_path):
            skip = input(
                f"Destination {album.dest_path} already exists, skip download? [Y/n] "
            )
            if skip.strip().lower() != "n":
                print("Skipping download.")
                return
        os.makedirs(album.dest_path, exist_ok=True)
        print(f"Downloading album: {artist} - {album_title}")
        for nr, track in enumerate(tracks):
            print(f"Downloading track #{nr + 1:02d} {track.title}...")
            if os.path.exists(track.dest_path):
                skip = input(
                    f"File {track.dest_path} already exists, skip download? [Y/n] "
                )
                if skip.strip().lower() != "n":
                    print("Skipping download.")
                    continue
            self.download_track(track)

    def search_album(self, query: str, nr: int = 1) -> Album:
        """Search for ``query`` and return the ``nr``-th album of the results.

        Args:
            query: Free-text search string.
            nr: 1-based position of the wanted album in the result list.

        Raises:
            ValueError: If ``nr`` is smaller than 1 (it would otherwise index
                the result list from the end).
            RuntimeError: If the request does not return HTTP 200 or fewer
                than ``nr`` albums were found.
        """
        if nr < 1:
            raise ValueError(f"nr must be >= 1, got {nr}")
        print(f'Searching for "{query}"')
        url = f"{self.api_base}/get-music"
        result = requests.get(url, params={"q": query, "offset": 0})
        if result.status_code == 200:
            items = result.json()["data"]["albums"]["items"]
            if len(items) < nr:
                raise RuntimeError(
                    f'No results: "{query}" returned {len(items)} album(s), '
                    f"result #{nr} requested"
                )
            return self.get_album(items[nr - 1]["id"])
        raise RuntimeError("No results")

    def search_and_download(self, query: str, nr: int = 1) -> None:
        """Search for ``query`` and download the ``nr``-th album found.

        ``nr`` defaults to the first result so callers that only have a query
        can omit it.
        """
        album = self.search_album(query, nr)
        self.download_album(album)
def get_config() -> ConfigParser:
    """Load the configuration file and return the parsed ``ConfigParser``.

    ``config.ini`` in the current working directory takes precedence;
    otherwise ``~/.config/qobuz_dl_remote/config.ini`` is used.

    Raises:
        FileNotFoundError: If the selected file does not exist.
    """
    chosen = "config.ini"
    if not os.path.exists(chosen):
        # No local file: fall back to the per-user location.
        chosen = os.path.expanduser("~/.config/qobuz_dl_remote/config.ini")
    if not os.path.exists(chosen):
        raise FileNotFoundError(f"Config file not found: {chosen}")

    parsed = ConfigParser()
    parsed.read(chosen)
    return parsed
def main() -> None:
config = get_config()
base_url = config.get("qobuz", "api_base")
if not base_url:
print("No API base URL configured, please set api_base under [qobuz].")
quality = config.getint("qobuz", "quality", fallback=27)
parser = ArgumentParser(description="Qobuz Remote Downloader")
parser.add_argument(
"--type",
"-t",
type=str,
choices=["album", "discography"],
default="album",
help="Type of search to perform",
)
parser.add_argument(
"--nr", "-n", type=int, default=1, help="Number of albums to download"
)
parser.add_argument(
"--additional-releases",
"-a",
action="append",
default=[],
help="Include additional releases in discography search (may be applied multiple times)",
)
parser.add_argument(
"--skip", "-s", action="append", default=[], help="Skip specific albums by name"
)
parser.add_argument(
"query", type=str, help="Search query for the album to download"
)
parser.add_argument("--beet", "-b", action="store_true", help="Import into Beet")
args = parser.parse_args()
query = args.query
qb = Qobuz(api_base=base_url, quality=quality)
match args.type:
case "album":
qb.search_and_download(query, args.nr)
case "discography":
mb.set_useragent(
"Qobuz-dl-remote",
version="0.1.0",
contact="christoph.stahl@tu-dortmund.de",
)
result = mb.search_artists(query=query, limit=1)
artist_id = result["artist-list"][0]["id"]
artist_name = result["artist-list"][0]["name"]
artist_with_releases = mb.get_artist_by_id(
artist_id, includes=["release-groups"]
)
all_releases = artist_with_releases["artist"]["release-group-list"]
album_releases = filter(
lambda entry: entry["primary-type"]
in ["Album", *args.additional_releases],
all_releases,
)
album_name_years = {}
for entry in album_releases:
year = (
entry["first-release-date"][:4]
if "first-release-date" in entry
else None
)
if entry["title"] not in album_name_years:
album_name_years[entry["title"]] = [year]
else:
album_name_years[entry["title"]].append(year)
album_names = [
f"{name} ({year})" if len(years) > 1 else name
for name, years in album_name_years.items()
for year in years
]
for album in album_names:
if any(skip.lower() in album.lower() for skip in args.skip):
print(f"Skipping album: {album}")
continue
print(f"Found album: {artist_name} - {album}")
qb.search_and_download(f"{artist_name} {album}")
case _:
print("Invalid type specified, use 'album' or 'discography'.")
return
# Run the command-line interface only when executed as a script, not on import.
if __name__ == "__main__":
    main()