Package Changes

- Add ``cover-size`` option to ``audible download``. Default cover size is 500. Naming of cover files contains size for now.
- BUGFIX closing ``client`` and ``api_client`` correctly
- Reformating code
- Rework ``models`` and ``cmd_download`` module
This commit is contained in:
mkb79 2020-12-13 15:44:44 +01:00
parent c0e75c962c
commit 740d6a12ef
18 changed files with 701 additions and 366 deletions

View file

@ -12,18 +12,18 @@
#
import os
import sys
sys.path.insert(0, os.path.abspath('../../src'))
import audible-cli
sys.path.insert(0, os.path.abspath("../../src"))
import audible_cli
# -- Project information -----------------------------------------------------
project = 'audible-cli'
copyright = '2020, mkb79'
author = 'mkb79'
project = "audible-cli"
copyright = "2020, mkb79"
author = "mkb79"
# The full version, including alpha/beta/rc tags
version = audible-cli.__version__
version = audible_cli.__version__
# -- General configuration ---------------------------------------------------
@ -32,23 +32,23 @@ version = audible-cli.__version__
# extensions coming with Sphinx (named 'sphinx.ext.*') or your custom
# ones.
extensions = [
'recommonmark',
'sphinx.ext.autodoc',
'sphinx.ext.coverage',
'sphinx.ext.napoleon',
'sphinx_rtd_theme',
'sphinx.ext.autosummary'
"recommonmark",
"sphinx.ext.autodoc",
"sphinx.ext.coverage",
"sphinx.ext.napoleon",
"sphinx_rtd_theme",
"sphinx.ext.autosummary"
]
master_doc = 'index'
master_doc = "index"
# Add any paths that contain templates here, relative to this directory.
templates_path = ['_templates']
templates_path = ["_templates"]
# List of patterns, relative to source directory, that match files and
# directories to ignore when looking for source files.
# This pattern also affects html_static_path and html_extra_path.
exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
exclude_patterns = ["_build", "Thumbs.db", ".DS_Store"]
# -- Options for HTML output -------------------------------------------------
@ -56,9 +56,9 @@ exclude_patterns = ['_build', 'Thumbs.db', '.DS_Store']
# The theme to use for HTML and HTML Help pages. See the documentation for
# a list of builtin themes.
#
html_theme = 'sphinx_rtd_theme'
html_theme = "sphinx_rtd_theme"
# Add any paths that contain custom static files (such as style sheets) here,
# relative to this directory. They are copied after the builtin static files,
# so a file named "default.css" will overwrite the builtin "default.css".
html_static_path = ['_static']
html_static_path = ["_static"]

View file

@ -1,4 +1,3 @@
import os
import pathlib
import re
import sys
@ -7,9 +6,9 @@ from setuptools import setup, find_packages
# 'setup.py publish' shortcut.
if sys.argv[-1] == 'publish':
system('python setup.py sdist bdist_wheel')
system('twine upload dist/*')
if sys.argv[-1] == "publish":
system("python setup.py sdist bdist_wheel")
system("twine upload dist/*")
sys.exit()
if sys.version_info < (3, 6, 0):
@ -17,9 +16,9 @@ if sys.version_info < (3, 6, 0):
here = pathlib.Path(__file__).parent
long_description = (here / 'README.md').read_text('utf-8')
long_description = (here / "README.md").read_text("utf-8")
about = (here / 'src' / 'audible_cli' / '_version.py').read_text('utf-8')
about = (here / "src" / "audible_cli" / "_version.py").read_text("utf-8")
def read_from_file(key):
@ -27,39 +26,39 @@ def read_from_file(key):
setup(
name=read_from_file('__title__'),
version=read_from_file('__version__'),
packages=find_packages('src'),
package_dir={'': 'src'},
name=read_from_file("__title__"),
version=read_from_file("__version__"),
packages=find_packages("src"),
package_dir={"": "src"},
include_package_data=True,
description=read_from_file('__description__'),
url=read_from_file('__url__'),
license=read_from_file('__license__'),
author=read_from_file('__author__'),
author_email=read_from_file('__author_email__'),
description=read_from_file("__description__"),
url=read_from_file("__url__"),
license=read_from_file("__license__"),
author=read_from_file("__author__"),
author_email=read_from_file("__author_email__"),
classifiers=[
'Development Status :: 4 - Beta',
'Intended Audience :: Developers',
'License :: OSI Approved :: GNU Affero General Public License v3',
'Programming Language :: Python :: 3.6',
'Programming Language :: Python :: 3.7',
'Programming Language :: Python :: 3.8'
"Development Status :: 4 - Beta",
"Intended Audience :: Developers",
"License :: OSI Approved :: GNU Affero General Public License v3",
"Programming Language :: Python :: 3.6",
"Programming Language :: Python :: 3.7",
"Programming Language :: Python :: 3.8"
],
install_requires=[
'aiofiles',
'audible>=0.5.0',
'click',
'colorama; platform_system=="Windows"',
'httpx==0.16.*',
'Pillow',
'tabulate',
'toml',
'tqdm'
"aiofiles",
"audible>=0.5.0",
"click",
"colorama; platform_system=='Windows'",
"httpx==0.16.*",
"Pillow",
"tabulate",
"toml",
"tqdm"
],
python_requires='>=3.6',
keywords='Audible, API, async, cli',
python_requires=">=3.6",
keywords="Audible, API, async, cli",
long_description=long_description,
long_description_content_type='text/markdown',
long_description_content_type="text/markdown",
project_urls={
"Documentation": "https://audiblecli.readthedocs.io/",
"Source": "https://github.com/mkb79/Audible-cli",
@ -68,4 +67,4 @@ setup(
"console_scripts": ["audible = audible_cli:main",
"audible-quickstart = audible_cli:quickstart"]
}
)
)

View file

@ -4,4 +4,4 @@ from ._logging import log_helper
from ._version import __version__
from .cli import main, quickstart
__all__ = ["__version__", "main", "quickstart", "log_helper"]
__all__ = ["__version__", "main", "quickstart", "log_helper"]

