Compare commits

...

55 commits

Author SHA1 Message Date
mkb79
d1beda664a Go to beta stage 2022-05-17 08:27:25 +02:00
mkb79
70af33a258 Update CHANGELOG.md 2022-05-09 19:49:49 +02:00
Isaac Lyons
629a6ef171
Download fallback (#84)
* add fallback to aaxc if aax is not supported

* update exceptions.py

* update cmd_download.py

* remove whitespaces

* update cmd_download.py

Co-authored-by: mkb79 <mkb79@hackitall.de>
2022-05-08 22:34:39 +02:00
mkb79
d5d5f3985b
check aaxc files if they are downloadable 2022-05-06 13:28:29 +02:00
mkb79
1d0972b830
Shortened log message for downloaded files
Thanks goes to snowskeleton and his
[commit](327dc50898
db84a2775c9b3d482ab61d011ed90)
2022-05-06 08:50:07 +02:00
mkb79
3839a026e8 Update CHANGELOG and README
Changes in `config.py` and `models.py` will be documented next.
2022-05-04 16:10:40 +02:00
mkb79
1a42f1c644 Bump dev to alpha 2022-05-04 12:38:01 +02:00
mkb79
6bc2b6797f Update update_chapter_titles.py 2022-05-04 12:35:43 +02:00
mkb79
1dc419868d Update utils.py 2022-05-04 12:35:39 +02:00
mkb79
08609d6ee2 Update code completion guide 2022-05-04 12:35:30 +02:00
mkb79
f6a45c2998 Update files in plugin_cmds 2022-05-04 12:34:55 +02:00
mkb79
8ee6fc810b
add --resolve-podcasts option to library export and list command 2022-05-03 11:02:36 +02:00
mkb79
87319862a6
config.py: fix typo 2022-05-03 11:00:21 +02:00
mkb79
194545e2d0 Update cmd_download.py 2022-05-02 22:11:17 +02:00
mkb79
04fe4c2254 cmd_manage.py: fix typo 2022-05-02 22:11:01 +02:00
mkb79
e62c0cbf29 cmd_quickstart.py: fix typo 2022-05-02 22:10:36 +02:00
mkb79
fe4bc080e0 Update models.py
A full api sync will now fetch page 1 and extract the total count headers. Now outstanding pages will be counted and requested at once.
2022-05-02 22:03:08 +02:00
mkb79
fa8012ec4d decorators.py: fix typo 2022-05-02 20:15:48 +02:00
mkb79
0fb1de2ce9 fix shadow from outer scope 2022-05-02 20:13:47 +02:00
mkb79
c293afb883 decorators.py: fix PEP 8: E501 2022-05-02 20:10:46 +02:00
mkb79
ea7226a8b8 config.py: fix typo 2022-05-02 20:06:27 +02:00
mkb79
b26ef99332 config.py: fix PEP 8: E125 2022-05-02 20:02:16 +02:00
mkb79
35fa35614d
fix a bug in csv output #79 2022-04-29 12:51:30 +02:00
mkb79
4bd2287222
update example cmd_get-annotations.py 2022-04-26 14:48:28 +02:00
mkb79
7cc5e6a4c4
add flag --annotation to download command
Downloads the annotations (bookmarks, notes, clips and last heard) to
json file
2022-04-26 09:40:11 +02:00
mkb79
37c582ce70
add LibraryItem.get_annotations method 2022-04-26 09:38:00 +02:00
mkb79
8c7a2382d2
rework cmd_wishlist.py 2022-04-25 12:56:52 +02:00
mkb79
0731e54184
rework cmd_library.py 2022-04-25 12:56:39 +02:00
mkb79
c54ea7416f
move wrap_async from utils.py to decorators.py 2022-04-25 12:54:24 +02:00
mkb79
ec09d05825
fix SyntaxWarning in decorators.py 2022-04-22 22:33:42 +02:00
mkb79
75f832c821
rework cmd_download.py
- when using option `--title/-t` now a checkbox appears where one or
more items can be selected; using `no-confirm` to ignore these
checkbox and select all found matches
2022-04-22 14:52:48 +02:00
mkb79
b6993ecce8
rework cmd_wishlist.py 2022-04-22 14:49:44 +02:00
mkb79
8a6f3edcb8
rework run_async and add pass_client in decorators.py 2022-04-22 14:46:23 +02:00
mkb79
ee70469cac
update cmd_wishlist.py 2022-04-21 15:34:08 +02:00
mkb79
47ba6b7dd8
bump dev version 2022-04-21 12:44:15 +02:00
mkb79
eabd0f7a43
add questionary to dependiencies 2022-04-21 12:43:52 +02:00
mkb79
72c45f5225
set default timeout to 5s when using get_client 2022-04-20 15:33:36 +02:00
mkb79
8e5f4a7a52
rework cmd_wishlist.py
- add `add` and `remove` subcommand to wishlist
2022-04-20 15:14:37 +02:00
mkb79
eaaf68e4d3
bump audible to at least 0.8.1 2022-04-20 15:11:19 +02:00
mkb79
f7562246a5
add utils.full_response_callback 2022-04-20 15:10:30 +02:00
mkb79
0998eb773d
fix a bug with partially asins 2022-04-20 15:09:58 +02:00
mkb79
f0cd65af2f
Add support for optional decorator parentheses 2022-04-20 13:47:32 +02:00
mkb79
ddae5f6707
rework package
Doc about changes will be written later
2022-04-14 17:43:54 +02:00
mkb79
b06426ae57
Bugfix httpx version requirement 2022-04-11 15:45:20 +02:00
mkb79
1cc48ba06d
bump Audible to v0.8.0 2022-04-11 15:24:49 +02:00
mkb79
34a01f9084
Merge branch origin/master into development 2022-04-11 15:19:54 +02:00
mkb79
e29f66ed1d
Merge branch origin/master into development 2022-03-22 21:16:33 +01:00
mkb79
087eafe582
numbering title found for download command
I will add a selector in feature commit
2022-03-18 08:29:50 +01:00
mkb79
ec0e6d5165
add cmd_goodreads-transform.py to example plugin cmds 2022-03-18 08:28:13 +01:00
mkb79
0668c48e31
download command now uses the same client for API requests and downloads 2022-03-17 09:51:28 +01:00
mkb79
5398e55fd2
add more docstrings to config.py 2022-03-17 07:31:13 +01:00
mkb79
6a6e0e10f2
Move base filename 2022-03-16 18:46:16 +01:00
mkb79
e85d60055d
rework config.py 2022-03-16 15:45:41 +01:00
mkb79
2c277f0748
add docstrings to config.ConfigFile class 2022-03-16 13:58:28 +01:00
mkb79
0fe30b2ea2 some changes on the config class 2022-03-15 22:18:01 +01:00
29 changed files with 1587 additions and 1005 deletions

View file

@ -6,7 +6,41 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## Unreleased
-
### Added
- `--aax-fallback` option to `download` command to download books in aax format and fallback to aaxc, if the book is not available as aax
- `--annotation` option to `download` command to get bookmarks and notes
- `questionary` package to dependencies
- `add` and `remove` subcommands to wishlist
- `full_response_callback` to `utils`
- `export_to_csv` to `utils`
- `run_async` to `decorators`
- `pass_client` to `decorators`
- `profile_option` to `decorators`
- `password_option` to `decorators`
- `timeout_option` to `decorators`
- `bunch_size_option` to `decorators`
- `ConfigFile.get_profile_option` get the value for an option for a given profile
- `Session.selected.profile` to get the profile name for the current session
- `Session.get_auth_for_profile` to get an auth file for a given profile
- `models.BaseItem.create_base_filename` to build a filename in given mode
- `models.LibraryItem.get_annotations` to get annotations for a library item
### Changed
- bump `audible` to v0.8.1
- rework plugin examples in `plugin_cmds`
- rename `config.Config` to `config.ConfigFile`
- move `click_verbosity_logger` from `_logging` to `decorators` and rename it to `verbosity_option`
- move `wrap_async` from `utils` to `decorators`
- move `add_param_to_session` from `config` to `decorators`
- move `pass_session` from `config` to `decorators`
- `download` command let you now select items when using `--title` option
### Fixed
- the `library export` and `wishlist export` command will now export to `csv` correctly
-
## [0.1.3] - 2022-03-27
@ -25,7 +59,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Added
- the `--version` option now checks if an update for `audible-cli` is available
- build macOS release in onedir mode
- build macOS releases in onedir mode
### Bugfix

View file

@ -188,6 +188,8 @@ At this time, there the following buildin subcommands:
- `wishlist`
- `export`
- `list`
- `add`
- `remove`
## Verbosity option

View file

@ -0,0 +1,21 @@
import click
from audible.exceptions import NotFoundError
from audible_cli.decorators import pass_client
@click.command("get-annotations")
@click.argument("asin")
@pass_client
async def cli(client, asin):
url = f"https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/sidecar"
params = {
"type": "AUDI",
"key": asin
}
try:
r = await client.get(url, params=params)
except NotFoundError:
click.echo(f"No annotations found for asin {asin}")
else:
click.echo(r)

View file

@ -0,0 +1,110 @@
import logging
import pathlib
from datetime import datetime, timezone
import click
from audible_cli.decorators import (
bunch_size_option,
timeout_option,
pass_client,
pass_session
)
from audible_cli.models import Library
from audible_cli.utils import export_to_csv
from isbntools.app import isbn_from_words
logger = logging.getLogger("audible_cli.cmds.cmd_goodreads-transform")
@click.command("goodreads-transform")
@click.option(
"--output", "-o",
type=click.Path(path_type=pathlib.Path),
default=pathlib.Path().cwd() / "library.csv",
show_default=True,
help="output file"
)
@timeout_option
@bunch_size_option
@pass_session
@pass_client
async def cli(session, client, output):
"""YOUR COMMAND DESCRIPTION"""
logger.debug("fetching library")
bunch_size = session.params.get("bunch_size")
library = await Library.from_api_full_sync(
client,
response_groups=(
"product_details, contributors, is_finished, product_desc"
),
bunch_size=bunch_size
)
logger.debug("prepare library")
library = _prepare_library_for_export(library)
logger.debug("write data rows to file")
headers = ("isbn", "Date Added", "Date Read", "Title")
export_to_csv(
file=output,
data=library,
headers=headers,
dialect="excel"
)
logger.info(f"File saved to {output}")
def _prepare_library_for_export(library):
prepared_library = []
isbn_counter = 0
isbn_api_counter = 0
isbn_no_result_counter = 0
skipped_items = 0
for i in library:
title = i.title
authors = i.authors
if authors is not None:
authors = ", ".join([a["name"] for a in authors])
is_finished = i.is_finished
isbn = i.isbn
if isbn is None:
isbn_counter += 1
isbn = isbn_from_words(f"{title} {authors}") or None
if isbn is None:
isbn_no_result_counter += 1
else:
isbn_api_counter += 1
date_added = i.library_status
if date_added is not None:
date_added = date_added["date_added"]
date_added = datetime.strptime(
date_added, '%Y-%m-%dT%H:%M:%S.%fZ'
).replace(tzinfo=timezone.utc).astimezone()
date_added = date_added.astimezone().date().isoformat()
date_read = None
if is_finished:
date_read = date_added
if isbn and date_read:
data_row = [isbn, date_added, date_read, title]
prepared_library.append(data_row)
else:
skipped_items += 1
logger.debug(f"ISBNs from API: {isbn_api_counter}")
logger.debug(f"ISBNs requested with isbntools: {isbn_counter}")
logger.debug(f"No result with isbntools: {isbn_no_result_counter}")
logger.debug(
f"title skipped from file due to no ISBN or title not read: "
f"{skipped_items}")
return prepared_library

View file

@ -1,25 +1,19 @@
import audible
import click
from audible_cli.config import pass_session
from audible_cli.decorators import pass_client, timeout_option
@click.command("get-cover-urls")
@click.option(
"--asin", "-a",
multiple=False,
help="asin of the audiobook"
)
@pass_session
def cli(session, asin):
"Print out the image urls for different resolutions for a book"
with audible.Client(auth=session.auth) as client:
r = client.get(
f"catalog/products/{asin}",
response_groups="media",
image_sizes=("1215, 408, 360, 882, 315, 570, 252, "
"558, 900, 500")
)
@click.command("image-urls")
@click.argument("asin")
@timeout_option()
@pass_client()
async def cli(client, asin):
"""Print out the image urls for different resolutions for a book"""
r = await client.get(
f"catalog/products/{asin}",
response_groups="media",
image_sizes=(
"1215, 408, 360, 882, 315, 570, 252, 558, 900, 500")
)
images = r["product"]["product_images"]
for res, url in images.items():
click.echo(f"Resolution {res}: {url}")

View file

@ -4,9 +4,8 @@ import logging
import pathlib
from datetime import datetime
import audible
import click
from audible_cli.config import pass_session
from audible_cli.decorators import pass_client
logger = logging.getLogger("audible_cli.cmds.cmd_listening-stats")
@ -15,10 +14,10 @@ current_year = datetime.now().year
def ms_to_hms(milliseconds):
seconds = (int) (milliseconds / 1000) % 60
minutes = (int) ((milliseconds / (1000*60)) % 60)
hours = (int) ((milliseconds / (1000*60*60)) % 24)
return hours, minutes, seconds
seconds = int((milliseconds / 1000) % 60)
minutes = int(((milliseconds / (1000*60)) % 60))
hours = int(((milliseconds / (1000*60*60)) % 24))
return {"hours": hours, "minutes": minutes, "seconds": seconds}
async def _get_stats_year(client, year):
@ -29,30 +28,12 @@ async def _get_stats_year(client, year):
monthly_listening_interval_start_date=f"{year}-01",
store="Audible"
)
#iterate over each month
# iterate over each month
for stat in stats['aggregated_monthly_listening_stats']:
stats_year[stat["interval_identifier"]] = ms_to_hms(stat["aggregated_sum"])
return stats_year
async def _listening_stats(auth, output, signup_year):
year_range = [y for y in range(signup_year, current_year+1)]
async with audible.AsyncClient(auth=auth) as client:
r = await asyncio.gather(
*[_get_stats_year(client, y) for y in year_range]
)
aggreated_stats = {}
for i in r:
for k, v in i.items():
aggreated_stats[k] = v
aggreated_stats = json.dumps(aggreated_stats, indent=4)
output.write_text(aggreated_stats)
@click.command("listening-stats")
@click.option(
"--output", "-o",
@ -68,15 +49,19 @@ async def _listening_stats(auth, output, signup_year):
show_default=True,
help="start year for collecting listening stats"
)
@pass_session
def cli(session, output, signup_year):
@pass_client
async def cli(client, output, signup_year):
"""get and analyse listening statistics"""
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(
_listening_stats(session.auth, output, signup_year)
)
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
year_range = [y for y in range(signup_year, current_year+1)]
r = await asyncio.gather(
*[_get_stats_year(client, y) for y in year_range]
)
aggregated_stats = {}
for i in r:
for k, v in i.items():
aggregated_stats[k] = v
aggregated_stats = json.dumps(aggregated_stats, indent=4)
output.write_text(aggregated_stats)

