Check existence before getting url
This commit is contained in:
parent
e1c860a711
commit
4b1e1af453
1 changed files with 26 additions and 22 deletions
|
@ -19,6 +19,7 @@ class Track:
|
|||
title: str
|
||||
nr: int
|
||||
id: str
|
||||
dest_path: str
|
||||
|
||||
|
||||
@dataclass
|
||||
|
@ -53,6 +54,7 @@ class Album:
|
|||
image: ImageLink
|
||||
release_date: datetime.date
|
||||
cover_data: bytes | None
|
||||
dest_path: str
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict[Any, Any]) -> "Album":
|
||||
|
@ -63,6 +65,9 @@ class Album:
|
|||
# Fetch image
|
||||
print(f"Fetching image from {image.large}")
|
||||
image_content = requests.get(image.large).content
|
||||
album_path = f"{data['artist']['name']} - {data['title']}".replace(
|
||||
"/", "-"
|
||||
).replace("\\", "-")
|
||||
|
||||
album = cls(
|
||||
artist=data["artist"]["name"],
|
||||
|
@ -74,6 +79,7 @@ class Album:
|
|||
image=ImageLink.from_dict(data["image"]),
|
||||
release_date=release_date,
|
||||
cover_data=image_content,
|
||||
dest_path=album_path,
|
||||
)
|
||||
|
||||
if "tracks" in data and "track_ids" in data:
|
||||
|
@ -83,6 +89,7 @@ class Album:
|
|||
id=tid,
|
||||
title=track["title"],
|
||||
album=album,
|
||||
dest_path=f"{album.dest_path}/{album.artist} - {album.title} - {tnr + 1} {track['title']}.flac",
|
||||
)
|
||||
for tnr, (tid, track) in enumerate(
|
||||
zip(data["track_ids"], data["tracks"]["items"])
|
||||
|
@ -108,19 +115,14 @@ class Qobuz:
|
|||
return result_json["data"]["url"]
|
||||
raise RuntimeError(f"Failed to get download URL, result: {result}")
|
||||
|
||||
def download_track(self, track: Track, filename: str, dest: str) -> None:
|
||||
def download_track(self, track: Track) -> None:
|
||||
download_url = self.get_download_url(track.id)
|
||||
dest_path = os.path.join(dest, filename)
|
||||
if os.path.exists(dest_path):
|
||||
skip = input(f"File {dest_path} already exists, skip download? [Y/n] ")
|
||||
if skip.strip().lower() != "n":
|
||||
print("Skipping download.")
|
||||
return
|
||||
dest_path = track.dest_path
|
||||
response = requests.get(download_url, stream=True)
|
||||
if response.status_code == 200:
|
||||
total_size = int(response.headers.get("content-length", 0))
|
||||
with tqdm.tqdm(
|
||||
total=total_size, unit="B", unit_scale=True, desc=filename
|
||||
total=total_size, unit="B", unit_scale=True, desc=track.title
|
||||
) as pbar:
|
||||
with open(dest_path, "wb") as file:
|
||||
for chunk in response.iter_content(chunk_size=8192):
|
||||
|
@ -146,7 +148,7 @@ class Qobuz:
|
|||
audio.add_picture(picture)
|
||||
audio.save()
|
||||
|
||||
print(f"Downloaded {filename}")
|
||||
print(f"Downloaded {dest_path} successfully.")
|
||||
else:
|
||||
raise RuntimeError("Failed to download track")
|
||||
|
||||
|
@ -158,26 +160,30 @@ class Qobuz:
|
|||
return Album.from_dict(data)
|
||||
raise RuntimeError(f"Failed to get album, result: {result}")
|
||||
|
||||
def download_album(self, album: Album, dest: str) -> None:
|
||||
def download_album(self, album: Album) -> None:
|
||||
artist = album.artist
|
||||
album_title = album.title
|
||||
tracks = album.tracks if album.tracks is not None else []
|
||||
if os.path.exists(dest):
|
||||
skip = input(f"Destination {dest} already exists, skip download? [Y/n] ")
|
||||
if os.path.exists(album.dest_path):
|
||||
skip = input(
|
||||
f"Destination {album.dest_path} already exists, skip download? [Y/n] "
|
||||
)
|
||||
if skip.strip().lower() != "n":
|
||||
print("Skipping download.")
|
||||
return
|
||||
|
||||
os.makedirs(dest, exist_ok=True)
|
||||
os.makedirs(album.dest_path, exist_ok=True)
|
||||
print(f"Downloading album: {artist} - {album_title}")
|
||||
for nr, track in enumerate(tracks):
|
||||
print(f"Downloading track #{nr + 1:02d} {track.title}...")
|
||||
filename = (
|
||||
f"{artist} - {album_title} - {nr + 1:02d} {track.title}.flac".replace(
|
||||
"/", "-"
|
||||
).replace("\\", "-")
|
||||
)
|
||||
self.download_track(track, filename, dest)
|
||||
if os.path.exists(track.dest_path):
|
||||
skip = input(
|
||||
f"File {track.dest_path} already exists, skip download? [Y/n] "
|
||||
)
|
||||
if skip.strip().lower() != "n":
|
||||
print("Skipping download.")
|
||||
return
|
||||
self.download_track(track)
|
||||
|
||||
def search_album(self, query: str) -> Album:
|
||||
print(f'Searching for "{query}"')
|
||||
|
@ -190,9 +196,7 @@ class Qobuz:
|
|||
|
||||
def search_and_download(self, query: str) -> None:
|
||||
album = self.search_album(query)
|
||||
safe_title = album.title.replace("/", "-").replace("\\", "-")
|
||||
safe_artist = album.artist.replace("/", "-").replace("\\", "-")
|
||||
self.download_album(album, f"{safe_artist} - {safe_title}")
|
||||
self.download_album(album)
|
||||
|
||||
|
||||
def get_config() -> ConfigParser:
|
||||
|
|
Loading…
Add table
Reference in a new issue