View file

@ -3,4 +3,4 @@ import sys
from . import cli
if __name__ == "__main__":
sys.exit(cli.main(prog_name="python -m audible"))
sys.exit(cli.main(prog_name="python -m audible"))

View file

@ -59,4 +59,4 @@ class AudibleCliLogHelper:
)
log_helper = AudibleCliLogHelper()
log_helper = AudibleCliLogHelper()

View file

@ -1,8 +1,8 @@
__title__ = "audible-cli"
__description__ = "Command line interface (cli) for the audible package."
__url__ = "https://github.com/mkb79/audible-cli"
__version__ = "0.0.dev5"
__version__ = "0.0.dev6"
__author__ = "mkb79"
__author_email__ = "mkb79@hackitall.de"
__license__ = "AGPL"
__status__ = "Development"
__status__ = "Development"

View file

@ -17,8 +17,7 @@ from .options import (
quickstart_config_option
)
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
@click.group(context_settings=CONTEXT_SETTINGS)
@ -48,11 +47,11 @@ def quickstart(ctx):
try:
sys.exit(ctx.forward(cmd_quickstart.cli))
except KeyboardInterrupt:
sys.exit('\nERROR: Interrupted by user')
sys.exit("\nERROR: Interrupted by user")
def main(*args, **kwargs):
try:
sys.exit(cli(*args, **kwargs))
except KeyboardInterrupt:
sys.exit('\nERROR: Interrupted by user')
sys.exit("\nERROR: Interrupted by user")

View file

@ -1,18 +1,27 @@
import asyncio
import asyncio.log
import asyncio.sslproto
import json
import pathlib
import ssl
import sys
import aiofiles
import audible
import click
import httpx
import tqdm
from audible.exceptions import NotFoundError
from click import echo, secho
from tabulate import tabulate
from ..models import Library
from ..config import pass_session
from ..models import Library
from ..utils import Downloader
SSL_PROTOCOLS = (asyncio.sslproto.SSLProtocol,)
def ignore_httpx_ssl_eror(loop):
"""Ignore aiohttp #3535 / cpython #13548 issue with SSL data after close
@ -35,7 +44,7 @@ def ignore_httpx_ssl_eror(loop):
orig_handler = loop.get_exception_handler()
def ignore_ssl_error(loop, context):
def ignore_ssl_error(context):
if context.get("message") in {
"SSL error in data received",
"Fatal error on transport",
@ -44,12 +53,13 @@ def ignore_httpx_ssl_eror(loop):
exception = context.get("exception")
protocol = context.get("protocol")
if (
isinstance(exception, ssl.SSLError)
and exception.reason == "KRB5_S_INIT"
and isinstance(protocol, SSL_PROTOCOLS)
isinstance(exception, ssl.SSLError)
and exception.reason == "KRB5_S_INIT"
and isinstance(protocol, SSL_PROTOCOLS)
):
if loop.get_debug():
asyncio.log.logger.debug("Ignoring httpx SSL KRB5_S_INIT error")
asyncio.log.logger.debug(
"Ignoring httpx SSL KRB5_S_INIT error")
return
if orig_handler is not None:
orig_handler(loop, context)
@ -59,13 +69,90 @@ def ignore_httpx_ssl_eror(loop):
loop.set_exception_handler(ignore_ssl_error)
async def download_cover(client, output_dir, item, res,
overwrite_existing):
filename = f"{item.full_title_slugify}_({str(res)}).jpg"
filepath = output_dir / filename
url = item.get_cover_url(res)
if url is None:
secho(
f"No COVER found for {item.full_title} with given resolution.",
fg="yellow")
return
dl = Downloader(url, filepath, client, overwrite_existing)
await dl.arun(stream=False, pb=False)
async def download_pdf(client, output_dir, item, overwrite_existing):
url = item.get_pdf_url()
if url is None:
secho(f"No PDF found for {item.full_title}.", fg="yellow")
return
filename = item.full_title_slugify + ".pdf"
filepath = output_dir / filename
dl = Downloader(url, filepath, client, overwrite_existing)
await dl.arun(stream=False, pb=False)
async def download_chapters(api_client, output_dir, item, quality,
overwrite_existing):
if not output_dir.is_dir():
raise Exception("Output dir doesn't exists")
filename = item.full_title_slugify + "-chapters.json"
file = output_dir / filename
if file.exists() and not overwrite_existing:
secho(f"File {file} already exists. Skip saving chapters.", fg="blue")
return True
try:
metadata = await item.aget_content_metadata(quality, api_client)
except NotFoundError:
secho(f"Can't get chapters for {item.full_title}. Skip item.",
fg="red")
return
metadata = json.dumps(metadata, indent=4)
async with aiofiles.open(file, "w") as f:
await f.write(metadata)
tqdm.tqdm.write(f"Chapter file saved to {file}.")
async def download_aax(client, output_dir, item, quality,
overwrite_existing):
url, codec = await item.aget_aax_url(quality, client)
filename = item.full_title_slugify + f"-{codec}.aax"
filepath = output_dir / filename
dl = Downloader(url, filepath, client, overwrite_existing)
await dl.arun(pb=True)
async def download_aaxc(api_client, client, output_dir, item, quality,
overwrite_existing):
url, codec, dlr = await item.aget_aaxc_url(quality, api_client)
filepath = pathlib.Path(
output_dir) / f"{item.full_title_slugify}-{codec}.aaxc"
dlr_file = filepath.with_suffix(".voucher")
dlr = json.dumps(dlr, indent=4)
async with aiofiles.open(dlr_file, "w") as f:
await f.write(dlr)
secho(f"Voucher file saved to {dlr_file}.")
dl = Downloader(url, filepath, client, overwrite_existing)
await dl.arun(pb=True)
async def consume(queue):
while True:
item = await queue.get()
try:
await item
except Exception as e:
secho(f"Error in job: {e}")
secho(f"Error in job: {e}", fg="red")
queue.task_done()
@ -79,15 +166,19 @@ async def main(auth, **params):
quality = params.get("quality")
get_pdf = params.get("pdf")
get_cover = params.get("cover")
cover_size = params.get("cover_size")
get_chapters = params.get("chapter")
get_audio = params.get("no_audio") is not True
get_aaxc = params.get("aaxc")
sim_jobs = params.get("jobs")
library = await Library.get_from_api(
auth,
response_groups="product_desc,pdf_url,media,product_attrs,relationships",
num_results=1000)
async with audible.AsyncClient(auth) as client:
library = await Library.aget_from_api(
client,
response_groups=("product_desc, pdf_url, media, product_attrs, "
"relationships"),
image_sizes="1215, 408, 360, 882, 315, 570, 252, 558, 900, 500",
num_results=1000)
jobs = []
@ -112,12 +203,12 @@ async def main(auth, **params):
if full_match or match:
echo(f"\nFound the following matches for '{title}'")
table_data = [[i[1], i[0].full_title, i[0].asin] \
table_data = [[i[1], i[0].full_title, i[0].asin]
for i in full_match or match]
head = ["% match", "title", "asin"]
table = tabulate(
table_data, head, tablefmt="pretty",
colalign=("center", "left", "center"))
colalign=("center", "left", "center"))
echo(table)
if click.confirm("Proceed with this audiobook(s)"):
@ -127,32 +218,62 @@ async def main(auth, **params):
secho(f"Skip title {title}: Not found in library", fg="red")
queue = asyncio.Queue()
for job in jobs:
item = library.get_item_by_asin(job)
if get_cover:
queue.put_nowait(item.get_cover(output_dir, overwrite_existing))
if get_pdf:
queue.put_nowait(item.get_pdf(output_dir, overwrite_existing))
if get_chapters:
queue.put_nowait(item.get_chapter_informations(output_dir, quality, overwrite_existing))
if get_audio:
if get_aaxc:
queue.put_nowait(item.get_audiobook_aaxc(output_dir, quality, overwrite_existing))
else:
queue.put_nowait(item.get_audiobook(output_dir, quality, overwrite_existing))
# schedule the consumer
consumers = [asyncio.ensure_future(consume(queue)) for _ in range(sim_jobs)]
# wait until the consumer has processed all items
await queue.join()
client = httpx.AsyncClient(auth=auth, timeout=15)
api_client = audible.AsyncClient(auth, timeout=15)
async with client, api_client:
for job in jobs:
item = library.get_item_by_asin(job)
if get_cover:
queue.put_nowait(
download_cover(client=client,
output_dir=output_dir,
item=item,
res=cover_size,
overwrite_existing=overwrite_existing))
# the consumer is still awaiting for an item, cancel it
for consumer in consumers:
consumer.cancel()
if get_pdf:
queue.put_nowait(
download_pdf(client=client,
output_dir=output_dir,
item=item,
overwrite_existing=overwrite_existing))
await library._api_client.close()
await library._client.aclose()
if get_chapters:
queue.put_nowait(
download_chapters(api_client=api_client,
output_dir=output_dir,
item=item,
quality=quality,
overwrite_existing=overwrite_existing))
if get_audio:
if get_aaxc:
queue.put_nowait(
download_aaxc(api_client=api_client,
client=client,
output_dir=output_dir,
item=item,
quality=quality,
overwrite_existing=overwrite_existing))
else:
queue.put_nowait(
download_aax(client=client,
output_dir=output_dir,
item=item,
quality=quality,
overwrite_existing=overwrite_existing))
# schedule the consumer
consumers = [asyncio.ensure_future(consume(queue)) for _ in
range(sim_jobs)]
# wait until the consumer has processed all items
await queue.join()
# the consumer is still awaiting for an item, cancel it
for consumer in consumers:
consumer.cancel()
@click.command("download")
@ -199,6 +320,13 @@ async def main(auth, **params):
is_flag=True,
help="downloads the cover in addition to the audiobook"
)
@click.option(
"--cover-size",
type=click.Choice(["252", "315", "360", "408", "500", "558", "570", "882",
"900", "1215"]),
default="500",
help="the cover pixel size"
)
@click.option(
"--chapter",
is_flag=True,
@ -246,4 +374,4 @@ def cli(session, **params):
loop.run_until_complete(main(auth, **params))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
loop.close()

View file

@ -4,8 +4,8 @@ import pathlib
import click
from ..models import Library
from ..config import pass_session
from ..models import Library
@click.group("library")
@ -61,8 +61,10 @@ async def _export_library(auth, **params):
data_row["series_title"] = v[0]["title"]
data_row["series_sequence"] = v[0]["sequence"]
elif key == "rating":
data_row["rating"] = v["overall_distribution"]["display_average_rating"]
data_row["num_ratings"] = v["overall_distribution"]["num_ratings"]
data_row["rating"] = v["overall_distribution"][
"display_average_rating"]
data_row["num_ratings"] = v["overall_distribution"][
"num_ratings"]
elif key == "library_status":
data_row["date_added"] = v["date_added"]
elif key == "product_images":

View file

@ -1,9 +1,10 @@
import click
import pathlib
import click
from audible import Authenticator
from click import echo, secho
from tabulate import tabulate
from audible import Authenticator
from ..config import pass_session
from ..utils import build_auth_file
@ -75,7 +76,7 @@ def list_profiles(session):
"--auth-file", "-f",
type=click.Path(exists=False, file_okay=True),
prompt="Please enter name for the auth file",
help="The auth file name (without dir) to be added. " \
help="The auth file name (without dir) to be added. "
"The auth file must exist."
)
@click.option(
@ -158,8 +159,9 @@ def check_if_auth_file_not_exists(session, ctx, value):
help="The country code for the marketplace you want to authenticate."
)
@pass_session
def add_auth_file(session, auth_file, password, audible_username, audible_password, country_code):
"Register a new device and add an auth file to config dir"
def add_auth_file(session, auth_file, password, audible_username,
audible_password, country_code):
"""Register a new device and add an auth file to config dir"""
build_auth_file(
filename=session.config.dirname / auth_file,
username=audible_username,
@ -190,12 +192,11 @@ def check_if_auth_file_exists(session, ctx, value):
help="The optional password for the auth file."
)
def remove_auth_file(auth_file, password):
"Deregister a device and remove auth file from config dir"
"""Deregister a device and remove auth file from config dir"""
auth = Authenticator.from_file(auth_file, password)
device_name = auth.device_info["device_name"]
auth.refresh_access_token()
auth.deregister_device()
echo(f"{device_name} deregistered")
echo(f"{device_name} deregistered")
auth_file.unlink()
echo(f"{auth_file} removed from config dir")

View file

@ -51,7 +51,7 @@ class PluginCommands(click.Group):
return mod.cli
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
@click.group(

View file

@ -19,7 +19,8 @@ def tabulate_summary(d: dict) -> None:
]
if "use_existing_auth_file" not in d:
data.append(
["auth_file_password", "***" if "auth_file_password" in d else "-"])
["auth_file_password",
"***" if "auth_file_password" in d else "-"])
data.append(["audible_username", d.get("audible_username")])
data.append(["audible_password", "***"])
@ -46,7 +47,7 @@ an authentication to the audible server is necessary to register a new device.
"""
echo()
secho(intro, bold=True)
path = config.dirname.absolute()
secho("Selected dir to proceed with:", bold=True)
echo(path.absolute())
@ -111,7 +112,7 @@ an authentication to the audible server is necessary to register a new device.
d["audible_username"] = prompt("Please enter your amazon username")
d["audible_password"] = prompt("Please enter your amazon password",
hide_input=True, confirmation_prompt=True)
return d
@ -124,7 +125,7 @@ def cli(session, ctx):
if config.file_exists():
m = f"Config file {config.filename} already exists. Quickstart will " \
f"not overwrite existing files."
ctx.fail(m) if ctx else echo(m)
sys.exit()
@ -150,4 +151,4 @@ def cli(session, ctx):
file_password=d.get("auth_file_password")
)
config.write_config()
config.write_config()

View file

@ -1,9 +1,9 @@
import os
import pathlib
import toml
from typing import Dict, Optional, Union
from typing import Any, Dict, Optional, Union
import click
import toml
from audible import Authenticator
from audible.exceptions import FileEncryptionError
from click import echo, prompt
@ -83,8 +83,9 @@ class Config:
def delete_profile(self, name: str) -> None:
del self.data["profile"][name]
def read_config(self, filename: Optional[Union[str, pathlib.Path]] = None) -> None:
def read_config(self, filename: Optional[
Union[str, pathlib.Path]] = None) -> None:
f = pathlib.Path(filename or self.filename).resolve()
try:
@ -100,7 +101,8 @@ class Config:
self._config_file = f
self._is_read = True
def write_config(self, filename: Optional[Union[str, pathlib.Path]] = None) -> None:
def write_config(self, filename: Optional[
Union[str, pathlib.Path]] = None) -> None:
f = pathlib.Path(filename or self.filename).resolve()
if not f.parent.is_dir():
@ -154,9 +156,11 @@ class Session:
locale=country_code)
return self._auth
except (FileEncryptionError, ValueError):
echo("Auth file is encrypted but no/wrong password is provided")
password = prompt("Please enter the password (or enter to exit)",
hide_input=True, default="")
echo(
"Auth file is encrypted but no/wrong password is provided")
password = prompt(
"Please enter the password (or enter to exit)",
hide_input=True, default="")
if password == "":
ctx = click.get_current_context()
ctx.abort()
@ -178,14 +182,14 @@ pass_session = click.make_pass_decorator(Session, ensure=True)
def add_param_to_session(ctx: click.Context, param, value):
"""Add a parameter to :class:`Session` `param` attribute"""
"""Add a parameter to :class:`Session` `param` attribute"""
session = ctx.ensure_object(Session)
session.params[param.name] = value
return value
def add_plugin_path_to_session(ctx: click.Context, param, value):
"""Add a plugin cmds path to :class:`Session` `param` attribute"""
"""Add a plugin cmds path to :class:`Session` `param` attribute"""
session = ctx.ensure_object(Session)
session._plugin_path = pathlib.Path(value).resolve()
return value
@ -227,4 +231,4 @@ def set_config(ctx, param, value):
"""
session = ctx.ensure_object(Session)
session.config._config_file = pathlib.Path(value)
return value
return value

View file

@ -4,6 +4,10 @@ CONFIG_ENV_DIR: str = "AUDIBLE_CONFIG_DIR"
PLUGIN_PATH = "plugins"
DEFAULT_AUTH_FILE_EXTENSION: str = "json"
DEFAULT_AUTH_FILE_ENCRYPTION: str = "json"
DEFAULT_CONFIG_DATA = {"title": "Audible Config File",
"APP": {},
"profile": {}}
DEFAULT_CONFIG_DATA = {
"title": "Audible Config File",
"APP": {},
"profile": {}
}
CODEC_HIGH_QUALITY = "LC_128_44100_stereo"
CODEC_NORMAL_QUALITY = "LC_64_44100_stereo"

View file

@ -1,76 +1,37 @@
import json
import pathlib
import string
import unicodedata
from typing import Dict, Optional, Union
import aiofiles
import click
import audible
import httpx
import tqdm
from audible import AsyncClient
from audible import Authenticator
from audible.aescipher import decrypt_voucher_from_licenserequest
from audible.localization import Locale
from click import secho
from .constants import CODEC_HIGH_QUALITY, CODEC_NORMAL_QUALITY
from .utils import LongestSubString
CLIENT_TIMEOUT = 15
CODEC_HIGH_QUALITY = "LC_128_44100_stereo"
CODEC_NORMAL_QUALITY = "LC_64_44100_stereo"
async def download_content(client, url, output_dir, filename,
overwrite_existing=False):
output_dir = pathlib.Path(output_dir)
if not output_dir.is_dir():
raise Exception("Output dir doesn't exists")
file = output_dir / filename
tmp_file = file.with_suffix(".tmp")
if file.exists() and not overwrite_existing:
secho(f"File {file} already exists. Skip download.", fg="blue")
return True
try:
async with client.stream("GET", url) as r:
length = int(r.headers["Content-Length"])
progressbar = tqdm.tqdm(
desc=filename, total=length, unit='B', unit_scale=True,
unit_divisor=1024
)
try:
with progressbar:
async with aiofiles.open(tmp_file, mode="wb") as f:
#with progressbar, tmp_file.open("wb") as f:
async for chunk in r.aiter_bytes():
await f.write(chunk)
progressbar.update(len(chunk))
if file.exists() and overwrite_existing:
i = 0
while file.with_suffix(f"{file.suffix}.old.{i}").exists():
i += 1
file.rename(file.with_suffix(f"{file.suffix}.old.{i}"))
tmp_file.rename(file)
tqdm.tqdm.write(f"File {file} downloaded to {output_dir} in {r.elapsed}")
return True
finally:
# remove tmp_file if download breaks
tmp_file.unlink() if tmp_file.exists() else ""
except KeyError as e:
secho(f"An error occured during downloading {file}", fg="red")
return False
class LibraryItem:
def __init__(self, item, api_client, client):
self._data = item
self._api_client = api_client
self._client = client
def __init__(self,
data: dict,
locale: Optional[Locale] = None,
country_code: Optional[str] = None,
auth: Optional[Authenticator] = None):
if locale is None and country_code is None and auth is None:
raise ValueError("No locale, country_code or auth provided.")
if locale is not None and country_code is not None:
raise ValueError("Locale and country_code provided. Expected only "
"one of them.")
if country_code is not None:
locale = Locale(country_code)
self._data = data.get("item", data)
self._locale = locale or auth.locale
self._auth = auth
def __getitem__(self, key):
return self._data[key]
@ -81,18 +42,19 @@ class LibraryItem:
except KeyError:
return None
def __iter__(self):
return iter(self._data)
@property
def full_title(self):
return self.title + (f": {self.subtitle}" if self.subtitle else "")
title: str = self.title
if self.subtitle is not None:
title = f"{title}: {self.subtitle}"
return title
@property
def full_title_slugify(self):
valid_chars = f"-_.() " + string.ascii_letters + string.digits
cleaned_title = unicodedata.normalize('NFKD', self.full_title)\
.encode('ASCII', 'ignore').replace(b" ", b"_")
valid_chars = "-_.() " + string.ascii_letters + string.digits
cleaned_title = unicodedata.normalize("NFKD", self.full_title)
cleaned_title = cleaned_title.encode("ASCII", "ignore")
cleaned_title = cleaned_title.replace(b" ", b"_")
return "".join(chr(c) for c in cleaned_title if chr(c) in valid_chars)
def substring_in_title_accuracy(self, substring):
@ -103,76 +65,51 @@ class LibraryItem:
accuracy = self.substring_in_title_accuracy(substring)
return accuracy >= p
async def get_cover(self, output_dir, overwrite_existing=False):
url = self.product_images.get("500")
if url is None:
# TODO: no cover
def get_cover_url(self, res: Union[str, int] = 500):
images = self.product_images
res = str(res)
if images is None or res not in images:
return
return images[res]
filename = self.full_title_slugify + ".jpg"
await download_content(client=self._client, url=url,
output_dir=output_dir, filename=filename,
overwrite_existing=overwrite_existing)
def get_pdf_url(self):
if self.pdf_url is not None:
domain = self._locale.domain
return f"https://www.audible.{domain}/companion-file/{self.asin}"
@property
def has_pdf(self):
return self.pdf_url is not None
def _build_aax_request_url(self, codec: str):
url = ("https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/"
"FSDownloadContent")
params = {
"type": "AUDI",
"currentTransportMethod": "WIFI",
"key": self.asin,
"codec": codec
}
return httpx.URL(url, params=params)
async def get_pdf_url(self):
# something is broken getting with pdf url getting from api response
# missing credentials in pdf url link
# this working for me
tld = self._client.auth.locale.domain
r = await self._client.head(
f"https://www.audible.{tld}/companion-file/{self.asin}")
return r.url
async def get_pdf(self, output_dir, overwrite_existing=False):
if not self.has_pdf:
# TODO: no pdf
return
#url = self.pdf_url
url = await self.get_pdf_url()
filename = self.full_title_slugify + ".pdf"
await download_content(client=self._client, url=url,
output_dir=output_dir, filename=filename,
overwrite_existing=overwrite_existing)
async def get_download_link(self, codec):
if self._client.auth.adp_token is None:
ctx = click.get_current_context()
ctx.fail("No adp token present. Can't get download link.")
def _extract_link_from_response(self, r: httpx.Response):
# prepare link
# see https://github.com/mkb79/Audible/issues/3#issuecomment-518099852
try:
content_url = ("https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/"
"FSDownloadContent")
params = {
'type': 'AUDI',
'currentTransportMethod': 'WIFI',
'key': self.asin,
'codec': codec
}
r = await self._client.head(
url=content_url,
params=params,
allow_redirects=False)
# prepare link
# see https://github.com/mkb79/Audible/issues/3#issuecomment-518099852
link = r.headers['Location']
tld = self._client.auth.locale.domain
new_link = link.replace("cds.audible.com", f"cds.audible.{tld}")
return new_link
link = r.headers["Location"]
domain = self._locale.domain
return link.replace("cds.audible.com", f"cds.audible.{domain}")
except Exception as e:
secho(f"Error: {e} occured. Can't get download link. Skip asin {self.asin}")
return None
secho(f"Error: {e} occured. Can't get download link. "
f"Skip asin {self.asin}.")
def get_quality(self, verify=None):
"""If verify is set, ensures the given quality is present in the
codecs list. Otherwise, will find the best aax quality available
def _get_codec(self, quality: str):
"""If quality is not ``best``, ensures the given quality is present in
them codecs list. Otherwise, will find the best aax quality available
"""
assert quality in ("best", "high", "normal",)
verify = None
if quality != "best":
verify = CODEC_HIGH_QUALITY if quality == "high" else \
CODEC_NORMAL_QUALITY
best = (None, 0, 0)
for codec in self.available_codecs:
if verify is not None and verify == codec["enhanced_codec"]:
@ -201,115 +138,237 @@ class LibraryItem:
return best[0]
@property
def is_downloadable(self):
def _is_downloadable(self):
if self.content_delivery_type in ("Periodical",):
return False
return True
return True
async def get_audiobook(self, output_dir, quality="high",
overwrite_existing=False):
if not self.is_downloadable:
secho(f"{self.full_title} is not downloadable. Skip item.", fg="red")
def get_aax_url(self,
quality: str = "high",
client: Optional[httpx.Client] = None):
if not self._is_downloadable:
secho(f"{self.full_title} is not downloadable. Skip item.",
fg="red")
return
assert quality in ("best", "high", "normal",)
if quality == "best":
codec = self.get_quality()
codec = self._get_codec(quality)
url = self._build_aax_request_url(codec)
if client is None:
assert self._auth is not None
with httpx.Client(auth=self._auth) as client:
resp = client.head(url=url, allow_redirects=False)
else:
codec = self.get_quality(
CODEC_HIGH_QUALITY if quality == "high" else CODEC_NORMAL_QUALITY
)
resp = client.head(url=url, allow_redirects=False)
url = await self.get_download_link(codec)
if not url:
# TODO: no link
return self._extract_link_from_response(resp), codec
async def aget_aax_url(self,
quality: str = "high",
client: Optional[httpx.AsyncClient] = None):
if not self._is_downloadable:
secho(f"{self.full_title} is not downloadable. Skip item.",
fg="red")
return
filename = self.full_title_slugify + f"-{codec}.aax"
await download_content(client=self._client, url=url,
output_dir=output_dir, filename=filename,
overwrite_existing=overwrite_existing)
codec = self._get_codec(quality)
url = self._build_aax_request_url(codec)
if client is None:
assert self._auth is not None
async with httpx.AsyncClient(auth=self._auth) as client:
resp = await client.head(url=url, allow_redirects=False)
else:
resp = await client.head(url=url, allow_redirects=False)
async def get_audiobook_aaxc(self, output_dir, quality="high",
overwrite_existing=False):
return self._extract_link_from_response(resp), codec
@staticmethod
def _build_aaxc_request_body(quality: str):
assert quality in ("best", "high", "normal",)
body = {
"supported_drm_types" : ["Mpeg", "Adrm"],
"quality" : "Extreme" if quality in ("best", "high") else "Normal",
"consumption_type" : "Download",
"response_groups" : "last_position_heard, pdf_url, content_reference, chapter_info"
return {
"supported_drm_types": ["Mpeg", "Adrm"],
"quality": "Extreme" if quality in ("best", "high") else "Normal",
"consumption_type": "Download",
"response_groups": ("last_position_heard, pdf_url, "
"content_reference, chapter_info")
}
try:
license_response = await self._api_client.post(
f"content/{self.asin}/licenserequest",
body=body
)
except Exception as e:
raise e
url = license_response["content_license"]["content_metadata"]["content_url"]["offline_url"]
codec = license_response["content_license"]["content_metadata"]["content_reference"]["content_format"]
voucher = decrypt_voucher_from_licenserequest(self._api_client.auth, license_response)
@staticmethod
def _extract_url_from_aaxc_response(r: Dict):
return r["content_license"]["content_metadata"]["content_url"][
"offline_url"]
filename = self.full_title_slugify + f"-{codec}.aaxc"
voucher_file = (pathlib.Path(output_dir) / filename).with_suffix(".voucher")
voucher_file.write_text(json.dumps(voucher, indent=4))
tqdm.tqdm.write(f"Voucher file saved to {voucher_file}.")
@staticmethod
def _extract_codec_from_aaxc_response(r: Dict):
return r["content_license"]["content_metadata"]["content_reference"][
"content_format"]
await download_content(client=self._client, url=url,
output_dir=output_dir, filename=filename,
overwrite_existing=overwrite_existing)
@staticmethod
def _decrypt_voucher_from_aaxc_response(r: Dict, auth: Authenticator):
voucher = decrypt_voucher_from_licenserequest(auth, r)
r["content_license"]["license_response"] = voucher
return r
async def get_chapter_informations(self, output_dir, quality="high",
overwrite_existing=False):
def get_aaxc_url(self,
quality: str = "high",
api_client: Optional[audible.Client] = None):
body = self._build_aaxc_request_body(quality)
if api_client is None:
assert self._auth is not None
cc = self._locale.country_code
with audible.Client(auth=self._auth,
country_code=cc) as api_client:
lr = api_client.post(
f"content/{self.asin}/licenserequest", body=body)
else:
lr = api_client.post(f"content/{self.asin}/licenserequest",
body=body)
url = self._extract_url_from_aaxc_response(lr)
codec = self._extract_codec_from_aaxc_response(lr)
dlr = self._decrypt_voucher_from_aaxc_response(lr, api_client.auth)
return url, codec, dlr
async def aget_aaxc_url(self,
quality: str = "high",
api_client: Optional[audible.AsyncClient] = None):
body = self._build_aaxc_request_body(quality)
if api_client is None:
assert self._auth is not None
cc = self._locale.country_code
async with audible.AsyncClient(auth=self._auth,
country_code=cc) as api_client:
lr = await api_client.post(
f"content/{self.asin}/licenserequest", body=body)
else:
lr = await api_client.post(f"content/{self.asin}/licenserequest",
body=body)
url = self._extract_url_from_aaxc_response(lr)
codec = self._extract_codec_from_aaxc_response(lr)
dlr = self._decrypt_voucher_from_aaxc_response(lr, api_client.auth)
return url, codec, dlr
def _build_metadata_request_url(self, quality: str):
assert quality in ("best", "high", "normal",)
url = f"content/{self.asin}/metadata"
params = {
"response_groups": "last_position_heard, content_reference, "
"chapter_info",
"quality": "Extreme" if quality in ("best", "high") else "Normal",
"drm_type": "Adrm"
}
return str(httpx.URL(url, params=params))
filename = self.full_title_slugify + "-chapters.json"
output_dir = pathlib.Path(output_dir)
def get_content_metadata(self,
quality: str = "high",
api_client: Optional[audible.Client] = None):
if not output_dir.is_dir():
raise Exception("Output dir doesn't exists")
url = self._build_metadata_request_url(quality)
if api_client is None:
assert self._auth is not None
cc = self._locale.country_code
with audible.Client(auth=self._auth,
country_code=cc) as api_client:
metadata = api_client.get(url)
else:
metadata = api_client.get(url)
file = output_dir / filename
if file.exists() and not overwrite_existing:
secho(f"File {file} already exists. Skip saving chapters.", fg="blue")
return True
return metadata
try:
chapter_informations = await self._api_client.get(
f"content/{self.asin}/metadata",
response_groups="chapter_info",
quality="Extreme" if quality in ("best", "high") else "Normal",
drm_type="Adrm"
)
except Exception as e:
raise e
async def aget_content_metadata(self,
quality: str = "high",
api_client: Optional[
audible.AsyncClient] = None):
file.write_text(json.dumps(chapter_informations, indent=4))
tqdm.tqdm.write(f"Chapter file saved to {file}.")
url = self._build_metadata_request_url(quality)
if api_client is None:
assert self._auth is not None
cc = self._locale.country_code
async with audible.AsyncClient(auth=self._auth,
country_code=cc) as api_client:
metadata = await api_client.get(url)
else:
metadata = await api_client.get(url)
return metadata
class Library:
def __init__(self, library, api_client):
self._api_client = api_client
self._client = httpx.AsyncClient(timeout=CLIENT_TIMEOUT,
auth=api_client.auth)
def __init__(self,
data: dict,
locale: Optional[Locale] = None,
country_code: Optional[str] = None,
auth: Optional[Authenticator] = None):
self._data = [LibraryItem(i, self._api_client, self._client) \
for i in library.get("items") or library]
if locale is None and country_code is None and auth is None:
raise ValueError("No locale, country_code or auth provided.")
if locale is not None and country_code is not None:
raise ValueError("Locale and country_code provided. Expected only "
"one of them.")
locale = Locale(country_code) if country_code else locale
self._locale = locale or auth.locale
self._auth = auth
self._data = [LibraryItem(i, locale=self._locale, auth=self._auth)
for i in data.get("items", data)]
def __iter__(self):
return iter(self._data)
@classmethod
async def get_from_api(cls, auth, **params):
api_client = AsyncClient(auth, timeout=CLIENT_TIMEOUT)
async with api_client as client:
library = await client.get("library", params=params)
def get_from_api(cls,
api_client: audible.Client,
locale: Optional[Locale] = None,
country_code: Optional[str] = None,
close_session: bool = False,
**request_params):
return cls(library, api_client)
if locale is not None and country_code is not None:
raise ValueError("Locale and country_code provided. Expected only "
"one of them.")
locale = Locale(country_code) if country_code else locale
if locale:
api_client.locale = locale
if close_session:
with api_client:
resp = api_client.get("library", params=request_params)
else:
resp = api_client.get("library", params=request_params)
return cls(resp, auth=api_client.auth)
@classmethod
async def aget_from_api(cls,
api_client: audible.AsyncClient,
locale: Optional[Locale] = None,
country_code: Optional[str] = None,
close_session: bool = False,
**request_params):
if locale is not None and country_code is not None:
raise ValueError("Locale and country_code provided. Expected only "
"one of them.")
locale = Locale(country_code) if country_code else locale
if locale:
api_client.locale = locale
if close_session:
async with api_client:
resp = await api_client.get("library", params=request_params)
else:
resp = await api_client.get("library", params=request_params)
return cls(resp, auth=api_client.auth)
def get_item_by_asin(self, asin):
try:
@ -326,4 +385,4 @@ class Library:
accuracy = i.substring_in_title_accuracy(search_title)
match.append([i, accuracy]) if accuracy >= p else ""
return match
return match

View file

@ -9,7 +9,6 @@ from .config import (
set_config
)
cli_config_option = click.option(
"--config",
"-c",
@ -55,4 +54,4 @@ auth_file_password_option = click.option(
"-p",
callback=add_param_to_session,
expose_value=False,
help="The password for the profile auth file.")
help="The password for the profile auth file.")

View file

@ -1,13 +1,17 @@
import asyncio
import io
import pathlib
from difflib import SequenceMatcher
from functools import partial, wraps
from typing import Optional, Union
import aiofiles
import click
import httpx
import tqdm
from PIL import Image
from audible import Authenticator
from click import echo, secho, prompt
from PIL import Image
from .constants import DEFAULT_AUTH_FILE_ENCRYPTION
@ -95,7 +99,7 @@ class LongestSubString:
@property
def percentage(self):
return (self._match.size / len(self._search_for) * 100)
return self._match.size / len(self._search_for) * 100
def asin_in_library(asin, library):
@ -104,4 +108,134 @@ def asin_in_library(asin, library):
try:
return next(i for i in items if asin in i["asin"])
except StopIteration:
return False
return False
def wrap_async(func):
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)
return run
class DummyProgressBar:
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
pass
def update(self, *args, **kwargs):
pass
class Downloader:
def __init__(self, url, file, client, overwrite_existing):
self._url = url
self._file = pathlib.Path(file).resolve()
self._tmp_file = self._file.with_suffix(".tmp")
self._client = client
self._overwrite_existing = overwrite_existing
def _progressbar(self, total: int):
return tqdm.tqdm(desc=str(self._file), total=total, unit="B",
unit_scale=True, unit_divisor=1024)
def _file_okay(self):
if not self._file.parent.is_dir():
secho(f"Folder {self._file.parent} doesn't exists! Skip download.",
fg="red")
return False
if self._file.exists() and not self._file.is_file():
secho(f"Object {self._file} exists but is no file. Skip download.",
fg="red")
return False
if self._file.is_file() and not self._overwrite_existing:
secho(f"File {self._file} already exists. Skip download.",
fg="blue")
return False
return True
def _postpare(self, elapsed):
file = self._file
tmp_file = self._tmp_file
if file.exists() and self._overwrite_existing:
i = 0
while file.with_suffix(f"{file.suffix}.old.{i}").exists():
i += 1
file.rename(file.with_suffix(f"{file.suffix}.old.{i}"))
tmp_file.rename(file)
tqdm.tqdm.write(f"File {self._file} downloaded to {self._file.parent} "
f"in {elapsed}.")
def _remove_tmp_file(self):
self._tmp_file.unlink() if self._tmp_file.exists() else None
def _stream_load(self, pb: bool = True):
with self._client.stream("GET", self._url) as r:
length = r.headers.get("Content-Length")
progressbar = self._progressbar(int(length)) if length and pb \
else DummyProgressBar()
with progressbar, open(self._tmp_file, mode="wb") as f:
for chunk in r.iter_bytes():
f.write(chunk)
progressbar.update(len(chunk))
self._postpare(r.elapsed)
return True
def _load(self):
r = self._client.get(self._url)
with open(self._tmp_file, mode="wb") as f:
f.write(r.content)
self._postpare(r.elapsed)
return True
async def _astream_load(self, pb: bool = True):
async with self._client.stream("GET", self._url) as r:
length = r.headers.get("Content-Length")
progressbar = self._progressbar(int(length)) if length and pb \
else DummyProgressBar()
with progressbar:
async with aiofiles.open(self._tmp_file, mode="wb") as f:
async for chunk in r.aiter_bytes():
await f.write(chunk)
progressbar.update(len(chunk))
self._postpare(r.elapsed)
return True
async def _aload(self):
r = await self._client.get(self._url)
async with aiofiles.open(self._tmp_file, mode="wb") as f:
await f.write(r.content)
self._postpare(r.elapsed)
return True
def run(self, stream: bool = True, pb: bool = True):
if not self._file_okay():
return
try:
return self._stream_load(pb) if stream else self._load()
finally:
self._remove_tmp_file()
async def arun(self, stream: bool = True, pb: bool = True):
if not self._file_okay():
return
try:
return await self._astream_load(pb) if stream else \
await self._aload()
finally:
self._remove_tmp_file()

View file

@ -50,11 +50,11 @@ class ApiMeta:
return len(self.get_chapters())
def get_chapters(self):
return self._meta_parsed["content_metadata"]["chapter_info"]["chapters"]
return self._meta_parsed["content_metadata"]["chapter_info"][
"chapters"]
class FFMeta:
SECTION = re.compile(r"\[(?P<header>[^]]+)\]")
OPTION = re.compile(r"(?P<option>.*?)\s*(?:(?P<vi>=)\s*(?P<value>.*))?$")
@ -71,7 +71,7 @@ class FFMeta:
for line in iter(self._ffmeta_raw.splitlines()):
mo = self.SECTION.match(line)
if mo:
sec_name = mo.group('header')
sec_name = mo.group("header")
if sec_name == "CHAPTER":
num_chap += 1
if sec_name not in parsed_dict:
@ -104,12 +104,15 @@ class FFMeta:
elif section == "CHAPTER":
# TODO: Tue etwas
for chapter in self._ffmeta_parsed[section]:
self._write_section(fp, section, self._ffmeta_parsed[section][chapter], d)
self._write_section(fp, section,
self._ffmeta_parsed[section][chapter],
d)
else:
self._write_section(fp, section, self._ffmeta_parsed[section], d)
self._write_section(fp, section, self._ffmeta_parsed[section],
d)
def _write_section(self, fp, section_name, section_items, delimiter):
"""Write a single section to the specified `fp'."""
"""Write a single section to the specified `fp`."""
if section_name is not None:
fp.write(f"[{section_name}]\n")
@ -135,30 +138,31 @@ class FFMeta:
self.set_chapter_option(num_chap, "title", value)
CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help'])
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
@click.command(context_settings=CONTEXT_SETTINGS)
@click.option(
"--ffmeta", "-f",
type=click.Path(
exists=True, file_okay=True, readable=True
),
required=True,
help="ffmetadata file extracted from file with ffmpeg"
"--ffmeta", "-f",
type=click.Path(
exists=True, file_okay=True, readable=True
),
required=True,
help="ffmetadata file extracted from file with ffmpeg"
)
@click.option(
"--apimeta", "-a",
type=click.Path(
exists=True, file_okay=True, readable=True
),
required=True,
help="metadata from api"
"--apimeta", "-a",
type=click.Path(
exists=True, file_okay=True, readable=True
),
required=True,
help="metadata from api"
)
@click.option(
"--outfile", "-o",
type=click.Path(exists=False, file_okay=True),
required=True,
help="filename to store prepared ffmeta"
"--outfile", "-o",
type=click.Path(exists=False, file_okay=True),
required=True,
help="filename to store prepared ffmeta"
)
def cli(ffmeta, apimeta, outfile):
ffmeta_class = FFMeta(ffmeta)
@ -171,7 +175,8 @@ def main(*args, **kwargs):
try:
cli(*args, **kwargs)
except KeyboardInterrupt:
sys.exit('\nERROR: Interrupted by user')
sys.exit("\nERROR: Interrupted by user")
if __name__ == "__main__":
sys.exit(main())