View file

@ -2,7 +2,7 @@
This is a proof-of-concept and for testing purposes only. No error handling.
Need further work. Some options does not work or options are missing.
Needs at least ffmpeg 4.1 with aaxc patch.
Needs at least ffmpeg 4.4
"""
@ -14,7 +14,7 @@ import subprocess
from shutil import which
import click
from audible_cli.config import pass_session
from audible_cli.decorators import pass_session
from click import echo, secho
@ -49,6 +49,10 @@ class ApiMeta:
return self._meta_parsed["content_metadata"]["chapter_info"][
"runtime_length_ms"]
def is_accurate(self):
return self._meta_parsed["content_metadata"]["chapter_info"][
"is_accurate"]
class FFMeta:
SECTION = re.compile(r"\[(?P<header>[^]]+)\]")
@ -107,7 +111,8 @@ class FFMeta:
self._write_section(fp, section, self._ffmeta_parsed[section],
d)
def _write_section(self, fp, section_name, section_items, delimiter):
@staticmethod
def _write_section(fp, section_name, section_items, delimiter):
"""Write a single section to the specified `fp`."""
if section_name is not None:
fp.write(f"[{section_name}]\n")
@ -122,6 +127,10 @@ class FFMeta:
if not isinstance(api_meta, ApiMeta):
api_meta = ApiMeta(api_meta)
if not api_meta.is_accurate():
echo("Metadata from API is not accurate. Skip.")
return
# assert api_meta.count_chapters() == self.count_chapters()
echo(f"Found {self.count_chapters()} chapters to prepare.")
@ -170,45 +179,54 @@ class FFMeta:
self._ffmeta_parsed["CHAPTER"] = new_chapters
def decrypt_aax(files, session):
def decrypt_aax(files, activation_bytes, rebuild_chapters):
for file in files:
outfile = file.with_suffix(".m4b")
metafile = file.with_suffix(".meta")
metafile_new = file.with_suffix(".new.meta")
# apimeta = CHAPTERFILE
base_filename = file.stem.rsplit("-")[0]
chapters = file.with_name(base_filename + "-chapters").with_suffix(".json")
apimeta = json.loads(chapters.read_text())
if outfile.exists():
secho(f"file {outfile} already exists Skip.", fg="blue")
continue
ab = session.auth.activation_bytes
cmd = ["ffmpeg",
"-activation_bytes", ab,
"-i", str(file),
"-f", "ffmetadata",
str(metafile)]
subprocess.check_output(cmd, universal_newlines=True)
ffmeta_class = FFMeta(metafile)
#ffmeta_class.update_chapters_from_api_meta(apimeta)
ffmeta_class.write(metafile_new)
click.echo("Replaced all titles.")
cmd = ["ffmpeg",
"-activation_bytes", ab,
"-i", str(file),
"-i", str(metafile_new),
"-map_metadata", "0",
"-map_chapters", "1",
"-c", "copy",
str(outfile)]
subprocess.check_output(cmd, universal_newlines=True)
metafile.unlink()
metafile_new.unlink()
if rebuild_chapters and apimeta["content_metadata"]["chapter_info"][
"is_accurate"]:
cmd = ["ffmpeg",
"-activation_bytes", activation_bytes,
"-i", str(file),
"-f", "ffmetadata",
str(metafile)]
subprocess.check_output(cmd, universal_newlines=True)
ffmeta_class = FFMeta(metafile)
ffmeta_class.update_chapters_from_api_meta(apimeta)
ffmeta_class.write(metafile_new)
click.echo("Replaced all titles.")
cmd = ["ffmpeg",
"-activation_bytes", activation_bytes,
"-i", str(file),
"-i", str(metafile_new),
"-map_metadata", "0",
"-map_chapters", "1",
"-c", "copy",
str(outfile)]
subprocess.check_output(cmd, universal_newlines=True)
metafile.unlink()
metafile_new.unlink()
else:
cmd = ["ffmpeg",
"-activation_bytes", activation_bytes,
"-i", str(file),
"-c", "copy",
str(outfile)]
subprocess.check_output(cmd, universal_newlines=True)
def decrypt_aaxc(files, session):
def decrypt_aaxc(files, rebuild_chapters):
for file in files:
metafile = file.with_suffix(".meta")
metafile_new = file.with_suffix(".new.meta")
@ -223,32 +241,42 @@ def decrypt_aaxc(files, session):
apimeta = voucher["content_license"]
audible_key = apimeta["license_response"]["key"]
audible_iv = apimeta["license_response"]["iv"]
if rebuild_chapters and apimeta["content_metadata"]["chapter_info"][
"is_accurate"]:
cmd = ["ffmpeg",
"-audible_key", audible_key,
"-audible_iv", audible_iv,
"-i", str(file),
"-f", "ffmetadata",
str(metafile)]
subprocess.check_output(cmd, universal_newlines=True)
cmd = ["ffmpeg",
"-audible_key", audible_key,
"-audible_iv", audible_iv,
"-i", str(file),
"-f", "ffmetadata",
str(metafile)]
subprocess.check_output(cmd, universal_newlines=True)
ffmeta_class = FFMeta(metafile)
ffmeta_class.update_chapters_from_api_meta(apimeta)
ffmeta_class.write(metafile_new)
click.echo("Replaced all titles.")
ffmeta_class = FFMeta(metafile)
ffmeta_class.update_chapters_from_api_meta(apimeta)
ffmeta_class.write(metafile_new)
click.echo("Replaced all titles.")
cmd = ["ffmpeg",
"-audible_key", audible_key,
"-audible_iv", audible_iv,
"-i", str(file),
"-i", str(metafile_new),
"-map_metadata", "0",
"-map_chapters", "1",
"-c", "copy",
str(outfile)]
subprocess.check_output(cmd, universal_newlines=True)
metafile.unlink()
metafile_new.unlink()
cmd = ["ffmpeg",
"-audible_key", audible_key,
"-audible_iv", audible_iv,
"-i", str(file),
"-i", str(metafile_new),
"-map_metadata", "0",
"-map_chapters", "1",
"-c", "copy",
str(outfile)]
subprocess.check_output(cmd, universal_newlines=True)
metafile.unlink()
metafile_new.unlink()
else:
cmd = ["ffmpeg",
"-audible_key", audible_key,
"-audible_iv", audible_iv,
"-i", str(file),
"-c", "copy",
str(outfile)]
subprocess.check_output(cmd, universal_newlines=True)
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
@ -270,21 +298,25 @@ CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
is_flag=True,
help="overwrite existing files"
)
@click.option(
"--rebuild-chapters",
is_flag=True,
help="Rebuild chapters from chapter file"
)
@pass_session
def cli(session, **options):
if not which("ffmpeg"):
ctx = click.get_current_context()
ctx.fail("ffmpeg not found")
rebuild_chapters = options.get("rebuild_chapters")
jobs = {"aaxc": [], "aax":[]}
if options.get("all"):
cwd = pathlib.Path.cwd()
jobs["aaxc"].extend(list(cwd.glob('*.aaxc')))
jobs["aax"].extend(list(cwd.glob('*.aax')))
for suffix in jobs:
for i in jobs[suffix]:
i = i.resolve()
else:
for file in options.get("input"):
@ -296,6 +328,5 @@ def cli(session, **options):
else:
secho(f"file suffix {file.suffix} not supported", fg="red")
decrypt_aaxc(jobs["aaxc"], session)
decrypt_aax(jobs["aax"], session)
decrypt_aaxc(jobs["aaxc"], rebuild_chapters)
decrypt_aax(jobs["aax"], session.auth.activation_bytes, rebuild_chapters)

View file

@ -46,15 +46,16 @@ setup(
],
install_requires=[
"aiofiles",
"audible==0.7.2",
"audible>=0.8.1",
"click>=8",
"colorama; platform_system=='Windows'",
"httpx>=0.20.*,<=0.22.*",
"httpx>=0.20.0,<0.23.0",
"packaging",
"Pillow",
"tabulate",
"toml",
"tqdm"
"tqdm",
"questionary"
],
extras_require={
'pyi': [

View file

@ -73,42 +73,6 @@ log_helper = AudibleCliLogHelper()
# copied from https://github.com/Toilal/click-logging
def click_verbosity_option(logger=None, *names, **kwargs):
"""A decorator that adds a `--verbosity, -v` option to the decorated
command.
Name can be configured through ``*names``. Keyword arguments are passed to
the underlying ``click.option`` decorator.
"""
if not names:
names = ["--verbosity", "-v"]
kwargs.setdefault("default", "INFO")
kwargs.setdefault("metavar", "LVL")
kwargs.setdefault("expose_value", False)
kwargs.setdefault(
"help", "Either CRITICAL, ERROR, WARNING, "
"INFO or DEBUG. [default: INFO]"
)
kwargs.setdefault("is_eager", True)
logger = _normalize_logger(logger)
def decorator(f):
def _set_level(ctx, param, value):
x = getattr(logging, value.upper(), None)
if x is None:
raise click.BadParameter(
f"Must be CRITICAL, ERROR, WARNING, INFO or DEBUG, "
f"not {value}"
)
logger.setLevel(x)
return click.option(*names, callback=_set_level, **kwargs)(f)
return decorator
class ColorFormatter(logging.Formatter):
def __init__(self, style_kwargs):
self.style_kwargs = style_kwargs

View file

@ -1,7 +1,7 @@
__title__ = "audible-cli"
__description__ = "Command line interface (cli) for the audible package."
__url__ = "https://github.com/mkb79/audible-cli"
__version__ = "0.1.3"
__version__ = "0.2.b1"
__author__ = "mkb79"
__author_email__ = "mkb79@hackitall.de"
__license__ = "AGPL"

View file

@ -3,18 +3,19 @@ import sys
from pkg_resources import iter_entry_points
import click
import httpx
from packaging.version import parse
from .cmds import build_in_cmds, cmd_quickstart
from .config import (
get_plugin_dir,
add_param_to_session
)
from .config import get_plugin_dir
from .constants import PLUGIN_ENTRY_POINT
from .decorators import (
password_option,
profile_option,
verbosity_option,
version_option
)
from .exceptions import AudibleCliException
from ._logging import click_basic_config, click_verbosity_option
from . import __version__, plugins
from ._logging import click_basic_config
from . import plugins
logger = logging.getLogger("audible_cli")
@ -23,77 +24,22 @@ click_basic_config(logger)
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
def version_option(**kwargs):
def callback(ctx, param, value):
if not value or ctx.resilient_parsing:
return
message = f"audible-cli, version {__version__}"
click.echo(message, color=ctx.color, nl=False)
url = "https://api.github.com/repos/mkb79/audible-cli/releases/latest"
headers = {"Accept": "application/vnd.github.v3+json"}
logger.debug(f"Requesting Github API for latest release information")
try:
response = httpx.get(url, headers=headers, follow_redirects=True)
response.raise_for_status()
except Exception as e:
logger.error(e)
click.Abort()
content = response.json()
current_version = parse(__version__)
latest_version = parse(content["tag_name"])
html_url = content["html_url"]
if latest_version > current_version:
click.echo(
f" (update available)\nVisit {html_url} "
f"for information about the new release.",
color=ctx.color
)
else:
click.echo(" (up-to-date)", color=ctx.color)
ctx.exit()
kwargs.setdefault("is_flag", True)
kwargs.setdefault("expose_value", False)
kwargs.setdefault("is_eager", True)
kwargs.setdefault("help", "Show the version and exit.")
kwargs["callback"] = callback
return click.option("--version", **kwargs)
@plugins.from_folder(get_plugin_dir())
@plugins.from_entry_point(iter_entry_points(PLUGIN_ENTRY_POINT))
@build_in_cmds()
@build_in_cmds
@click.group(context_settings=CONTEXT_SETTINGS)
@click.option(
"--profile",
"-P",
callback=add_param_to_session,
expose_value=False,
help="The profile to use instead primary profile (case sensitive!)."
)
@click.option(
"--password",
"-p",
callback=add_param_to_session,
expose_value=False,
help="The password for the profile auth file."
)
@version_option()
@click_verbosity_option(logger)
@profile_option
@password_option
@version_option
@verbosity_option(cli_logger=logger)
def cli():
"""Entrypoint for all other subcommands and groups."""
@click.command(context_settings=CONTEXT_SETTINGS)
@click.pass_context
@version_option()
@click_verbosity_option(logger)
@version_option
@verbosity_option(cli_logger=logger)
def quickstart(ctx):
"""Entrypoint for the quickstart command"""
try:

View file

@ -21,7 +21,7 @@ cli_cmds = [
]
def build_in_cmds():
def build_in_cmds(func=None):
"""
A decorator to register build-in CLI commands to an instance of
`click.Group()`.
@ -42,4 +42,7 @@ def build_in_cmds():
return group
if callable(func):
return decorator(func)
return decorator

View file

@ -6,7 +6,7 @@ from audible.activation_bytes import (
fetch_activation_sign_auth
)
from ..config import pass_session
from ..decorators import pass_session
logger = logging.getLogger("audible_cli.cmds.cmd_activation_bytes")

View file

@ -6,7 +6,7 @@ import sys
import click
from audible import Client
from ..config import pass_session
from ..decorators import pass_session
logger = logging.getLogger("audible_cli.cmds.cmd_api")
@ -63,7 +63,7 @@ logger = logging.getLogger("audible_cli.cmds.cmd_api")
@pass_session
def cli(session, **options):
"""Send requests to an Audible API endpoint
Take a look at
https://audible.readthedocs.io/en/latest/misc/external_api.html for known
endpoints and parameters.
@ -96,7 +96,7 @@ def cli(session, **options):
with Client(auth=auth, country_code=country_code) as client:
r = client._request(method, endpoint, params=params, json=body)
except Exception as e:
logger.error(e)
logger.error(e)
sys.exit(1)
if output_format == "json":

View file

@ -3,80 +3,38 @@ import asyncio.log
import asyncio.sslproto
import json
import pathlib
import ssl
import logging
import sys
import unicodedata
import aiofiles
import audible
import click
import httpx
import questionary
from audible.exceptions import NotFoundError
from click import echo
from tabulate import tabulate
from ..config import pass_session
from ..exceptions import DirectoryDoesNotExists, NotFoundError
from ..decorators import (
bunch_size_option,
timeout_option,
pass_client,
pass_session
)
from ..exceptions import DirectoryDoesNotExists, NotDownloadableAsAAX
from ..models import Library
from ..utils import Downloader
logger = logging.getLogger("audible_cli.cmds.cmd_download")
SSL_PROTOCOLS = (asyncio.sslproto.SSLProtocol,)
def ignore_httpx_ssl_eror(loop):
"""Ignore aiohttp #3535 / cpython #13548 issue with SSL data after close
There is an issue in Python 3.7 up to 3.7.3 that over-reports a
ssl.SSLError fatal error (ssl.SSLError: [SSL: KRB5_S_INIT] application data
after close notify (_ssl.c:2609)) after we are already done with the
connection. See GitHub issues aio-libs/aiohttp#3535 and
python/cpython#13548.
Given a loop, this sets up an exception handler that ignores this specific
exception, but passes everything else on to the previous exception handler
this one replaces.
Checks for fixed Python versions, disabling itself when running on 3.7.4+
or 3.8.
"""
if sys.version_info >= (3, 7, 4):
return
orig_handler = loop.get_exception_handler()
def ignore_ssl_error(context):
if context.get("message") in {
"SSL error in data received",
"Fatal error on transport",
}:
# validate we have the right exception, transport and protocol
exception = context.get("exception")
protocol = context.get("protocol")
if (
isinstance(exception, ssl.SSLError)
and exception.reason == "KRB5_S_INIT"
and isinstance(protocol, SSL_PROTOCOLS)
):
if loop.get_debug():
asyncio.log.logger.debug(
"Ignoring httpx SSL KRB5_S_INIT error")
return
if orig_handler is not None:
orig_handler(loop, context)
else:
loop.default_exception_handler(context)
loop.set_exception_handler(ignore_ssl_error)
CLIENT_HEADERS = {
"User-Agent": "Audible/671 CFNetwork/1240.0.4 Darwin/20.6.0"
}
class DownloadCounter:
def __init__(self):
self._aax: int = 0
self._aaxc: int = 0
self._annotation: int = 0
self._chapter: int = 0
self._cover: int = 0
self._pdf: int = 0
@ -99,6 +57,14 @@ class DownloadCounter:
self._aaxc += 1
logger.debug(f"Currently downloaded aaxc files: {self.aaxc}")
@property
def annotation(self):
return self._annotation
def count_annotation(self):
self._annotation += 1
logger.debug(f"Currently downloaded annotations: {self.annotation}")
@property
def chapter(self):
return self._chapter
@ -143,6 +109,7 @@ class DownloadCounter:
return {
"aax": self.aax,
"aaxc": self.aaxc,
"annotation": self.annotation,
"chapter": self.chapter,
"cover": self.cover,
"pdf": self.pdf,
@ -161,22 +128,6 @@ class DownloadCounter:
counter = DownloadCounter()
def create_base_filename(item, mode):
if "ascii" in mode:
base_filename = item.full_title_slugify
elif "unicode" in mode:
base_filename = unicodedata.normalize("NFKD", item.full_title)
else:
base_filename = item.asin
if "asin" in mode:
base_filename = item.asin + "_" + base_filename
return base_filename
async def download_cover(
client, output_dir, base_filename, item, res, overwrite_existing
):
@ -234,8 +185,8 @@ async def download_chapters(
try:
metadata = await item.get_content_metadata(quality)
except NotFoundError:
logger.error(
f"Can't get chapters for {item.full_title}. Skip item."
logger.info(
f"No chapters found for {item.full_title}."
)
return
metadata = json.dumps(metadata, indent=4)
@ -245,11 +196,54 @@ async def download_chapters(
counter.count_chapter()
async def download_annotations(
output_dir, base_filename, item, overwrite_existing
):
if not output_dir.is_dir():
raise DirectoryDoesNotExists(output_dir)
filename = base_filename + "-annotations.json"
file = output_dir / filename
if file.exists() and not overwrite_existing:
logger.info(
f"File {file} already exists. Skip saving annotations"
)
return True
try:
annotation = await item.get_annotations()
except NotFoundError:
logger.info(
f"No annotations found for {item.full_title}."
)
return
annotation = json.dumps(annotation, indent=4)
async with aiofiles.open(file, "w") as f:
await f.write(annotation)
logger.info(f"Annotation file saved to {file}.")
counter.count_annotation()
async def download_aax(
client, output_dir, base_filename, item, quality, overwrite_existing
client, output_dir, base_filename, item, quality, overwrite_existing,
aax_fallback
):
# url, codec = await item.get_aax_url(quality)
url, codec = await item.get_aax_url_old(quality)
try:
url, codec = await item.get_aax_url_old(quality)
except NotDownloadableAsAAX:
if aax_fallback:
logger.info(f"Fallback to aaxc for {item.full_title}")
return await download_aaxc(
client=client,
output_dir=output_dir,
base_filename=base_filename,
item=item,
quality=quality,
overwrite_existing=overwrite_existing
)
raise
filename = base_filename + f"-{codec}.aax"
filepath = output_dir / filename
dl = Downloader(
@ -346,13 +340,16 @@ async def consume(queue):
await item
except Exception as e:
logger.error(e)
queue.task_done()
raise
finally:
queue.task_done()
def queue_job(
queue,
get_cover,
get_pdf,
get_annotation,
get_chapters,
get_aax,
get_aaxc,
@ -362,9 +359,10 @@ def queue_job(
item,
cover_size,
quality,
overwrite_existing
overwrite_existing,
aax_fallback
):
base_filename = create_base_filename(item=item, mode=filename_mode)
base_filename = item.create_base_filename(filename_mode)
if get_cover:
queue.put_nowait(
@ -400,6 +398,16 @@ def queue_job(
)
)
if get_annotation:
queue.put_nowait(
download_annotations(
output_dir=output_dir,
base_filename=base_filename,
item=item,
overwrite_existing=overwrite_existing
)
)
if get_aax:
queue.put_nowait(
download_aax(
@ -408,7 +416,8 @@ def queue_job(
base_filename=base_filename,
item=item,
quality=quality,
overwrite_existing=overwrite_existing
overwrite_existing=overwrite_existing,
aax_fallback=aax_fallback
)
)
@ -425,158 +434,23 @@ def queue_job(
)
async def main(config, auth, **params):
output_dir = pathlib.Path(params.get("output_dir")).resolve()
def display_counter():
if counter.has_downloads():
echo("The download ended with the following result:")
for k, v in counter.as_dict().items():
if v == 0:
continue
# which item(s) to download
get_all = params.get("all") is True
asins = params.get("asin")
titles = params.get("title")
if get_all and (asins or titles):
logger.error(f"Do not mix *asin* or *title* option with *all* option.")
click.Abort()
# what to download
get_aax = params.get("aax")
get_aaxc = params.get("aaxc")
get_chapters = params.get("chapter")
get_cover = params.get("cover")
get_pdf = params.get("pdf")
if not any([get_aax, get_aaxc, get_chapters, get_cover, get_pdf]):
logger.error("Please select an option what you want download.")
click.Abort()
# additional options
sim_jobs = params.get("jobs")
quality = params.get("quality")
cover_size = params.get("cover_size")
overwrite_existing = params.get("overwrite")
ignore_errors = params.get("ignore_errors")
no_confirm = params.get("no_confirm")
resolve_podcats = params.get("resolve_podcasts")
ignore_podcasts = params.get("ignore_podcasts")
bunch_size = params.get("bunch_size")
timeout = params.get("timeout")
if timeout == 0:
timeout = None
filename_mode = params.get("filename_mode")
if filename_mode == "config":
filename_mode = config.profile_config.get("filename_mode") or \
config.app_config.get("filename_mode") or \
"ascii"
headers = {
"User-Agent": "Audible/671 CFNetwork/1240.0.4 Darwin/20.6.0"
}
client = httpx.AsyncClient(auth=auth, timeout=timeout, headers=headers)
api_client = audible.AsyncClient(auth, timeout=timeout)
async with client, api_client:
# fetch the user library
library = await Library.from_api_full_sync(
api_client,
image_sizes="1215, 408, 360, 882, 315, 570, 252, 558, 900, 500",
bunch_size=bunch_size
)
if resolve_podcats:
await library.resolve_podcats()
# collect jobs
jobs = []
if get_all:
asins = []
titles = []
for i in library:
jobs.append(i.asin)
for asin in asins:
if library.has_asin(asin):
jobs.append(asin)
else:
if not ignore_errors:
logger.error(f"Asin {asin} not found in library.")
click.Abort()
logger.error(
f"Skip asin {asin}: Not found in library"
)
for title in titles:
match = library.search_item_by_title(title)
full_match = [i for i in match if i[1] == 100]
if full_match or match:
echo(f"\nFound the following matches for '{title}'")
table_data = [[i[1], i[0].full_title, i[0].asin]
for i in full_match or match]
head = ["% match", "title", "asin"]
table = tabulate(
table_data, head, tablefmt="pretty",
colalign=("center", "left", "center"))
echo(table)
if no_confirm or click.confirm(
"Proceed with this audiobook(s)",
default=True
):
jobs.extend([i[0].asin for i in full_match or match])
else:
logger.error(
f"Skip title {title}: Not found in library"
)
queue = asyncio.Queue()
for job in jobs:
item = library.get_item_by_asin(job)
items = [item]
odir = pathlib.Path(output_dir)
if not ignore_podcasts and item.is_parent_podcast():
items.remove(item)
if item._children is None:
await item.get_child_items()
for i in item._children:
if i.asin not in jobs:
items.append(i)
podcast_dir = create_base_filename(item, filename_mode)
odir = output_dir / podcast_dir
if not odir.is_dir():
odir.mkdir(parents=True)
for item in items:
queue_job(
queue=queue,
get_cover=get_cover,
get_pdf=get_pdf,
get_chapters=get_chapters,
get_aax=get_aax,
get_aaxc=get_aaxc,
client=client,
output_dir=odir,
filename_mode=filename_mode,
item=item,
cover_size=cover_size,
quality=quality,
overwrite_existing=overwrite_existing
)
# schedule the consumer
consumers = [
asyncio.ensure_future(consume(queue)) for _ in range(sim_jobs)
]
# wait until the consumer has processed all items
await queue.join()
# the consumer is still awaiting an item, cancel it
for consumer in consumers:
consumer.cancel()
if k == "voucher_saved":
k = "voucher"
elif k == "voucher":
diff = v - counter.voucher_saved
if diff > 0:
echo(f"Unsaved voucher: {diff}")
continue
echo(f"New {k} files: {v}")
else:
echo("No new files downloaded.")
@click.command("download")
@ -611,6 +485,11 @@ async def main(config, auth, **params):
is_flag=True,
help="Download book in aaxc format incl. voucher file"
)
@click.option(
"--aax-fallback",
is_flag=True,
help="Download book in aax format and fallback to aaxc, if former is not supported."
)
@click.option(
"--quality", "-q",
default="best",
@ -640,6 +519,11 @@ async def main(config, auth, **params):
is_flag=True,
help="saves chapter metadata as JSON file"
)
@click.option(
"--annotation",
is_flag=True,
help="saves the annotations (e.g. bookmarks, notes) as JSON file"
)
@click.option(
"--no-confirm", "-y",
is_flag=True,
@ -670,14 +554,7 @@ async def main(config, auth, **params):
default="config",
help="Filename mode to use. [default: config]"
)
@click.option(
"--timeout",
type=click.INT,
default=10,
show_default=True,
help="Increase the timeout time if you got any TimeoutErrors. "
"Set to 0 to disable timeout."
)
@timeout_option
@click.option(
"--resolve-podcasts",
is_flag=True,
@ -688,41 +565,166 @@ async def main(config, auth, **params):
is_flag=True,
help="Ignore a podcast if it have episodes"
)
@click.option(
"--bunch-size",
type=click.IntRange(10, 1000),
default=1000,
show_default=True,
help="How many library items should be requested per request. A lower "
"size results in more requests to get the full library. A higher "
"size can result in a TimeOutError on low internet connections."
)
@bunch_size_option
@pass_session
def cli(session, **params):
@pass_client(headers=CLIENT_HEADERS)
async def cli(session, api_client, **params):
"""download audiobook(s) from library"""
loop = asyncio.get_event_loop()
ignore_httpx_ssl_eror(loop)
auth = session.auth
config = session.config
try:
loop.run_until_complete(main(config, auth, **params))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
client = api_client.session
output_dir = pathlib.Path(params.get("output_dir")).resolve()
if counter.has_downloads():
echo("The download ended with the following result:")
for k, v in counter.as_dict().items():
if v == 0:
continue
if k == "voucher_saved":
k = "voucher"
elif k == "voucher":
diff = v - counter.voucher_saved
if diff > 0:
echo(f"Unsaved voucher: {diff}")
continue
echo(f"New {k} files: {v}")
# which item(s) to download
get_all = params.get("all") is True
asins = params.get("asin")
titles = params.get("title")
if get_all and (asins or titles):
logger.error(f"Do not mix *asin* or *title* option with *all* option.")
click.Abort()
# what to download
get_aax = params.get("aax")
get_aaxc = params.get("aaxc")
aax_fallback = params.get("aax_fallback")
if aax_fallback:
if get_aax:
logger.info("Using --aax is redundant and can be left when using --aax-fallback")
get_aax = True
if get_aaxc:
logger.warning("Do not mix --aaxc with --aax-fallback option.")
get_annotation = params.get("annotation")
get_chapters = params.get("chapter")
get_cover = params.get("cover")
get_pdf = params.get("pdf")
if not any(
[get_aax, get_aaxc, get_annotation, get_chapters, get_cover, get_pdf]
):
logger.error("Please select an option what you want download.")
click.Abort()
# additional options
sim_jobs = params.get("jobs")
quality = params.get("quality")
cover_size = params.get("cover_size")
overwrite_existing = params.get("overwrite")
ignore_errors = params.get("ignore_errors")
no_confirm = params.get("no_confirm")
resolve_podcats = params.get("resolve_podcasts")
ignore_podcasts = params.get("ignore_podcasts")
bunch_size = session.params.get("bunch_size")
filename_mode = params.get("filename_mode")
if filename_mode == "config":
filename_mode = session.config.get_profile_option(
session.selected_profile, "filename_mode") or "ascii"
# fetch the user library
library = await Library.from_api_full_sync(
api_client,
image_sizes="1215, 408, 360, 882, 315, 570, 252, 558, 900, 500",
bunch_size=bunch_size
)
if resolve_podcats:
await library.resolve_podcats()
# collect jobs
jobs = []
if get_all:
asins = []
titles = []
for i in library:
jobs.append(i.asin)
for asin in asins:
if library.has_asin(asin):
jobs.append(asin)
else:
echo("No new files downloaded.")
if not ignore_errors:
logger.error(f"Asin {asin} not found in library.")
click.Abort()
logger.error(
f"Skip asin {asin}: Not found in library"
)
for title in titles:
match = library.search_item_by_title(title)
full_match = [i for i in match if i[1] == 100]
if match:
if no_confirm:
[jobs.append(i[0].asin) for i in full_match or match]
else:
choices = []
for i in full_match or match:
a = i[0].asin
t = i[0].full_title
c = questionary.Choice(title=f"{a} # {t}", value=a)
choices.append(c)
answer = await questionary.checkbox(
f"Found the following matches for '{title}'. Which you want to download?",
choices=choices
).unsafe_ask_async()
if answer is not None:
[jobs.append(i) for i in answer]
else:
logger.error(
f"Skip title {title}: Not found in library"
)
queue = asyncio.Queue()
for job in jobs:
item = library.get_item_by_asin(job)
items = [item]
odir = pathlib.Path(output_dir)
if not ignore_podcasts and item.is_parent_podcast():
items.remove(item)
if item._children is None:
await item.get_child_items()
for i in item._children:
if i.asin not in jobs:
items.append(i)
podcast_dir = item.create_base_filename(filename_mode)
odir = output_dir / podcast_dir
if not odir.is_dir():
odir.mkdir(parents=True)
for item in items:
queue_job(
queue=queue,
get_cover=get_cover,
get_pdf=get_pdf,
get_annotation=get_annotation,
get_chapters=get_chapters,
get_aax=get_aax,
get_aaxc=get_aaxc,
client=client,
output_dir=odir,
filename_mode=filename_mode,
item=item,
cover_size=cover_size,
quality=quality,
overwrite_existing=overwrite_existing,
aax_fallback=aax_fallback
)
try:
# schedule the consumer
consumers = [
asyncio.ensure_future(consume(queue)) for _ in range(sim_jobs)
]
# wait until the consumer has processed all items
await queue.join()
finally:
# the consumer is still awaiting an item, cancel it
for consumer in consumers:
consumer.cancel()
await asyncio.gather(*consumers, return_exceptions=True)
display_counter()

View file

@ -1,15 +1,19 @@
import asyncio
import csv
import json
import pathlib
from typing import Union
import audible
import click
from click import echo
from ..config import pass_session
from ..decorators import (
bunch_size_option,
timeout_option,
pass_client,
pass_session,
wrap_async
)
from ..models import Library
from ..utils import export_to_csv
@click.group("library")
@ -17,65 +21,53 @@ def cli():
"""interact with library"""
async def _get_library(auth, **params):
timeout = params.get("timeout")
if timeout == 0:
timeout = None
async def _get_library(session, client):
bunch_size = session.params.get("bunch_size")
bunch_size = params.get("bunch_size")
async with audible.AsyncClient(auth, timeout=timeout) as client:
library = await Library.from_api_full_sync(
client,
response_groups=(
"contributors, media, price, product_attrs, product_desc, "
"product_extended_attrs, product_plan_details, product_plans, "
"rating, sample, sku, series, reviews, ws4v, origin, "
"relationships, review_attrs, categories, badge_types, "
"category_ladders, claim_code_url, is_downloaded, "
"is_finished, is_returnable, origin_asin, pdf_url, "
"percent_complete, provided_review"
),
bunch_size=bunch_size
)
return library
async def _list_library(auth, **params):
library = await _get_library(auth, **params)
books = []
for item in library:
asin = item.asin
authors = ", ".join(
sorted(a["name"] for a in item.authors) if item.authors else ""
)
series = ", ".join(
sorted(s["title"] for s in item.series) if item.series else ""
)
title = item.title
books.append((asin, authors, series, title))
for asin, authors, series, title in sorted(books):
fields = [asin]
if authors:
fields.append(authors)
if series:
fields.append(series)
fields.append(title)
echo(": ".join(fields))
def _prepare_library_for_export(library: Library):
keys_with_raw_values = (
"asin", "title", "subtitle", "runtime_length_min", "is_finished",
"percent_complete", "release_date"
return await Library.from_api_full_sync(
client,
response_groups=(
"contributors, media, price, product_attrs, product_desc, "
"product_extended_attrs, product_plan_details, product_plans, "
"rating, sample, sku, series, reviews, ws4v, origin, "
"relationships, review_attrs, categories, badge_types, "
"category_ladders, claim_code_url, is_downloaded, "
"is_finished, is_returnable, origin_asin, pdf_url, "
"percent_complete, provided_review"
),
bunch_size=bunch_size
)
prepared_library = []
for item in library:
@cli.command("export")
@click.option(
"--output", "-o",
type=click.Path(path_type=pathlib.Path),
default=pathlib.Path().cwd() / r"library.{format}",
show_default=True,
help="output file"
)
@timeout_option
@click.option(
"--format", "-f",
type=click.Choice(["tsv", "csv", "json"]),
default="tsv",
show_default=True,
help="Output format"
)
@bunch_size_option
@click.option(
"--resolve-podcasts",
is_flag=True,
help="Resolve podcasts to show all episodes"
)
@pass_session
@pass_client
async def export_library(session, client, **params):
"""export library"""
@wrap_async
def _prepare_item(item):
data_row = {}
for key in item:
v = getattr(item, key)
@ -105,128 +97,88 @@ def _prepare_library_for_export(library: Library):
genres.append(ladder["name"])
data_row["genres"] = ", ".join(genres)
prepared_library.append(data_row)
return data_row
prepared_library.sort(key=lambda x: x["asin"])
return prepared_library
def _export_to_csv(
file: pathlib.Path,
data: list,
headers: Union[list, tuple],
dialect: str
):
with file.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=headers, dialect=dialect)
writer.writeheader()
for i in data:
writer.writerow(i)
async def _export_library(auth, **params):
output_format = params.get("format")
output_filename: pathlib.Path = params.get("output")
if output_filename.suffix == r".{format}":
suffix = "." + output_format
output_filename = output_filename.with_suffix(suffix)
library = await _get_library(auth, **params)
library = await _get_library(session, client)
if params.get("resolve_podcasts"):
await library.resolve_podcats()
prepared_library = _prepare_library_for_export(library)
headers = (
"asin", "title", "subtitle", "authors", "narrators", "series_title",
"series_sequence", "genres", "runtime_length_min", "is_finished",
"percent_complete", "rating", "num_ratings", "date_added",
"release_date", "cover_url"
keys_with_raw_values = (
"asin", "title", "subtitle", "runtime_length_min", "is_finished",
"percent_complete", "release_date"
)
prepared_library = await asyncio.gather(
*[_prepare_item(i) for i in library]
)
prepared_library.sort(key=lambda x: x["asin"])
if output_format in ("tsv", "csv"):
if output_format == csv:
if output_format == "csv":
dialect = "excel"
else:
dialect = "excel-tab"
_export_to_csv(output_filename, prepared_library, headers, dialect)
if output_format == "json":
headers = (
"asin", "title", "subtitle", "authors", "narrators", "series_title",
"series_sequence", "genres", "runtime_length_min", "is_finished",
"percent_complete", "rating", "num_ratings", "date_added",
"release_date", "cover_url"
)
export_to_csv(output_filename, prepared_library, headers, dialect)
elif output_format == "json":
data = json.dumps(prepared_library, indent=4)
output_filename.write_text(data)
@cli.command("export")
@click.option(
"--output", "-o",
type=click.Path(path_type=pathlib.Path),
default=pathlib.Path().cwd() / r"library.{format}",
show_default=True,
help="output file"
)
@click.option(
"--timeout", "-t",
type=click.INT,
default=10,
show_default=True,
help=(
"Increase the timeout time if you got any TimeoutErrors. "
"Set to 0 to disable timeout."
)
)
@click.option(
"--format", "-f",
type=click.Choice(["tsv", "csv", "json"]),
default="tsv",
show_default=True,
help="Output format"
)
@click.option(
"--bunch-size",
type=click.IntRange(10, 1000),
default=1000,
show_default=True,
help="How many library items should be requested per request. A lower "
"size results in more requests to get the full library. A higher "
"size can result in a TimeOutError on low internet connections."
)
@pass_session
def export_library(session, **params):
"""export library"""
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(_export_library(session.auth, **params))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
@cli.command("list")
@timeout_option
@bunch_size_option
@click.option(
"--timeout", "-t",
type=click.INT,
default=10,
show_default=True,
help=(
"Increase the timeout time if you got any TimeoutErrors. "
"Set to 0 to disable timeout."
)
)
@click.option(
"--bunch-size",
type=click.IntRange(10, 1000),
default=1000,
show_default=True,
help="How many library items should be requested per request. A lower "
"size results in more requests to get the full library. A higher "
"size can result in a TimeOutError on low internet connections."
"--resolve-podcasts",
is_flag=True,
help="Resolve podcasts to show all episodes"
)
@pass_session
def list_library(session, **params):
@pass_client
async def list_library(session, client, resolve_podcasts=False):
"""list titles in library"""
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(_list_library(session.auth, **params))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
@wrap_async
def _prepare_item(item):
fields = [item.asin]
authors = ", ".join(
sorted(a["name"] for a in item.authors) if item.authors else ""
)
if authors:
fields.append(authors)
series = ", ".join(
sorted(s["title"] for s in item.series) if item.series else ""
)
if series:
fields.append(series)
fields.append(item.title)
return ": ".join(fields)
library = await _get_library(session, client)
if resolve_podcasts:
await library.resolve_podcats()
books = await asyncio.gather(
*[_prepare_item(i) for i in library]
)
for i in sorted(books):
echo(i)

View file

@ -6,7 +6,7 @@ from audible import Authenticator
from click import echo, secho
from tabulate import tabulate
from ..config import pass_session
from ..decorators import pass_session
from ..utils import build_auth_file
@ -45,13 +45,13 @@ def config_editor(session):
def list_profiles(session):
"""List all profiles in the config file"""
head = ["P", "Profile", "auth file", "cc"]
profiles = session.config.data.get("profile")
config = session.config
profiles = config.data.get("profile")
data = []
for profile in profiles:
p = profiles.get(profile)
auth_file = p.get("auth_file")
country_code = p.get("country_code")
auth_file = config.get_profile_option(profile, "auth_file")
country_code = config.get_profile_option(profile, "country_code")
is_primary = profile == session.config.primary_profile
data.append(
["*" if is_primary else "", profile, auth_file, country_code])
@ -92,7 +92,7 @@ def list_profiles(session):
def add_profile(ctx, session, profile, country_code, auth_file, is_primary):
"""Adds a profile to config file"""
if not (session.config.dirname / auth_file).exists():
logger.error("Auth file doesn't exists.")
logger.error("Auth file doesn't exists")
raise click.Abort()
session.config.add_profile(
@ -167,7 +167,7 @@ def check_if_auth_file_not_exists(session, ctx, param, value):
@click.option(
"--external-login",
is_flag=True,
help="Authenticate using a webbrowser."
help="Authenticate using a web browser."
)
@click.option(
"--with-username",

View file

@ -1,13 +1,15 @@
import logging
import pathlib
import sys
import audible
import click
from click import echo, secho, prompt
from tabulate import tabulate
from ..config import Config, pass_session
from .. import __version__
from ..config import ConfigFile
from ..constants import CONFIG_FILE, DEFAULT_AUTH_FILE_EXTENSION
from ..decorators import pass_session
from ..utils import build_auth_file
@ -31,10 +33,10 @@ def tabulate_summary(d: dict) -> str:
return tabulate(data, head, tablefmt="pretty", colalign=("left", "left"))
def ask_user(config: Config):
def ask_user(config: ConfigFile):
d = {}
welcome_message = (
f"Welcome to the audible {audible.__version__} quickstart utility.")
f"\nWelcome to the audible-cli {__version__} quickstart utility.")
secho(welcome_message, bold=True)
secho(len(welcome_message) * "=", bold=True)
@ -50,11 +52,11 @@ config dir. If the auth file doesn't exists, it will be created. In this case,
an authentication to the audible server is necessary to register a new device.
"""
echo()
secho(intro, bold=True)
secho(intro)
path = config.dirname.absolute()
secho("Selected dir to proceed with:", bold=True)
echo(path.absolute())
echo(path)
echo()
echo("Please enter values for the following settings (just press Enter "
@ -137,17 +139,14 @@ an authentication to the audible server is necessary to register a new device.
@click.command("quickstart")
@click.pass_context
@pass_session
def cli(session, ctx):
"""Quicksetup audible"""
session._config = Config()
config = session.config
config._config_file = session.app_dir / CONFIG_FILE
if config.file_exists():
m = f"Config file {config.filename} already exists. Quickstart will " \
def cli(session):
"""Quick setup audible"""
config_file: pathlib.Path = session.app_dir / CONFIG_FILE
config = ConfigFile(config_file, file_exists=False)
if config_file.is_file():
m = f"Config file {config_file} already exists. Quickstart will " \
f"not overwrite existing files."
logger.error(m)
raise click.Abort()
@ -157,16 +156,9 @@ def cli(session, ctx):
echo(tabulate_summary(d))
click.confirm("Do you want to continue?", abort=True)
config.add_profile(
name=d.get("profile_name"),
auth_file=d.get("auth_file"),
country_code=d.get("country_code"),
is_primary=True,
write_config=False)
if "use_existing_auth_file" not in d:
build_auth_file(
filename=config.dirname / d.get("auth_file"),
filename=session.app_dir / d.get("auth_file"),
username=d.get("audible_username"),
password=d.get("audible_password"),
country_code=d.get("country_code"),
@ -175,4 +167,9 @@ def cli(session, ctx):
with_username=d.get("with_username")
)
config.write_config()
config.add_profile(
name=d.get("profile_name"),
auth_file=d.get("auth_file"),
country_code=d.get("country_code"),
is_primary=True,
)

View file

@ -1,70 +1,65 @@
import asyncio
import csv
import json
import logging
import pathlib
from typing import Union
import audible
import click
import httpx
import questionary
from click import echo
from ..config import pass_session
from ..models import Wishlist
from ..decorators import timeout_option, pass_client, wrap_async
from ..models import Catalog, Wishlist
from ..utils import export_to_csv
async def _get_wishlist(auth, **params):
timeout = params.get("timeout")
if timeout == 0:
timeout = None
logger = logging.getLogger("audible_cli.cmds.cmd_wishlist")
async with audible.AsyncClient(auth, timeout=timeout) as client:
wishlist = await Wishlist.from_api(
client,
response_groups=(
"contributors, media, price, product_attrs, product_desc, "
"product_extended_attrs, product_plan_details, product_plans, "
"rating, sample, sku, series, reviews, review_attrs, ws4v, "
"customer_rights, categories, category_ladders, claim_code_url"
)
# audible api raises a 500 status error when to many requests
# where made to wishlist endpoint in short time
limits = httpx.Limits(max_keepalive_connections=1, max_connections=1)
async def _get_wishlist(client):
wishlist = await Wishlist.from_api(
client,
response_groups=(
"contributors, media, price, product_attrs, product_desc, "
"product_extended_attrs, product_plan_details, product_plans, "
"rating, sample, sku, series, reviews, review_attrs, ws4v, "
"customer_rights, categories, category_ladders, claim_code_url"
)
)
return wishlist
async def _list_wishlist(auth, **params):
wishlist = await _get_wishlist(auth, **params)
books = []
for item in wishlist:
asin = item.asin
authors = ", ".join(
sorted(a["name"] for a in item.authors) if item.authors else ""
)
series = ", ".join(
sorted(s["title"] for s in item.series) if item.series else ""
)
title = item.title
books.append((asin, authors, series, title))
for asin, authors, series, title in sorted(books):
fields = [asin]
if authors:
fields.append(authors)
if series:
fields.append(series)
fields.append(title)
echo(": ".join(fields))
@click.group("wishlist")
def cli():
"""interact with wishlist"""
def _prepare_wishlist_for_export(wishlist: dict):
keys_with_raw_values = (
"asin", "title", "subtitle", "runtime_length_min", "is_finished",
"percent_complete", "release_date"
)
@cli.command("export")
@click.option(
"--output", "-o",
type=click.Path(),
default=pathlib.Path().cwd() / r"wishlist.{format}",
show_default=True,
help="output file"
)
@timeout_option
@click.option(
"--format", "-f",
type=click.Choice(["tsv", "csv", "json"]),
default="tsv",
show_default=True,
help="Output format"
)
@pass_client
async def export_wishlist(client, **params):
"""export wishlist"""
prepared_wishlist = []
for item in wishlist:
@wrap_async
def _prepare_item(item):
data_row = {}
for key in item:
v = getattr(item, key)
@ -93,116 +88,234 @@ def _prepare_wishlist_for_export(wishlist: dict):
for ladder in genre["ladder"]:
genres.append(ladder["name"])
data_row["genres"] = ", ".join(genres)
return data_row
prepared_wishlist.append(data_row)
prepared_wishlist.sort(key=lambda x: x["asin"])
return prepared_wishlist
def _export_to_csv(
file: pathlib.Path,
data: list,
headers: Union[list, tuple],
dialect: str
):
with file.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=headers, dialect=dialect)
writer.writeheader()
for i in data:
writer.writerow(i)
async def _export_wishlist(auth, **params):
output_format = params.get("format")
output_filename: pathlib.Path = params.get("output")
if output_filename.suffix == r".{format}":
suffix = "." + output_format
output_filename = output_filename.with_suffix(suffix)
wishlist = await _get_wishlist(auth, **params)
wishlist = await _get_wishlist(client)
prepared_wishlist = _prepare_wishlist_for_export(wishlist)
headers = (
"asin", "title", "subtitle", "authors", "narrators", "series_title",
"series_sequence", "genres", "runtime_length_min", "is_finished",
"percent_complete", "rating", "num_ratings", "date_added",
"release_date", "cover_url"
keys_with_raw_values = (
"asin", "title", "subtitle", "runtime_length_min", "is_finished",
"percent_complete", "release_date"
)
prepared_wishlist = await asyncio.gather(
*[_prepare_item(i) for i in wishlist]
)
prepared_wishlist.sort(key=lambda x: x["asin"])
if output_format in ("tsv", "csv"):
if output_format == csv:
if output_format == "csv":
dialect = "excel"
else:
dialect = "excel-tab"
_export_to_csv(output_filename, prepared_wishlist, headers, dialect)
if output_format == "json":
headers = (
"asin", "title", "subtitle", "authors", "narrators", "series_title",
"series_sequence", "genres", "runtime_length_min", "is_finished",
"percent_complete", "rating", "num_ratings", "date_added",
"release_date", "cover_url"
)
export_to_csv(
output_filename, prepared_wishlist, headers, dialect
)
elif output_format == "json":
data = json.dumps(prepared_wishlist, indent=4)
output_filename.write_text(data)
@click.group("wishlist")
def cli():
"""interact with wishlist"""
@cli.command("export")
@click.option(
"--output", "-o",
type=click.Path(),
default=pathlib.Path().cwd() / r"wishlist.{format}",
show_default=True,
help="output file"
)
@click.option(
"--timeout", "-t",
type=click.INT,
default=10,
show_default=True,
help=(
"Increase the timeout time if you got any TimeoutErrors. "
"Set to 0 to disable timeout."
)
)
@click.option(
"--format", "-f",
type=click.Choice(["tsv", "csv", "json"]),
default="tsv",
show_default=True,
help="Output format"
)
@pass_session
def export_library(session, **params):
"""export wishlist"""
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(_export_wishlist(session.auth, **params))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
@cli.command("list")
@click.option(
"--timeout", "-t",
type=click.INT,
default=10,
show_default=True,
help=(
"Increase the timeout time if you got any TimeoutErrors. "
"Set to 0 to disable timeout."
)
)
@pass_session
def list_library(session, **params):
@timeout_option
@pass_client
async def list_wishlist(client):
"""list titles in wishlist"""
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(_list_wishlist(session.auth, **params))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
@wrap_async
def _prepare_item(item):
fields = [item.asin]
authors = ", ".join(
sorted(a["name"] for a in item.authors) if item.authors else ""
)
if authors:
fields.append(authors)
series = ", ".join(
sorted(s["title"] for s in item.series) if item.series else ""
)
if series:
fields.append(series)
fields.append(item.title)
return ": ".join(fields)
wishlist = await _get_wishlist(client)
books = await asyncio.gather(
*[_prepare_item(i) for i in wishlist]
)
for i in sorted(books):
echo(i)
@cli.command("add")
@click.option(
"--asin", "-a",
multiple=True,
help="asin of the audiobook"
)
@click.option(
"--title", "-t",
multiple=True,
help="tile of the audiobook (partial search)"
)
@timeout_option
@pass_client(limits=limits)
async def add_wishlist(client, asin, title):
"""add asin(s) to wishlist
Run the command without any option for interactive mode.
"""
async def add_asin(asin):
body = {"asin": asin}
r = await client.post("wishlist", body=body)
return r
asin = list(asin)
title = list(title)
if not asin and not title:
q = await questionary.select(
"Do you want to add an item by asin or title?",
choices=[
questionary.Choice(title="by title", value="title"),
questionary.Choice(title="by asin", value="asin")
]
).unsafe_ask_async()
if q == 'asin':
q = await questionary.text("Please enter the asin").unsafe_ask_async()
asin.append(q)
else:
q = await questionary.text("Please enter the title").unsafe_ask_async()
title.append(q)
for t in title:
catalog = await Catalog.from_api(
client,
title=t,
num_results=50
)
match = catalog.search_item_by_title(t)
full_match = [i for i in match if i[1] == 100]
if match:
choices = []
for i in full_match or match:
c = questionary.Choice(title=i[0].full_title, value=i[0].asin)
choices.append(c)
answer = await questionary.checkbox(
f"Found the following matches for '{t}'. Which you want to add?",
choices=choices
).unsafe_ask_async()
if answer is not None:
[asin.append(i) for i in answer]
else:
logger.error(
f"Skip title {t}: Not found in library"
)
jobs = [add_asin(a) for a in asin]
await asyncio.gather(*jobs)
wishlist = await _get_wishlist(client)
for a in asin:
if wishlist.has_asin(a):
item = wishlist.get_item_by_asin(a)
logger.info(f"{a} ({item.full_title}) added to wishlist")
else:
logger.error(f"{a} was not added to wishlist")
@cli.command("remove")
@click.option(
"--asin", "-a",
multiple=True,
help="asin of the audiobook"
)
@click.option(
"--title", "-t",
multiple=True,
help="tile of the audiobook (partial search)"
)
@timeout_option
@pass_client(limits=limits)
async def remove_wishlist(client, asin, title):
"""remove asin(s) from wishlist
Run the command without any option for interactive mode.
"""
async def remove_asin(rasin):
r = await client.delete(f"wishlist/{rasin}")
item = wishlist.get_item_by_asin(rasin)
logger.info(f"{rasin} ({item.full_title}) removed from wishlist")
return r
asin = list(asin)
wishlist = await _get_wishlist(client)
if not asin and not title:
# interactive mode
choices = []
for i in wishlist:
c = questionary.Choice(title=i.full_title, value=i.asin)
choices.append(c)
asin = await questionary.checkbox(
"Select item(s) which you want to remove from whishlist",
choices=choices
).unsafe_ask_async()
for t in title:
match = wishlist.search_item_by_title(t)
full_match = [i for i in match if i[1] == 100]
if match:
choices = []
for i in full_match or match:
c = questionary.Choice(title=i[0].full_title, value=i[0].asin)
choices.append(c)
answer = await questionary.checkbox(
f"Found the following matches for '{t}'. Which you want to remove?",
choices=choices
).unsafe_ask_async()
if answer is not None:
[asin.append(i) for i in answer]
else:
logger.error(
f"Skip title {t}: Not found in library"
)
if asin:
jobs = []
for a in asin:
if wishlist.has_asin(a):
jobs.append(remove_asin(a))
else:
logger.error(f"{a} not in wishlist")
await asyncio.gather(*jobs)

View file

@ -3,9 +3,10 @@ import os
import pathlib
from typing import Any, Dict, Optional, Union
import audible
import click
import toml
from audible import Authenticator
from audible import AsyncClient, Authenticator
from audible.exceptions import FileEncryptionError
from . import __version__
@ -22,51 +23,116 @@ from .exceptions import AudibleCliException, ProfileAlreadyExists
logger = logging.getLogger("audible_cli.config")
class Config:
"""Holds the config file data and environment."""
class ConfigFile:
"""Presents an audible-cli configuration file
def __init__(self) -> None:
self._config_file: Optional[pathlib.Path] = None
self._config_data: Dict[str, Union[str, Dict]] = DEFAULT_CONFIG_DATA
self._current_profile: Optional[str] = None
self._is_read: bool = False
Instantiate a :class:`~audible_cli.config.ConfigFile` will load the file
content by default. To create a new config file, the ``file_exists``
argument must be set to ``False``.
Audible-cli configuration files are written in the toml markup language.
It has a main section named `APP` and sections for each profile named
`profile.<profile_name>`.
Args:
filename: The file path to the config file
file_exists: If ``True``, the file must exist and the file content
is loaded.
"""
def __init__(
self,
filename: Union[str, pathlib.Path],
file_exists: bool = True
) -> None:
filename = pathlib.Path(filename).resolve()
config_data = DEFAULT_CONFIG_DATA.copy()
file_data = {}
if file_exists:
if not filename.is_file():
raise AudibleCliException(
f"Config file {click.format_filename(filename)} "
f"does not exists"
)
file_data = toml.load(filename)
logger.debug(
f"Config loaded from "
f"{click.format_filename(filename, shorten=True)}"
)
config_data.update(file_data)
self._config_file = filename
self._config_data = config_data
@property
def filename(self) -> Optional[pathlib.Path]:
def filename(self) -> pathlib.Path:
"""Returns the path to the config file"""
return self._config_file
def file_exists(self) -> bool:
return self.filename.exists()
@property
def dirname(self) -> pathlib.Path:
"""Returns the path to the config file directory"""
return self.filename.parent
def dir_exists(self) -> bool:
return self.dirname.exists()
@property
def is_read(self) -> bool:
return self._is_read
@property
def data(self) -> Dict[str, Union[str, Dict]]:
"""Returns the configuration data"""
return self._config_data
@property
def app_config(self) -> Dict[str, str]:
return self.data.get("APP", {})
@property
def profile_config(self) -> Dict[str, str]:
return self.data["profile"][self._current_profile]
@property
def primary_profile(self) -> Optional[str]:
return self.app_config.get("primary_profile")
"""Returns the configuration data for the APP section"""
return self.data["APP"]
def has_profile(self, name: str) -> bool:
return name in self.data.get("profile", {})
"""Check if a profile with this name are in the configuration data
Args:
name: The name of the profile
"""
return name in self.data["profile"]
def get_profile(self, name: str) -> Dict[str, str]:
"""Returns the configuration data for these profile name
Args:
name: The name of the profile
"""
if not self.has_profile(name):
raise AudibleCliException(f"Profile {name} does not exists")
return self.data["profile"][name]
@property
def primary_profile(self) -> str:
if "primary_profile" not in self.app_config:
raise AudibleCliException("No primary profile set in config")
return self.app_config["primary_profile"]
def get_profile_option(
self,
profile: str,
option: str,
default: Optional[str] = None
) -> str:
"""Returns the value for an option for the given profile.
Looks first, if an option is in the ``profile`` section. If not, it
searches for the option in the ``APP`` section. If not found, it
returns the ``default``.
Args:
profile: The name of the profile
option: The name of the option to search for
default: The default value to return, if the option is not found
"""
profile = self.get_profile(profile)
if option in profile:
return profile[option]
if option in self.app_config:
return self.app_config[option]
return default
def add_profile(
self,
@ -74,12 +140,22 @@ class Config:
auth_file: Union[str, pathlib.Path],
country_code: str,
is_primary: bool = False,
abort_on_existing_profile: bool = True,
write_config: bool = True,
**additional_options
) -> None:
"""Adds a new profile to the config
if self.has_profile(name) and abort_on_existing_profile:
Args:
name: The name of the profile
auth_file: The name of the auth_file
country_code: The country code of the marketplace to use with
this profile
is_primary: If ``True``, this profile is set as primary in the
``APP`` section
write_config: If ``True``, save the config to file
"""
if self.has_profile(name):
raise ProfileAlreadyExists(name)
profile_data = {
@ -92,31 +168,41 @@ class Config:
if is_primary:
self.data["APP"]["primary_profile"] = name
logger.info(f"Profile {name} added to config")
if write_config:
self.write_config()
def delete_profile(self, name: str) -> None:
def delete_profile(self, name: str, write_config: bool = True) -> None:
"""Deletes a profile from config
Args:
name: The name of the profile
write_config: If ``True``, save the config to file
Note:
Does not delete the auth file.
"""
if not self.has_profile(name):
raise AudibleCliException(f"Profile {name} does not exists")
del self.data["profile"][name]
def read_config(
self,
filename: Optional[Union[str, pathlib.Path]] = None
) -> None:
f = pathlib.Path(filename or self.filename).resolve()
logger.info(f"Profile {name} removed from config")
try:
self.data.update(toml.load(f))
except FileNotFoundError:
message = f"Config file {click.format_filename(f)} not found"
raise AudibleCliException(message)
self._config_file = f
self._is_read = True
if write_config:
self.write_config()
def write_config(
self,
filename: Optional[Union[str, pathlib.Path]] = None
) -> None:
"""Write the config data to file
Args:
filename: If not ``None`` the config is written to these file path
instead of ``self.filename``
"""
f = pathlib.Path(filename or self.filename).resolve()
if not f.parent.is_dir():
@ -124,78 +210,99 @@ class Config:
toml.dump(self.data, f.open("w"))
click_f = click.format_filename(f, shorten=True)
logger.info(f"Config written to {click_f}")
class Session:
"""Holds the settings for the current session."""
"""Holds the settings for the current session"""
def __init__(self) -> None:
self._auth: Optional[Authenticator] = None
self._config: Optional[Config] = None
self._auths: Dict[str, Authenticator] = {}
self._config: Optional[CONFIG_FILE] = None
self._params: Dict[str, Any] = {}
self._app_dir = get_app_dir()
self._plugin_dir = get_plugin_dir()
self._app_dir: pathlib.Path = get_app_dir()
self._plugin_dir: pathlib.Path = get_plugin_dir()
logger.debug(f"Audible-cli version: {__version__}")
logger.debug(f"App dir: {click.format_filename(self.app_dir)}")
logger.debug(f"Plugin dir: {click.format_filename(self.plugin_dir)}")
@property
def params(self):
"""Returns the parameter of the session
Parameter are usually added using the ``add_param_to_session``
callback on a click option. This way an option from a parent command
can be accessed from his subcommands.
"""
return self._params
@property
def app_dir(self):
"""Returns the path of the app dir"""
return self._app_dir
@property
def plugin_dir(self):
"""Returns the path of the plugin dir"""
return self._plugin_dir
@property
def config(self):
"""Returns the ConfigFile for this session"""
if self._config is None:
conf_file = self.app_dir / CONFIG_FILE
self._config = Config()
logger.debug(
f"Load config from file: "
f"{click.format_filename(conf_file, shorten=True)}"
)
self._config.read_config(conf_file)
name = self.params.get("profile") or self.config.primary_profile
logger.debug(f"Selected profile: {name}")
if name is None:
message = (
"No profile provided and primary profile not set "
"properly in config."
)
try:
ctx = click.get_current_context()
ctx.fail(message)
except RuntimeError:
raise KeyError(message)
if not self.config.has_profile(name):
message = "Provided profile not found in config."
try:
ctx = click.get_current_context()
ctx.fail(message)
except RuntimeError:
raise UserWarning(message)
self.config._current_profile = name
self._config = ConfigFile(conf_file)
return self._config
def _set_auth(self):
profile = self.config.profile_config
auth_file = self.config.dirname / profile["auth_file"]
country_code = profile["country_code"]
password = self.params.get("password")
@property
def selected_profile(self):
"""Returns the selected config profile name for this session
The `profile` to use must be set using the ``add_param_to_session``
callback of a click option. Otherwise, the primary profile from the
config is used.
"""
profile = self.params.get("profile") or self.config.primary_profile
if profile is None:
message = (
"No profile provided and primary profile not set "
"properly in config."
)
raise AudibleCliException(message)
return profile
def get_auth_for_profile(
self,
profile: str,
password: Optional[str] = None
) -> audible.Authenticator:
"""Returns an Authenticator for a profile
If an Authenticator for this profile is already loaded, it will
return the Authenticator without reloading it. This way a session can
hold multiple Authenticators for different profiles. Commands can use
this to make API requests for more than one profile.
Args:
profile: The name of the profile
password: The password of the auth file
"""
if profile in self._auths:
return self._auths[profile]
if not self.config.has_profile(profile):
message = "Provided profile not found in config."
raise AudibleCliException(message)
auth_file = self.config.get_profile_option(profile, "auth_file")
country_code = self.config.get_profile_option(profile, "country_code")
while True:
try:
self._auth = Authenticator.from_file(
filename=auth_file,
auth = Authenticator.from_file(
filename=self.config.dirname / auth_file,
password=password,
locale=country_code)
break
@ -204,20 +311,39 @@ class Session:
"Auth file is encrypted but no/wrong password is provided"
)
password = click.prompt(
"Please enter the password (or enter to exit)",
"Please enter the auth-file password (or enter to exit)",
hide_input=True,
default="")
if len(password) == 0:
raise click.Abort()
click_f = click.format_filename(auth_file, shorten=True)
logger.debug(f"Auth file {click_f} for profile {profile} loaded.")
self._auths[profile] = auth
return auth
@property
def auth(self):
if self._auth is None:
self._set_auth()
return self._auth
"""Returns the Authenticator for the selected profile"""
profile = self.selected_profile
password = self.params.get("password")
return self.get_auth_for_profile(profile, password)
def get_client_for_profile(
self,
profile: str,
password: Optional[str] = None,
**kwargs
) -> AsyncClient:
auth = self.get_auth_for_profile(profile, password)
kwargs.setdefault("timeout", self.params.get("timeout", 5))
return AsyncClient(auth=auth, **kwargs)
pass_session = click.make_pass_decorator(Session, ensure=True)
def get_client(self, **kwargs) -> AsyncClient:
profile = self.selected_profile
password = self.params.get("password")
return self.get_client_for_profile(profile, password, **kwargs)
def get_app_dir() -> pathlib.Path:
@ -230,10 +356,3 @@ def get_app_dir() -> pathlib.Path:
def get_plugin_dir() -> pathlib.Path:
plugin_dir = os.getenv(PLUGIN_DIR_ENV) or (get_app_dir() / PLUGIN_PATH)
return pathlib.Path(plugin_dir).resolve()
def add_param_to_session(ctx: click.Context, param, value):
"""Add a parameter to :class:`Session` `param` attribute"""
session = ctx.ensure_object(Session)
session.params[param.name] = value
return value

View file

@ -1,3 +1,6 @@
from typing import Dict
APP_NAME: str = "Audible"
CONFIG_FILE: str = "config.toml"
CONFIG_DIR_ENV: str = "AUDIBLE_CONFIG_DIR"
@ -6,10 +9,10 @@ PLUGIN_DIR_ENV: str = "AUDIBLE_PLUGIN_DIR"
PLUGIN_ENTRY_POINT: str = "audible.cli_plugins"
DEFAULT_AUTH_FILE_EXTENSION: str = "json"
DEFAULT_AUTH_FILE_ENCRYPTION: str = "json"
DEFAULT_CONFIG_DATA = {
DEFAULT_CONFIG_DATA: Dict[str, str] = {
"title": "Audible Config File",
"APP": {},
"profile": {}
}
CODEC_HIGH_QUALITY = "AAX_44_128"
CODEC_NORMAL_QUALITY = "AAX_44_64"
CODEC_HIGH_QUALITY: str = "AAX_44_128"
CODEC_NORMAL_QUALITY: str = "AAX_44_64"

View file

@ -0,0 +1,238 @@
import asyncio
import logging
from functools import partial, wraps
import click
import httpx
from packaging.version import parse
from .config import Session
from ._logging import _normalize_logger
from . import __version__
logger = logging.getLogger("audible_cli.options")
pass_session = click.make_pass_decorator(Session, ensure=True)
def run_async(f):
@wraps(f)
def wrapper(*args, **kwargs):
if hasattr(asyncio, "run"):
logger.debug("Using asyncio.run ...")
return asyncio.run(f(*args, ** kwargs))
else:
logger.debug("Using asyncio.run_until_complete ...")
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(f(*args, ** kwargs))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
return wrapper
def wrap_async(f):
"""Wrap a synchronous function and runs them in an executor"""
@wraps(f)
async def wrapper(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
partial_func = partial(f, *args, **kwargs)
return await loop.run_in_executor(executor, partial_func)
return wrapper
def pass_client(func=None, **client_kwargs):
def coro(f):
@wraps(f)
@pass_session
@run_async
async def wrapper(session, *args, **kwargs):
client = session.get_client(**client_kwargs)
async with client.session:
return await f(*args, client, **kwargs)
return wrapper
if callable(func):
return coro(func)
return coro
def add_param_to_session(ctx: click.Context, param, value):
"""Add a parameter to :class:`Session` `param` attribute
This is usually used as a callback for a click option
"""
session = ctx.ensure_object(Session)
session.params[param.name] = value
return value
def version_option(func=None, **kwargs):
def callback(ctx, param, value):
if not value or ctx.resilient_parsing:
return
message = f"audible-cli, version {__version__}"
click.echo(message, color=ctx.color, nl=False)
url = "https://api.github.com/repos/mkb79/audible-cli/releases/latest"
headers = {"Accept": "application/vnd.github.v3+json"}
logger.debug(f"Requesting Github API for latest release information")
try:
response = httpx.get(url, headers=headers, follow_redirects=True)
response.raise_for_status()
except Exception as e:
logger.error(e)
click.Abort()
content = response.json()
current_version = parse(__version__)
latest_version = parse(content["tag_name"])
html_url = content["html_url"]
if latest_version > current_version:
click.echo(
f" (update available)\nVisit {html_url} "
f"for information about the new release.",
color=ctx.color
)
else:
click.echo(" (up-to-date)", color=ctx.color)
ctx.exit()
kwargs.setdefault("is_flag", True)
kwargs.setdefault("expose_value", False)
kwargs.setdefault("is_eager", True)
kwargs.setdefault("help", "Show the version and exit.")
kwargs["callback"] = callback
option = click.option("--version", **kwargs)
if callable(func):
return option(func)
return option
def profile_option(func=None, **kwargs):
kwargs.setdefault("callback", add_param_to_session)
kwargs.setdefault("expose_value", False)
kwargs.setdefault(
"help",
"The profile to use instead primary profile (case sensitive!)."
)
option = click.option("--profile", "-P", **kwargs)
if callable(func):
return option(func)
return option
def password_option(func=None, **kwargs):
kwargs.setdefault("callback", add_param_to_session)
kwargs.setdefault("expose_value", False)
kwargs.setdefault("help", "The password for the profile auth file.")
option = click.option("--password", "-p", **kwargs)
if callable(func):
return option(func)
return option
def verbosity_option(func=None, *, cli_logger=None, **kwargs):
"""A decorator that adds a `--verbosity, -v` option to the decorated
command.
Keyword arguments are passed to
the underlying ``click.option`` decorator.
"""
def callback(ctx, param, value):
x = getattr(logging, value.upper(), None)
if x is None:
raise click.BadParameter(
f"Must be CRITICAL, ERROR, WARNING, INFO or DEBUG, "
f"not {value}"
)
cli_logger.setLevel(x)
kwargs.setdefault("default", "INFO")
kwargs.setdefault("metavar", "LVL")
kwargs.setdefault("expose_value", False)
kwargs.setdefault(
"help", "Either CRITICAL, ERROR, WARNING, "
"INFO or DEBUG. [default: INFO]"
)
kwargs.setdefault("is_eager", True)
kwargs.setdefault("callback", callback)
cli_logger = _normalize_logger(cli_logger)
option = click.option("--verbosity", "-v", **kwargs)
if callable(func):
return option(func)
return option
def timeout_option(func=None, **kwargs):
def callback(ctx: click.Context, param, value):
if value == 0:
value = None
session = ctx.ensure_object(Session)
session.params[param.name] = value
return value
kwargs.setdefault("type", click.INT)
kwargs.setdefault("default", 10)
kwargs.setdefault("show_default", True)
kwargs.setdefault(
"help", ("Increase the timeout time if you got any TimeoutErrors. "
"Set to 0 to disable timeout.")
)
kwargs.setdefault("callback", callback)
kwargs.setdefault("expose_value", False)
option = click.option("--timeout", **kwargs)
if callable(func):
return option(func)
return option
def bunch_size_option(func=None, **kwargs):
kwargs.setdefault("type", click.IntRange(10, 1000))
kwargs.setdefault("default", 1000)
kwargs.setdefault("show_default", True)
kwargs.setdefault(
"help", ("How many library items should be requested per request. A "
"lower size results in more requests to get the full library. "
"A higher size can result in a TimeOutError on low internet "
"connections.")
)
kwargs.setdefault("callback", add_param_to_session)
kwargs.setdefault("expose_value", False)
option = click.option("--bunch-size", **kwargs)
if callable(func):
return option(func)
return option

View file

@ -9,6 +9,10 @@ class NotFoundError(AudibleCliException):
"""Raised if an item is not found"""
class NotDownloadableAsAAX(AudibleCliException):
"""Raised if an item is not downloadable in aax format"""
class FileDoesNotExists(AudibleCliException):
"""Raised if a file does not exist"""

View file

@ -2,16 +2,17 @@ import asyncio
import logging
import string
import unicodedata
from math import ceil
from typing import List, Optional, Union
import audible
import httpx
from audible.aescipher import decrypt_voucher_from_licenserequest
from audible.client import convert_response_content
from .constants import CODEC_HIGH_QUALITY, CODEC_NORMAL_QUALITY
from .exceptions import AudibleCliException
from .utils import LongestSubString
from .exceptions import AudibleCliException, NotDownloadableAsAAX
from .utils import full_response_callback, LongestSubString
logger = logging.getLogger("audible_cli.models")
@ -72,6 +73,27 @@ class BaseItem:
return slug_title
def create_base_filename(self, mode: str):
supported_modes = ("ascii", "asin_ascii", "unicode", "asin_unicode")
if mode not in supported_modes:
raise AudibleCliException(
f"Unsupported mode {mode} for name creation"
)
if "ascii" in mode:
base_filename = self.full_title_slugify
elif "unicode" in mode:
base_filename = unicodedata.normalize("NFKD", self.full_title)
else:
base_filename = self.asin
if "asin" in mode:
base_filename = self.asin + "_" + base_filename
return base_filename
def substring_in_title_accuracy(self, substring):
match = LongestSubString(substring, self.full_title)
return round(match.percentage, 2)
@ -153,7 +175,7 @@ class LibraryItem(BaseItem):
"""
# Only items with content_delivery_type
# MultiPartBook or Periodical have child elemts
# MultiPartBook or Periodical have child elements
if not self.has_children:
return
@ -189,23 +211,22 @@ class LibraryItem(BaseItem):
def is_downloadable(self):
# customer_rights must be in response_groups
if self.customer_rights is not None:
if not self.customer_rights["is_consumable_offline"]:
return False
else:
if self.customer_rights["is_consumable_offline"]:
return True
return False
async def get_aax_url_old(self, quality: str = "high"):
if not self.is_downloadable():
raise AudibleCliException(
f"{self.full_title} is not downloadable. Skip item."
f"{self.full_title} is not downloadable."
)
codec, codec_name = self._get_codec(quality)
if codec is None:
raise AudibleCliException(
if codec is None or self.is_ayce:
raise NotDownloadableAsAAX(
f"{self.full_title} is not downloadable in AAX format"
)
url = (
"https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/"
"FSDownloadContent"
@ -238,8 +259,8 @@ class LibraryItem(BaseItem):
)
codec, codec_name = self._get_codec(quality)
if codec is None:
raise AudibleCliException(
if codec is None or self.is_ayce:
raise NotDownloadableAsAAX(
f"{self.full_title} is not downloadable in AAX format"
)
@ -252,6 +273,11 @@ class LibraryItem(BaseItem):
return httpx.URL(url, params=params), codec_name
async def get_aaxc_url(self, quality: str = "high"):
if not self.is_downloadable():
raise AudibleCliException(
f"{self.full_title} is not downloadable."
)
assert quality in ("best", "high", "normal",)
body = {
@ -292,6 +318,17 @@ class LibraryItem(BaseItem):
return metadata
async def get_annotations(self):
url = f"https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/sidecar"
params = {
"type": "AUDI",
"key": self.asin
}
annotations = await self._client.get(url, params=params)
return annotations
class WishlistItem(BaseItem):
pass
@ -315,9 +352,13 @@ class BaseList:
def _prepare_data(self, data: Union[dict, list]) -> Union[dict, list]:
return data
@property
def data(self):
return self._data
def get_item_by_asin(self, asin):
try:
return next(i for i in self._data if asin in i.asin)
return next(i for i in self._data if asin == i.asin)
except StopIteration:
return None
@ -354,6 +395,7 @@ class Library(BaseList):
async def from_api(
cls,
api_client: audible.AsyncClient,
include_total_count_header: bool = False,
**request_params
):
if "response_groups" not in request_params:
@ -369,8 +411,18 @@ class Library(BaseList):
"periodicals, provided_review, product_details"
)
resp = await api_client.get("library", **request_params)
return cls(resp, api_client=api_client)
resp: httpx.Response = await api_client.get(
"library",
response_callback=full_response_callback,
**request_params
)
resp_content = convert_response_content(resp)
total_count_header = resp.headers.get("total-count")
cls_instance = cls(resp_content, api_client=api_client)
if include_total_count_header:
return cls_instance, total_count_header
return cls_instance
@classmethod
async def from_api_full_sync(
@ -379,33 +431,42 @@ class Library(BaseList):
bunch_size: int = 1000,
**request_params
) -> "Library":
request_params["page"] = 1
request_params.pop("page", None)
request_params["num_results"] = bunch_size
library = []
while True:
resp = await cls.from_api(api_client, params=request_params)
items = resp._data
len_items = len(items)
library.extend(items)
if len_items < bunch_size:
break
request_params["page"] += 1
library, total_count = await cls.from_api(
api_client,
page=1,
params=request_params,
include_total_count_header=True,
)
pages = ceil(int(total_count) / bunch_size)
if pages == 1:
return library
resp._data = library
return resp
additional_pages = []
for page in range(2, pages+1):
additional_pages.append(
cls.from_api(
api_client,
page=page,
params=request_params,
)
)
additional_pages = await asyncio.gather(*additional_pages)
for p in additional_pages:
library.data.extend(p.data)
return library
async def resolve_podcats(self):
podcasts = []
for i in self:
if i.is_parent_podcast():
podcasts.append(i)
podcast_items = await asyncio.gather(
*[i.get_child_items() for i in podcasts]
*[i.get_child_items() for i in self if i.is_parent_podcast()]
)
for i in podcast_items:
self._data.extend(i._data)
self.data.extend(i.data)
class Catalog(BaseList):
@ -465,16 +526,11 @@ class Catalog(BaseList):
return cls(resp, api_client=api_client)
async def resolve_podcats(self):
podcasts = []
for i in self:
if i.is_parent_podcast():
podcasts.append(i)
podcast_items = await asyncio.gather(
*[i.get_child_items() for i in podcasts]
*[i.get_child_items() for i in self if i.is_parent_podcast()]
)
for i in podcast_items:
self._data.extend(i._data)
self.data.extend(i.data)
class Wishlist(BaseList):

View file

@ -1,9 +1,8 @@
import asyncio
import csv
import io
import logging
import pathlib
from difflib import SequenceMatcher
from functools import partial, wraps
from typing import List, Optional, Union
import aiofiles
@ -12,6 +11,7 @@ import httpx
import tqdm
from PIL import Image
from audible import Authenticator
from audible.client import raise_for_status
from audible.login import default_login_url_callback
from click import echo, secho, prompt
@ -32,7 +32,7 @@ def prompt_captcha_callback(captcha_url: str) -> str:
img.show()
else:
echo(
"Please open the following url with a webbrowser "
"Please open the following url with a web browser "
"to get the captcha:"
)
echo(captcha_url)
@ -60,6 +60,11 @@ def prompt_external_callback(url: str) -> str:
return default_login_url_callback(url)
def full_response_callback(resp: httpx.Response) -> httpx.Response:
raise_for_status(resp)
return resp
def build_auth_file(
filename: Union[str, pathlib.Path],
username: Optional[str],
@ -142,17 +147,6 @@ def asin_in_library(asin, library):
return False
def wrap_async(func):
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)
return run
class DummyProgressBar:
def __enter__(self):
return self
@ -256,8 +250,7 @@ class Downloader:
file.rename(file.with_suffix(f"{file.suffix}.old.{i}"))
tmp_file.rename(file)
logger.info(
f"File {self._file} downloaded to {self._file.parent} "
f"in {elapsed}."
f"File {self._file} downloaded in {elapsed}."
)
return True
@ -300,3 +293,17 @@ class Downloader:
await self._load()
finally:
self._remove_tmp_file()
def export_to_csv(
file: pathlib.Path,
data: list,
headers: Union[list, tuple],
dialect: str
) -> None:
with file.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=headers, dialect=dialect)
writer.writeheader()
for i in data:
writer.writerow(i)

View file

@ -5,5 +5,5 @@ Tab completion can be provided for commands, options and choice values.
Bash, Zsh and Fish are supported.
Simply copy the activation script for your shell from this folder to your machine.
Read [here](https://click.palletsprojects.com/en/7.x/bashcomplete/#activation-script)
Read [here](https://click.palletsprojects.com/en/8.0.x/shell-completion/)
how-to activate the script in your shell.

View file

@ -1,2 +1,2 @@
_AUDIBLE_COMPLETE=source_bash audible
_AUDIBLE_QUICKSTART_COMPLETE=source_bash audible-quickstart
_AUDIBLE_COMPLETE=bash_source audible
_AUDIBLE_QUICKSTART_COMPLETE=bash_source audible-quickstart

View file

@ -1,2 +1,2 @@
_AUDIBLE_COMPLETE=source_zsh audible
_AUDIBLE_QUICKSTART_COMPLETE=source_zsh audible-quickstart
_AUDIBLE_COMPLETE=zsh_source audible
_AUDIBLE_QUICKSTART_COMPLETE=zsh_source audible-quickstart

View file

@ -1,6 +1,6 @@
"""
This script replaces the chapter titles from a ffmetadata file with the one
extracted from a api metadata/voucher file
extracted from an API metadata/voucher file
Example: