Compare commits

..

No commits in common. "master" and "v0.1.3" have entirely different histories.

37 changed files with 1396 additions and 3668 deletions

13
.github/FUNDING.yml vendored
View file

@ -1,13 +0,0 @@
# These are supported funding model platforms
github: [mkb79] # Replace with up to 4 GitHub Sponsors-enabled usernames e.g., [user1, user2]
patreon: # Replace with a single Patreon username
open_collective: # Replace with a single Open Collective username
ko_fi: # Replace with a single Ko-fi username
tidelift: # Replace with a single Tidelift platform-name/package-name e.g., npm/babel
community_bridge: # Replace with a single Community Bridge project-name e.g., cloud-foundry
liberapay: # Replace with a single Liberapay username
issuehunt: # Replace with a single IssueHunt username
otechie: # Replace with a single Otechie username
lfx_crowdfunding: # Replace with a single LFX Crowdfunding project-name e.g., cloud-foundry
custom: # Replace with up to 4 custom sponsorship URLs e.g., ['link1', 'link2']

View file

@ -9,12 +9,10 @@ jobs:
createrelease:
name: Create Release
runs-on: ubuntu-latest
outputs:
release_url: ${{ steps.create-release.outputs.upload_url }}
runs-on: [ubuntu-latest]
steps:
- name: Create Release
id: create-release
id: create_release
uses: actions/create-release@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
@ -23,6 +21,13 @@ jobs:
release_name: Release ${{ github.ref }}
draft: false
prerelease: false
- name: Output Release URL File
run: echo "${{ steps.create_release.outputs.upload_url }}" > release_url.txt
- name: Save Release URL File for publish
uses: actions/upload-artifact@v2
with:
name: release_url
path: release_url.txt
build:
name: Build packages
@ -39,13 +44,13 @@ jobs:
zip -r9 audible_linux_ubuntu_latest audible
OUT_FILE_NAME: audible_linux_ubuntu_latest.zip
ASSET_MIME: application/zip # application/octet-stream
- os: ubuntu-20.04
- os: ubuntu-18.04
TARGET: linux
CMD_BUILD: >
pyinstaller --clean -F --hidden-import audible_cli -n audible -c pyi_entrypoint.py &&
cd dist/ &&
zip -r9 audible_linux_ubuntu_20_04 audible
OUT_FILE_NAME: audible_linux_ubuntu_20_04.zip
zip -r9 audible_linux_ubuntu_18_04 audible
OUT_FILE_NAME: audible_linux_ubuntu_18_04.zip
ASSET_MIME: application/zip # application/octet-stream
- os: macos-latest
TARGET: macos
@ -80,23 +85,34 @@ jobs:
OUT_FILE_NAME: audible_win.zip
ASSET_MIME: application/zip
steps:
- uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v4
- uses: actions/checkout@v2
- name: Set up Python 3.8
uses: actions/setup-python@v2
with:
python-version: 3.11
python-version: '3.8'
- name: Install dependencies
run: |
python -m pip install --upgrade pip .[pyi] && pip list
- name: Build with pyinstaller for ${{matrix.TARGET}}
run: ${{matrix.CMD_BUILD}}
- name: Load Release URL File from release job
uses: actions/download-artifact@v2
with:
name: release_url
path: release_url
- name: Get Release File Name & Upload URL
id: get_release_info
shell: bash
run: |
value=`cat release_url/release_url.txt`
echo ::set-output name=upload_url::$value
- name: Upload Release Asset
id: upload-release-asset
uses: actions/upload-release-asset@v1
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
with:
upload_url: ${{ needs.createrelease.outputs.release_url }}
upload_url: ${{ steps.get_release_info.outputs.upload_url }}
asset_path: ./dist/${{ matrix.OUT_FILE_NAME}}
asset_name: ${{ matrix.OUT_FILE_NAME}}
asset_content_type: ${{ matrix.ASSET_MIME}}

View file

@ -6,20 +6,20 @@ on:
jobs:
build-n-publish:
name: Build and publish Audible-cli to TestPyPI
runs-on: ubuntu-latest
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v4
- uses: actions/checkout@master
- name: Set up Python 3.9
uses: actions/setup-python@v1
with:
python-version: 3.11
python-version: 3.9
- name: Install setuptools and wheel
run: pip install --upgrade pip setuptools wheel
- name: Build a binary wheel and a source tarball
run: python setup.py sdist bdist_wheel
- name: Publish distribution to Test PyPI
uses: pypa/gh-action-pypi-publish@release/v1
uses: pypa/gh-action-pypi-publish@master
with:
password: ${{ secrets.TEST_PYPI_API_TOKEN }}
repository_url: https://test.pypi.org/legacy/

View file

@ -6,19 +6,19 @@ on:
jobs:
build-n-publish:
name: Build and publish Audible-cli to PyPI
runs-on: ubuntu-latest
runs-on: ubuntu-18.04
steps:
- uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v4
- uses: actions/checkout@master
- name: Set up Python 3.9
uses: actions/setup-python@v1
with:
python-version: 3.11
python-version: 3.9
- name: Install setuptools and wheel
run: pip install --upgrade pip setuptools wheel
- name: Build a binary wheel and a source tarball
run: python setup.py sdist bdist_wheel
- name: Publish distribution to PyPI
uses: pypa/gh-action-pypi-publish@release/v1
uses: pypa/gh-action-pypi-publish@master
with:
password: ${{ secrets.PYPI_API_TOKEN }}

View file

@ -6,173 +6,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
## Unreleased
### Bugfix
- Fixing `[Errno 18] Invalid cross-device link` when downloading files using the `--output-dir` option. This error is fixed by creating the resume file on the same location as the target file.
### Added
- The `--chapter-type` option is added to the download command. Chapter can now be
downloaded as `flat` or `tree` type. `tree` is the default. A default chapter type
can be set in the config file.
### Changed
- Improved podcast ignore feature in download command
- make `--ignore-podcasts` and `--resolve-podcasts` options of download command mutual
exclusive
- Switched from a HEAD to a GET request without loading the body in the downloader
class. This change improves the program's speed, as the HEAD request was taking
considerably longer than a GET request on some Audible pages.
- `models.LibraryItem.get_content_metadatata` now accept a `chapter_type` argument.
Additional keyword arguments to this method are now passed through the metadata
request.
- Update httpx version range to >=0.23.3 and <0.28.0.
- fix typo from `resolve_podcats` to `resolve_podcasts`
- `models.Library.resolve_podcats` is now deprecated and will be removed in a future version
## [0.3.1] - 2024-03-19
### Bugfix
- fix a `TypeError` on some Python versions when calling `importlib.metadata.entry_points` with group argument
## [0.3.0] - 2024-03-19
### Added
- Added a resume feature when downloading aaxc files.
- New `downlaoder` module which contains a rework of the Downloader class.
- If necessary, large audiobooks are now downloaded in parts.
- Plugin command help page now contains additional information about the source of
the plugin.
- Command help text now starts with ´(P)` for plugin commands.
### Changed
- Rework plugin module
- using importlib.metadata over setuptools (pkg_resources) to get entrypoints
## [0.2.6] - 2023-11-16
### Added
- Update marketplace choices in `manage auth-file add` command. Now all available marketplaces are listed.
### Bugfix
- Avoid tqdm progress bar interruption by loggers output to console.
- Fixing an issue with unawaited coroutines when the download command exited abnormal.
### Changed
- Update httpx version range to >=0.23.3 and <0.26.0.
### Misc
- add `freeze_support` to pyinstaller entry script (#78)
## [0.2.5] - 2023-09-26
### Added
- Dynamically load available marketplaces from the `audible package`. Allows to implement a new marketplace without updating `audible-cli`.
## [0.2.4] - 2022-09-21
### Added
- Allow download multiple cover sizes at once. Each cover size must be provided with the `--cover-size` option
### Changed
- Rework start_date and end_date option
### Bugfix
- In some cases, the purchase date is None. This results in an exception. Now check for purchase date or date added and skip, if date is missing
## [0.2.3] - 2022-09-06
### Added
- `--start-date` and `--end-date` option to `download` command
- `--start-date` and `--end-date` option to `library export` and `library list` command
- better error handling for license requests
- verify that a download link is valid
- make sure an item is published before downloading the aax, aaxc or pdf file
- `--ignore-errors` flag of the download command now continue, if an item failed to download
## [0.2.2] - 2022-08-09
### Bugfix
- PDFs could not be found using the download command (#112)
## [0.2.1] - 2022-07-29
### Added
- `library` command now outputs the `extended_product_description` field
### Changed
- by default a licenserequest (voucher) will not include chapter information by default
- moved licenserequest part from `models.LibraryItem.get_aaxc_url` to its own `models.LibraryItem.get_license` function
- allow book titles with hyphens (#96)
- if there is no title fallback to an empty string (#98)
- reduce `response_groups` for the download command to speed up fetching the library (#109)
### Fixed
- `Extreme` quality is not supported by the Audible API anymore (#107)
- download command continued execution after error (#104)
- Currently, paths with dots will break the decryption (#97)
- `models.Library.from_api_full_sync` called `models.Library.from_api` with incorrect keyword arguments
### Misc
- reworked `cmd_remove-encryption` plugin command (e.g. support nested chapters, use chapter file for aaxc files)
- added explanation in README.md for creating a second profile
## [0.2.0] - 2022-06-01
### Added
- `--aax-fallback` option to `download` command to download books in aax format and fallback to aaxc, if the book is not available as aax
- `--annotation` option to `download` command to get bookmarks and notes
- `questionary` package to dependencies
- `add` and `remove` subcommands to wishlist
- `full_response_callback` to `utils`
- `export_to_csv` to `utils`
- `run_async` to `decorators`
- `pass_client` to `decorators`
- `profile_option` to `decorators`
- `password_option` to `decorators`
- `timeout_option` to `decorators`
- `bunch_size_option` to `decorators`
- `ConfigFile.get_profile_option` get the value for an option for a given profile
- `Session.selected.profile` to get the profile name for the current session
- `Session.get_auth_for_profile` to get an auth file for a given profile
- `models.BaseItem.create_base_filename` to build a filename in given mode
- `models.LibraryItem.get_annotations` to get annotations for a library item
### Changed
- bump `audible` to v0.8.2 to fix a bug in httpx
- rework plugin examples in `plugin_cmds`
- rename `config.Config` to `config.ConfigFile`
- move `click_verbosity_logger` from `_logging` to `decorators` and rename it to `verbosity_option`
- move `wrap_async` from `utils` to `decorators`
- move `add_param_to_session` from `config` to `decorators`
- move `pass_session` from `config` to `decorators`
- `download` command let you now select items when using `--title` option
### Fixed
- the `library export` and `wishlist export` command will now export to `csv` correctly
-
-
## [0.1.3] - 2022-03-27
@ -191,7 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Added
- the `--version` option now checks if an update for `audible-cli` is available
- build macOS releases in `onedir` mode
- build macOS release in onedir mode
### Bugfix

View file

@ -13,7 +13,7 @@ It depends on the following packages:
* aiofiles
* audible
* click
* colorama (on Windows machines)
* colorama (on windows machines)
* httpx
* Pillow
* tabulate
@ -30,7 +30,7 @@ pip install audible-cli
```
or install it directly from GitHub with
or install it directly from github with
```shell
@ -40,25 +40,18 @@ pip install .
```
or as the best solution using [pipx](https://pipx.pypa.io/stable/)
```shell
pipx install audible-cli
```
## Standalone executables
If you don't want to install `Python` and `audible-cli` on your machine, you can
find standalone exe files below or on the [releases](https://github.com/mkb79/audible-cli/releases)
page (including beta releases). At this moment Windows, Linux and macOS are supported.
page. At this moment Windows, Linux and MacOS are supported.
### Links
1. Linux
- [debian 11 onefile](https://github.com/mkb79/audible-cli/releases/latest/download/audible_linux_debian_11.zip)
- [ubuntu latest onefile](https://github.com/mkb79/audible-cli/releases/latest/download/audible_linux_ubuntu_latest.zip)
- [ubuntu 20.04 onefile](https://github.com/mkb79/audible-cli/releases/latest/download/audible_linux_ubuntu_20_04.zip)
- [ubuntu 18.04 onefile](https://github.com/mkb79/audible-cli/releases/latest/download/audible_linux_ubuntu_18_04.zip)
2. macOS
- [macOS latest onefile](https://github.com/mkb79/audible-cli/releases/latest/download/audible_mac.zip)
@ -89,7 +82,7 @@ pyinstaller --clean -D --hidden-import audible_cli -n audible -c pyi_entrypoint
### Hints
There are some limitations when using plugins. The binary maybe does not contain
There are some limitations when using plugins. The binaries may not contain
all the dependencies from your plugin script.
## Tab Completion
@ -110,7 +103,7 @@ as config dir. Otherwise, it will use a folder depending on the operating
system.
| OS | Path |
|----------|-------------------------------------------|
| --- | --- |
| Windows | ``C:\Users\<user>\AppData\Local\audible`` |
| Unix | ``~/.audible`` |
| Mac OS X | ``~/.audible`` |
@ -154,11 +147,7 @@ The APP section supports the following options:
- primary_profile: The profile to use, if no other is specified
- filename_mode: When using the `download` command, a filename mode can be
specified here. If not present, "ascii" will be used as default. To override
these option, you can provide a mode with the `--filename-mode` option of the
download command.
- chapter_type: When using the `download` command, a chapter type can be specified
here. If not present, "tree" will be used as default. To override
these option, you can provide a type with the `--chapter-type` option of the
these option, you can provide a mode with the `filename-mode` option of the
download command.
#### Profile section
@ -166,7 +155,6 @@ The APP section supports the following options:
- auth_file: The auth file for this profile
- country_code: The marketplace for this profile
- filename_mode: See APP section above. Will override the option in APP section.
- chapter_type: See APP section above. Will override the option in APP section.
## Getting started
@ -174,14 +162,6 @@ Use the `audible-quickstart` or `audible quickstart` command in your shell
to create your first config, profile and auth file. `audible-quickstart`
runs on the interactive mode, so you have to answer multiple questions to finish.
If you have used `audible quickstart` and want to add a second profile, you need to first create a new authfile and then update your config.toml file.
So the correct order is:
1. add a new auth file using your second account using `audible manage auth-file add`
2. add a new profile to your config and use the second auth file using `audible manage profile add`
## Commands
Call `audible -h` to show the help and a list of all available subcommands. You can show the help for each subcommand like so: `audible <subcommand> -h`. If a subcommand has further subcommands, you can do it the same way.
@ -208,19 +188,6 @@ At this time, there the following buildin subcommands:
- `wishlist`
- `export`
- `list`
- `add`
- `remove`
## Example Usage
To download all of your audiobooks in the aaxc format use:
```shell
audible download --all --aaxc
```
To download all of your audiobooks after the Date 2022-07-21 in aax format use:
```shell
audible download --start-date "2022-07-21" --aax --all
```
## Verbosity option
@ -232,9 +199,9 @@ There are 6 different verbosity levels:
- error
- critical
By default, the verbosity level is set to `info`. You can provide another level like so: `audible -v <level> <subcommand> ...`.
By default the verbosity level is set to `info`. You can provide another level like so: `audible -v <level> <subcommand> ...`.
If you use the `download` subcommand with the `--all` flag there will be a huge output. Best practise is to set the verbosity level to `error` with `audible -v error download --all ...`
If you use the `download` subcommand with the `--all` flag there will be a huge output. Best practise is to set the verbosity level to `error` with `audible -v error download --all ...`
## Plugins
@ -250,13 +217,13 @@ You can provide own subcommands and execute them with `audible SUBCOMMAND`.
All plugin commands must be placed in the plugin folder. Every subcommand must
have his own file. Every file have to be named ``cmd_{SUBCOMMAND}.py``.
Each subcommand file must have a function called `cli` as entrypoint.
This function has to be decorated with ``@click.group(name="GROUP_NAME")`` or
This function have to be decorated with ``@click.group(name="GROUP_NAME")`` or
``@click.command(name="GROUP_NAME")``.
Relative imports in the command files doesn't work. So you have to work with
absolute imports. Please take care about this. If you have any issues with
absolute imports please add your plugin path to the `PYTHONPATH` variable or
add this lines of code to the beginning of your command script:
add these lines of code to the beginning of your command script:
```python
import sys
@ -272,7 +239,7 @@ Examples can be found
If you want to develop a complete plugin package for ``audible-cli`` you can
do this on an easy way. You only need to register your sub-commands or
subgroups to an entry-point in your setup.py that is loaded by the core
sub-groups to an entry-point in your setup.py that is loaded by the core
package.
Example for a setup.py

View file

@ -1,683 +0,0 @@
"""Removes encryption of aax and aaxc files.
This is a proof-of-concept and for testing purposes only.
No error handling.
Need further work. Some options do not work or options are missing.
Needs at least ffmpeg 4.4
"""
import json
import operator
import pathlib
import re
import subprocess # noqa: S404
import tempfile
import typing as t
from enum import Enum
from functools import reduce
from glob import glob
from shutil import which
import click
from click import echo, secho
from audible_cli.decorators import pass_session
from audible_cli.exceptions import AudibleCliException
class ChapterError(AudibleCliException):
    """Base class for all chapter errors (missing chapter file, missing
    chapter info, or a chapter-count mismatch during rebuild)."""
class SupportedFiles(Enum):
    """File suffixes the decrypt command is able to process."""

    AAX = ".aax"
    AAXC = ".aaxc"

    @classmethod
    def get_supported_list(cls):
        """Return the distinct supported suffixes as a list."""
        return list({member.value for member in cls})

    @classmethod
    def is_supported_suffix(cls, value):
        """Return True when *value* is one of the supported suffixes."""
        return value in cls.get_supported_list()

    @classmethod
    def is_supported_file(cls, value):
        """Return True when the path *value* ends with a supported suffix."""
        return cls.is_supported_suffix(pathlib.PurePath(value).suffix)
def _get_input_files(
    files: t.Union[t.Tuple[str], t.List[str]],
    recursive: bool = True
) -> t.List[pathlib.Path]:
    """Expand *files* (shell patterns or plain names) to resolved paths.

    Each entry is globbed (``**`` honored when *recursive* is true) and
    the matches are filtered down to supported aax/aaxc files.

    Raises:
        click.BadParameter: if a literal (non-wildcard) name matches
            nothing and is not a supported file itself.
    """
    filenames = []

    for filename in files:
        # If the shell did not do filename globbing, do it ourselves.
        expanded = list(glob(filename, recursive=recursive))

        if (
            len(expanded) == 0
            and '*' not in filename
            and not SupportedFiles.is_supported_file(filename)
        ):
            # Name the offending argument so the user knows which input
            # failed (the message previously contained no filename).
            raise click.BadParameter(
                f"{filename}: file not found or not supported."
            )

        expanded_filter = filter(
            SupportedFiles.is_supported_file, expanded
        )
        expanded = [pathlib.Path(name).resolve() for name in expanded_filter]
        filenames.extend(expanded)

    return filenames
def recursive_lookup_dict(key: str, dictionary: t.Dict[str, t.Any]) -> t.Any:
    """Depth-first search for *key* in *dictionary* and any nested dicts.

    The top level is checked before descending; the first hit wins.

    Raises:
        KeyError: if *key* occurs nowhere in the structure. The key is
            attached to the exception so failures are diagnosable
            (previously a bare ``KeyError`` was raised).
    """
    if key in dictionary:
        return dictionary[key]

    for value in dictionary.values():
        if isinstance(value, dict):
            try:
                return recursive_lookup_dict(key, value)
            except KeyError:
                continue

    raise KeyError(key)
def get_aaxc_credentials(voucher_file: pathlib.Path):
    """Read the decryption key and iv from an aaxc voucher file."""
    if not (voucher_file.exists() and voucher_file.is_file()):
        raise AudibleCliException(f"Voucher file {voucher_file} not found.")

    voucher_dict = json.loads(voucher_file.read_text())
    try:
        return (
            recursive_lookup_dict("key", voucher_dict),
            recursive_lookup_dict("iv", voucher_dict),
        )
    except KeyError:
        raise AudibleCliException(
            f"No key/iv found in file {voucher_file}."
        ) from None
class ApiChapterInfo:
    """Chapter metadata as delivered by the Audible content metadata API."""

    def __init__(self, content_metadata: t.Dict[str, t.Any]) -> None:
        chapter_info = self._parse(content_metadata)
        self._chapter_info = chapter_info

    @classmethod
    def from_file(cls, file: t.Union[pathlib.Path, str]) -> "ApiChapterInfo":
        """Load chapter info from a voucher or chapter JSON file.

        Raises:
            ChapterError: if the file does not exist.
        """
        file = pathlib.Path(file)
        if not file.exists() or not file.is_file():
            raise ChapterError(f"Chapter file {file} not found.")
        content_string = pathlib.Path(file).read_text("utf-8")
        content_json = json.loads(content_string)
        return cls(content_json)

    @staticmethod
    def _parse(content_metadata: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]:
        # Accept either a bare chapter_info dict ("chapters" at top level)
        # or a full metadata document containing one somewhere inside.
        if "chapters" in content_metadata:
            return content_metadata
        try:
            return recursive_lookup_dict("chapter_info", content_metadata)
        except KeyError:
            raise ChapterError("No chapter info found.") from None

    def count_chapters(self):
        return len(self.get_chapters())

    def get_chapters(self, separate_intro_outro=False, remove_intro_outro=False):
        """Return a flat chapter list; optionally split off or remove the
        Audible brand intro/outro."""

        def extract_chapters(initial, current):
            # Flatten nested (tree-style) chapters into one list.
            if "chapters" in current:
                return initial + [current] + current["chapters"]
            else:
                return initial + [current]

        chapters = list(
            reduce(
                extract_chapters,
                self._chapter_info["chapters"],
                [],
            )
        )

        if separate_intro_outro:
            return self._separate_intro_outro(chapters)
        elif remove_intro_outro:
            return self._remove_intro_outro(chapters)

        return chapters

    def get_intro_duration_ms(self):
        return self._chapter_info["brandIntroDurationMs"]

    def get_outro_duration_ms(self):
        return self._chapter_info["brandOutroDurationMs"]

    def get_runtime_length_ms(self):
        return self._chapter_info["runtime_length_ms"]

    def is_accurate(self):
        return self._chapter_info["is_accurate"]

    def _separate_intro_outro(self, chapters):
        """Shrink first/last chapter and add explicit Intro/Outro chapters."""
        echo("Separate Audible Brand Intro and Outro to own Chapter.")
        chapters.sort(key=operator.itemgetter("start_offset_ms"))

        first = chapters[0]
        intro_dur_ms = self.get_intro_duration_ms()
        first["start_offset_ms"] = intro_dur_ms
        first["start_offset_sec"] = round(first["start_offset_ms"] / 1000)
        first["length_ms"] -= intro_dur_ms

        last = chapters[-1]
        outro_dur_ms = self.get_outro_duration_ms()
        last["length_ms"] -= outro_dur_ms

        chapters.append(
            {
                "length_ms": intro_dur_ms,
                "start_offset_ms": 0,
                "start_offset_sec": 0,
                "title": "Intro",
            }
        )
        chapters.append(
            {
                "length_ms": outro_dur_ms,
                "start_offset_ms": self.get_runtime_length_ms() - outro_dur_ms,
                "start_offset_sec": round(
                    (self.get_runtime_length_ms() - outro_dur_ms) / 1000
                ),
                "title": "Outro",
            }
        )
        chapters.sort(key=operator.itemgetter("start_offset_ms"))

        return chapters

    def _remove_intro_outro(self, chapters):
        """Drop the brand intro/outro by shifting chapter offsets."""
        echo("Delete Audible Brand Intro and Outro.")
        chapters.sort(key=operator.itemgetter("start_offset_ms"))

        intro_dur_ms = self.get_intro_duration_ms()
        outro_dur_ms = self.get_outro_duration_ms()

        first = chapters[0]
        first["length_ms"] -= intro_dur_ms

        for chapter in chapters[1:]:
            chapter["start_offset_ms"] -= intro_dur_ms
            # Bugfix: recompute the seconds offset from the shifted ms value.
            # The previous "-=" subtracted the new seconds value from the old
            # one, which is inconsistent with _separate_intro_outro above.
            chapter["start_offset_sec"] = round(chapter["start_offset_ms"] / 1000)

        last = chapters[-1]
        last["length_ms"] -= outro_dur_ms

        return chapters
class FFMeta:
    """Parse, modify and re-serialize an ffmpeg ``ffmetadata`` file.

    The parsed representation is a dict of sections. Options before the
    first section header live under the key ``"_"``; repeated
    ``[CHAPTER]`` sections are collected in a dict keyed by their
    1-based number.
    """

    SECTION = re.compile(r"\[(?P<header>[^]]+)\]")
    OPTION = re.compile(r"(?P<option>.*?)\s*(?:(?P<vi>=)\s*(?P<value>.*))?$")

    def __init__(self, ffmeta_file: t.Union[str, pathlib.Path]) -> None:
        self._ffmeta_raw = pathlib.Path(ffmeta_file).read_text("utf-8")
        self._ffmeta_parsed = self._parse_ffmeta()

    def _parse_ffmeta(self):
        """Split the raw metadata text into the section dict."""
        parsed_dict = {}
        start_section = "_"
        cursec = parsed_dict[start_section] = {}
        num_chap = 0

        for line in iter(self._ffmeta_raw.splitlines()):
            mo = self.SECTION.match(line)
            if mo:
                sec_name = mo.group("header")
                if sec_name == "CHAPTER":
                    # Chapter sections repeat; number them so none are lost.
                    num_chap += 1
                    if sec_name not in parsed_dict:
                        parsed_dict[sec_name] = {}
                    cursec = parsed_dict[sec_name][num_chap] = {}
                else:
                    cursec = parsed_dict[sec_name] = {}
            else:
                # Any non-header line is an "option" or "option=value" pair.
                match = self.OPTION.match(line)
                cursec.update({match.group("option"): match.group("value")})

        return parsed_dict

    def count_chapters(self):
        return len(self._ffmeta_parsed["CHAPTER"])

    def set_chapter_option(self, num, option, value):
        """Set *option* to *value* on chapter *num* if the option exists."""
        chapter = self._ffmeta_parsed["CHAPTER"][num]
        for chapter_option in chapter:
            if chapter_option == option:
                chapter[chapter_option] = value

    def write(self, filename):
        """Serialize the parsed metadata back to *filename*."""
        d = "="
        # Use a context manager so the handle is flushed and closed; the
        # previous implementation opened the file without ever closing it.
        with pathlib.Path(filename).open("w", encoding="utf-8") as fp:
            for section in self._ffmeta_parsed:
                if section == "_":
                    # Leading, header-less section.
                    self._write_section(fp, None, self._ffmeta_parsed[section], d)
                elif section == "CHAPTER":
                    for chapter in self._ffmeta_parsed[section]:
                        self._write_section(
                            fp, section, self._ffmeta_parsed[section][chapter], d
                        )
                else:
                    self._write_section(fp, section, self._ffmeta_parsed[section], d)

    @staticmethod
    def _write_section(fp, section_name, section_items, delimiter):
        """Write a single section to the specified `fp`."""
        if section_name is not None:
            fp.write(f"[{section_name}]\n")

        for key, value in section_items.items():
            if value is None:
                fp.write(f"{key}\n")
            else:
                fp.write(f"{key}{delimiter}{value}\n")

    def update_chapters_from_chapter_info(
        self,
        chapter_info: "ApiChapterInfo",
        force_rebuild_chapters: bool = False,
        separate_intro_outro: bool = False,
        remove_intro_outro: bool = False
    ) -> None:
        """Replace the parsed chapters with chapters from the API metadata.

        Raises:
            ChapterError: on a chapter-count mismatch unless
                *force_rebuild_chapters* is set.
        """
        if not chapter_info.is_accurate():
            echo("Metadata from API is not accurate. Skip.")
            return

        if chapter_info.count_chapters() != self.count_chapters():
            if force_rebuild_chapters:
                echo("Force rebuild chapters due to chapter mismatch.")
            else:
                raise ChapterError("Chapter mismatch")

        echo(f"Found {chapter_info.count_chapters()} chapters to prepare.")

        api_chapters = chapter_info.get_chapters(separate_intro_outro, remove_intro_outro)

        num_chap = 0
        new_chapters = {}
        for chapter in api_chapters:
            chap_start = chapter["start_offset_ms"]
            chap_end = chap_start + chapter["length_ms"]
            num_chap += 1
            new_chapters[num_chap] = {
                "TIMEBASE": "1/1000",
                "START": chap_start,
                "END": chap_end,
                "title": chapter["title"],
            }
        self._ffmeta_parsed["CHAPTER"] = new_chapters

    def get_start_end_without_intro_outro(
        self,
        chapter_info: "ApiChapterInfo",
    ):
        """Return (start_ms, duration_ms) excluding the brand intro/outro."""
        intro_dur_ms = chapter_info.get_intro_duration_ms()
        outro_dur_ms = chapter_info.get_outro_duration_ms()
        total_runtime_ms = chapter_info.get_runtime_length_ms()
        start_new = intro_dur_ms
        duration_new = total_runtime_ms - intro_dur_ms - outro_dur_ms
        return start_new, duration_new
def _get_voucher_filename(file: pathlib.Path) -> pathlib.Path:
return file.with_suffix(".voucher")
def _get_chapter_filename(file: pathlib.Path) -> pathlib.Path:
base_filename = file.stem.rsplit("-", 1)[0]
return file.with_name(base_filename + "-chapters.json")
def _get_ffmeta_file(file: pathlib.Path, tempdir: pathlib.Path) -> pathlib.Path:
metaname = file.with_suffix(".meta").name
metafile = tempdir / metaname
return metafile
class FfmpegFileDecrypter:
    """Decrypt a single aax/aaxc file to m4b by driving ffmpeg.

    Credentials come from activation bytes (aax) or the voucher file
    (aaxc). Chapter metadata can optionally be rebuilt from the API
    chapter info before remuxing.
    """

    def __init__(
        self,
        file: pathlib.Path,
        target_dir: pathlib.Path,
        tempdir: pathlib.Path,
        activation_bytes: t.Optional[str],
        overwrite: bool,
        rebuild_chapters: bool,
        force_rebuild_chapters: bool,
        skip_rebuild_chapters: bool,
        separate_intro_outro: bool,
        remove_intro_outro: bool
    ) -> None:
        # Raises ValueError for unsupported suffixes (not a SupportedFiles member).
        file_type = SupportedFiles(file.suffix)

        credentials = None
        if file_type == SupportedFiles.AAX:
            # aax decryption needs the account's activation bytes.
            if activation_bytes is None:
                raise AudibleCliException(
                    "No activation bytes found. Do you ever run "
                    "`audible activation-bytes`?"
                )
            credentials = activation_bytes
        elif file_type == SupportedFiles.AAXC:
            # aaxc decryption uses (key, iv) from the voucher next to the file.
            voucher_filename = _get_voucher_filename(file)
            credentials = get_aaxc_credentials(voucher_filename)

        self._source = file
        # str for aax activation bytes, (key, iv) tuple for aaxc.
        self._credentials: t.Optional[t.Union[str, t.Tuple[str]]] = credentials
        self._target_dir = target_dir
        self._tempdir = tempdir
        self._overwrite = overwrite
        self._rebuild_chapters = rebuild_chapters
        self._force_rebuild_chapters = force_rebuild_chapters
        self._skip_rebuild_chapters = skip_rebuild_chapters
        self._separate_intro_outro = separate_intro_outro
        self._remove_intro_outro = remove_intro_outro
        # Lazily populated caches (see the properties below).
        self._api_chapter: t.Optional[ApiChapterInfo] = None
        self._ffmeta: t.Optional[FFMeta] = None
        self._is_rebuilded: bool = False

    @property
    def api_chapter(self) -> ApiChapterInfo:
        """API chapter info, loaded lazily from the voucher file and
        falling back to the separate chapter file."""
        if self._api_chapter is None:
            try:
                voucher_filename = _get_voucher_filename(self._source)
                self._api_chapter = ApiChapterInfo.from_file(voucher_filename)
            except ChapterError:
                voucher_filename = _get_chapter_filename(self._source)
                self._api_chapter = ApiChapterInfo.from_file(voucher_filename)
            echo(f"Using chapters from {voucher_filename}")
        return self._api_chapter

    @property
    def ffmeta(self) -> FFMeta:
        """ffmetadata of the source file, extracted lazily via ffmpeg."""
        if self._ffmeta is None:
            metafile = _get_ffmeta_file(self._source, self._tempdir)

            base_cmd = [
                "ffmpeg",
                "-v",
                "quiet",
                "-stats",
            ]
            # Pass the matching decryption credentials for the file type.
            if isinstance(self._credentials, tuple):
                key, iv = self._credentials
                credentials_cmd = [
                    "-audible_key",
                    key,
                    "-audible_iv",
                    iv,
                ]
            else:
                credentials_cmd = [
                    "-activation_bytes",
                    self._credentials,
                ]
            base_cmd.extend(credentials_cmd)
            # Dump the metadata (including chapters) to a temp file.
            extract_cmd = [
                "-i",
                str(self._source),
                "-f",
                "ffmetadata",
                str(metafile),
            ]
            base_cmd.extend(extract_cmd)

            subprocess.check_output(base_cmd, text=True)  # noqa: S603
            self._ffmeta = FFMeta(metafile)

        return self._ffmeta

    def rebuild_chapters(self) -> None:
        """Rebuild the extracted chapters from the API chapter info
        (at most once per instance)."""
        if not self._is_rebuilded:
            self.ffmeta.update_chapters_from_chapter_info(
                self.api_chapter, self._force_rebuild_chapters, self._separate_intro_outro, self._remove_intro_outro
            )
            self._is_rebuilded = True

    def run(self):
        """Decrypt the source file into ``<target_dir>/<stem>.m4b``."""
        oname = self._source.with_suffix(".m4b").name
        outfile = self._target_dir / oname

        if outfile.exists():
            if self._overwrite:
                secho(f"Overwrite {outfile}: already exists", fg="blue")
            else:
                secho(f"Skip {outfile}: already exists", fg="blue")
                return

        base_cmd = [
            "ffmpeg",
            "-v",
            "quiet",
            "-stats",
        ]
        if self._overwrite:
            # -y lets ffmpeg overwrite the existing output file.
            base_cmd.append("-y")
        if isinstance(self._credentials, tuple):
            key, iv = self._credentials
            credentials_cmd = [
                "-audible_key",
                key,
                "-audible_iv",
                iv,
            ]
        else:
            credentials_cmd = [
                "-activation_bytes",
                self._credentials,
            ]
        base_cmd.extend(credentials_cmd)

        if self._rebuild_chapters:
            metafile = _get_ffmeta_file(self._source, self._tempdir)
            try:
                self.rebuild_chapters()
                self.ffmeta.write(metafile)
            except ChapterError:
                if self._skip_rebuild_chapters:
                    # NOTE(review): in this skip path no "-i" input is added
                    # to base_cmd before the final "-c copy" arguments —
                    # confirm the intended ffmpeg invocation for this case.
                    echo("Skip rebuild chapters due to chapter mismatch.")
                else:
                    raise
            else:
                if self._remove_intro_outro:
                    # Cut intro/outro by seeking (-ss) and limiting (-t),
                    # and take chapters from the rebuilt metafile.
                    start_new, duration_new = self.ffmeta.get_start_end_without_intro_outro(self.api_chapter)

                    base_cmd.extend(
                        [
                            "-ss",
                            f"{start_new}ms",
                            "-t",
                            f"{duration_new}ms",
                            "-i",
                            str(self._source),
                            "-i",
                            str(metafile),
                            "-map_metadata",
                            "0",
                            "-map_chapters",
                            "1",
                        ]
                    )
                else:
                    # Keep audio untouched; take chapters from the metafile.
                    base_cmd.extend(
                        [
                            "-i",
                            str(self._source),
                            "-i",
                            str(metafile),
                            "-map_metadata",
                            "0",
                            "-map_chapters",
                            "1",
                        ]
                    )
        else:
            # No chapter rebuild: single input, metadata passed through.
            base_cmd.extend(
                [
                    "-i",
                    str(self._source),
                ]
            )

        # Remux without re-encoding.
        base_cmd.extend(
            [
                "-c",
                "copy",
                str(outfile),
            ]
        )

        subprocess.check_output(base_cmd, text=True)  # noqa: S603

        echo(f"File decryption successful: {outfile}")
@click.command("decrypt")
@click.argument("files", nargs=-1)
@click.option(
    "--dir",
    "-d",
    "directory",
    type=click.Path(exists=True, dir_okay=True),
    default=pathlib.Path.cwd(),
    help="Folder where the decrypted files should be saved.",
    show_default=True
)
@click.option(
    "--all",
    "-a",
    "all_",
    is_flag=True,
    help="Decrypt all aax and aaxc files in current folder."
)
@click.option("--overwrite", is_flag=True, help="Overwrite existing files.")
@click.option(
    "--rebuild-chapters",
    "-r",
    is_flag=True,
    help="Rebuild chapters with chapters from voucher or chapter file."
)
@click.option(
    "--force-rebuild-chapters",
    "-f",
    is_flag=True,
    help=(
        "Force rebuild chapters with chapters from voucher or chapter file "
        "if the built-in chapters in the audio file mismatch. "
        "Only use with `--rebuild-chapters`."
    ),
)
@click.option(
    "--skip-rebuild-chapters",
    "-t",
    is_flag=True,
    help=(
        "Decrypt without rebuilding chapters when chapters mismatch. "
        "Only use with `--rebuild-chapters`."
    ),
)
@click.option(
    "--separate-intro-outro",
    "-s",
    is_flag=True,
    help=(
        "Separate Audible Brand Intro and Outro to own Chapter. "
        "Only use with `--rebuild-chapters`."
    ),
)
@click.option(
    "--remove-intro-outro",
    "-c",
    is_flag=True,
    help=(
        "Remove Audible Brand Intro and Outro. "
        "Only use with `--rebuild-chapters`."
    ),
)
@pass_session
def cli(
    session,
    files: str,
    directory: t.Union[pathlib.Path, str],
    all_: bool,
    overwrite: bool,
    rebuild_chapters: bool,
    force_rebuild_chapters: bool,
    skip_rebuild_chapters: bool,
    separate_intro_outro: bool,
    remove_intro_outro: bool,
):
    """Decrypt audiobooks downloaded with audible-cli.

    FILES are the names of the file to decrypt.
    Wildcards `*` and recursive lookup with `**` are supported.

    Only FILES with `aax` or `aaxc` suffix are processed.
    Other files are skipped silently.
    """
    # ffmpeg does the actual decryption; fail fast if it is not installed.
    if not which("ffmpeg"):
        ctx = click.get_current_context()
        ctx.fail("ffmpeg not found")

    # The chapter-related flags only modify --rebuild-chapters behavior
    # and make no sense without it.
    if (force_rebuild_chapters or skip_rebuild_chapters or separate_intro_outro or remove_intro_outro) and not rebuild_chapters:
        raise click.BadOptionUsage(
            "",
            "`--force-rebuild-chapters`, `--skip-rebuild-chapters`, `--separate-intro-outro` "
            "and `--remove-intro-outro` can only be used together with `--rebuild-chapters`"
        )

    # Force and skip are opposite strategies for a chapter mismatch.
    if force_rebuild_chapters and skip_rebuild_chapters:
        raise click.BadOptionUsage(
            "",
            "`--force-rebuild-chapters` and `--skip-rebuild-chapters` can "
            "not be used together"
        )

    # Intro/outro can either be split into own chapters or removed — not both.
    if separate_intro_outro and remove_intro_outro:
        raise click.BadOptionUsage(
            "",
            "`--separate-intro-outro` and `--remove-intro-outro` can not be used together"
        )

    if all_:
        if files:
            raise click.BadOptionUsage(
                "",
                "If using `--all`, no FILES arguments can be used."
            )
        # With --all, synthesize a glob pattern per supported suffix.
        files = [f"*{suffix}" for suffix in SupportedFiles.get_supported_list()]

    files = _get_input_files(files, recursive=True)
    # The tempdir holds intermediate ffmetadata files and is removed
    # automatically once all files are processed.
    with tempfile.TemporaryDirectory() as tempdir:
        for file in files:
            decrypter = FfmpegFileDecrypter(
                file=file,
                target_dir=pathlib.Path(directory).resolve(),
                tempdir=pathlib.Path(tempdir).resolve(),
                activation_bytes=session.auth.activation_bytes,
                overwrite=overwrite,
                rebuild_chapters=rebuild_chapters,
                force_rebuild_chapters=force_rebuild_chapters,
                skip_rebuild_chapters=skip_rebuild_chapters,
                separate_intro_outro=separate_intro_outro,
                remove_intro_outro=remove_intro_outro
            )
            decrypter.run()

View file

@ -1,21 +0,0 @@
import click
from audible.exceptions import NotFoundError
from audible_cli.decorators import pass_client
@click.command("get-annotations")
@click.argument("asin")
@pass_client
async def cli(client, asin):
    """Fetch the annotation sidecar for a single title and print it.

    ASIN is the Audible identifier of the title to look up.  Prints the raw
    API response, or a short notice when the title has no annotations.
    """
    # Bugfix: the URL contains no placeholders, so a plain string is used
    # instead of a pointless f-string.
    url = "https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/sidecar"
    params = {
        "type": "AUDI",  # request the audio annotation sidecar
        "key": asin
    }
    try:
        r = await client.get(url, params=params)
    except NotFoundError:
        click.echo(f"No annotations found for asin {asin}")
    else:
        click.echo(r)

View file

@ -1,110 +0,0 @@
import logging
import pathlib
from datetime import datetime, timezone
import click
from audible_cli.decorators import (
bunch_size_option,
timeout_option,
pass_client,
pass_session
)
from audible_cli.models import Library
from audible_cli.utils import export_to_csv
from isbntools.app import isbn_from_words
logger = logging.getLogger("audible_cli.cmds.cmd_goodreads-transform")
@click.command("goodreads-transform")
@click.option(
    "--output", "-o",
    type=click.Path(path_type=pathlib.Path),
    default=pathlib.Path().cwd() / "library.csv",
    show_default=True,
    help="output file"
)
@timeout_option
@bunch_size_option
@pass_session
@pass_client
async def cli(session, client, output):
    """Transform the library into a CSV file for a Goodreads import.

    Bugfix: replaces the leftover placeholder docstring ("YOUR COMMAND
    DESCRIPTION") which click would have shown as the command's help text.

    Fetches the full library, looks up missing ISBNs via isbntools and
    writes rows of (isbn, date added, date read, title) to OUTPUT in the
    Excel CSV dialect.  Only finished titles with an ISBN are exported.
    """
    logger.debug("fetching library")
    bunch_size = session.params.get("bunch_size")
    library = await Library.from_api_full_sync(
        client,
        # Only the response groups needed for the Goodreads columns.
        response_groups=(
            "product_details, contributors, is_finished, product_desc"
        ),
        bunch_size=bunch_size
    )
    logger.debug("prepare library")
    library = _prepare_library_for_export(library)
    logger.debug("write data rows to file")
    headers = ("isbn", "Date Added", "Date Read", "Title")
    export_to_csv(
        file=output,
        data=library,
        headers=headers,
        dialect="excel"
    )
    logger.info(f"File saved to {output}")
def _prepare_library_for_export(library):
    """Convert library items to Goodreads rows [isbn, date_added, date_read, title].

    Items without an ISBN (even after an isbntools lookup) or that are not
    finished are skipped.  Lookup statistics are logged at DEBUG level.
    """
    prepared_library = []
    isbn_counter = 0            # ISBNs that had to be looked up via isbntools
    isbn_api_counter = 0        # lookups that returned a result
    isbn_no_result_counter = 0  # lookups that came back empty
    skipped_items = 0           # items dropped from the export
    for i in library:
        title = i.title
        authors = i.authors
        if authors is not None:
            authors = ", ".join([a["name"] for a in authors])
        is_finished = i.is_finished
        isbn = i.isbn
        if isbn is None:
            isbn_counter += 1
            # Bugfix: don't embed the literal string "None" in the search
            # query when the item has no author information.
            query = f"{title} {authors}" if authors else title
            isbn = isbn_from_words(query) or None
            if isbn is None:
                isbn_no_result_counter += 1
            else:
                isbn_api_counter += 1
        date_added = i.library_status
        if date_added is not None:
            date_added = date_added["date_added"]
            # API timestamps are UTC; convert to the local timezone first.
            date_added = datetime.strptime(
                date_added, '%Y-%m-%dT%H:%M:%S.%fZ'
            ).replace(tzinfo=timezone.utc).astimezone()
            # Bugfix: dropped a redundant second astimezone() call.
            date_added = date_added.date().isoformat()
        date_read = None
        if is_finished:
            date_read = date_added
        # Goodreads needs both an ISBN and a read date; skip anything else.
        if isbn and date_read:
            data_row = [isbn, date_added, date_read, title]
            prepared_library.append(data_row)
        else:
            skipped_items += 1
    logger.debug(f"ISBNs from API: {isbn_api_counter}")
    logger.debug(f"ISBNs requested with isbntools: {isbn_counter}")
    logger.debug(f"No result with isbntools: {isbn_no_result_counter}")
    logger.debug(
        f"title skipped from file due to no ISBN or title not read: "
        f"{skipped_items}")
    return prepared_library

View file

@ -1,19 +1,25 @@
import audible
import click
from audible_cli.decorators import pass_client, timeout_option
from audible_cli.config import pass_session
@click.command("image-urls")
@click.argument("asin")
@timeout_option()
@pass_client()
async def cli(client, asin):
"""Print out the image urls for different resolutions for a book"""
r = await client.get(
f"catalog/products/{asin}",
response_groups="media",
image_sizes=(
"1215, 408, 360, 882, 315, 570, 252, 558, 900, 500")
)
@click.command("get-cover-urls")
@click.option(
"--asin", "-a",
multiple=False,
help="asin of the audiobook"
)
@pass_session
def cli(session, asin):
"Print out the image urls for different resolutions for a book"
with audible.Client(auth=session.auth) as client:
r = client.get(
f"catalog/products/{asin}",
response_groups="media",
image_sizes=("1215, 408, 360, 882, 315, 570, 252, "
"558, 900, 500")
)
images = r["product"]["product_images"]
for res, url in images.items():
click.echo(f"Resolution {res}: {url}")

View file

@ -1,67 +0,0 @@
import asyncio
import json
import logging
import pathlib
from datetime import datetime
import click
from audible_cli.decorators import pass_client
logger = logging.getLogger("audible_cli.cmds.cmd_listening-stats")
current_year = datetime.now().year
def ms_to_hms(milliseconds):
    """Convert a duration in milliseconds to a dict of hours/minutes/seconds.

    Bugfix: hours are no longer taken modulo 24.  Aggregated listening
    durations regularly exceed one day, and the old `% 24` silently
    discarded whole days from the reported total.
    """
    seconds = int((milliseconds / 1000) % 60)
    minutes = int((milliseconds / (1000 * 60)) % 60)
    hours = int(milliseconds / (1000 * 60 * 60))
    return {"hours": hours, "minutes": minutes, "seconds": seconds}
async def _get_stats_year(client, year):
    """Request the aggregated monthly listening stats for a single year."""
    response = await client.get(
        "stats/aggregates",
        monthly_listening_interval_duration="12",
        monthly_listening_interval_start_date=f"{year}-01",
        store="Audible"
    )
    # One entry per month, keyed by the interval identifier (e.g. "2020-03"),
    # with the listened time broken down into hours/minutes/seconds.
    return {
        stat["interval_identifier"]: ms_to_hms(stat["aggregated_sum"])
        for stat in response["aggregated_monthly_listening_stats"]
    }
@click.command("listening-stats")
@click.option(
    "--output", "-o",
    type=click.Path(path_type=pathlib.Path),
    default=pathlib.Path().cwd() / "listening-stats.json",
    show_default=True,
    help="output file"
)
@click.option(
    "--signup-year", "-s",
    type=click.IntRange(1997, current_year),
    # Bugfix: the default is an int matching the IntRange type instead of
    # relying on click to coerce the string "2010".
    default=2010,
    show_default=True,
    help="start year for collecting listening stats"
)
@pass_client
async def cli(client, output, signup_year):
    """get and analyse listening statistics"""
    # One request per year from signup up to and including the current year.
    year_range = list(range(signup_year, current_year + 1))
    r = await asyncio.gather(
        *[_get_stats_year(client, y) for y in year_range]
    )
    # Merge the per-year dicts (each keyed by month identifier) into one map.
    aggregated_stats = {}
    for yearly_stats in r:
        aggregated_stats.update(yearly_stats)
    output.write_text(json.dumps(aggregated_stats, indent=4))

View file

@ -0,0 +1,301 @@
"""
This is a proof-of-concept and for testing purposes only. No error handling.
Need further work. Some options does not work or options are missing.
Needs at least ffmpeg 4.1 with aaxc patch.
"""
import json
import operator
import pathlib
import re
import subprocess
from shutil import which
import click
from audible_cli.config import pass_session
from click import echo, secho
class ApiMeta:
    """Read-only view over chapter metadata returned by the Audible API.

    Accepts either an already-parsed dict or a path to a JSON file and
    exposes accessors for the chapter list and the brand intro/outro and
    runtime durations (all in milliseconds).
    """

    def __init__(self, api_meta):
        if isinstance(api_meta, dict):
            self._meta_raw = api_meta
        else:
            # Treat anything else as a path to a JSON file.
            self._meta_raw = pathlib.Path(api_meta).read_text("utf-8")
        self._meta_parsed = self._parse_meta()

    def _parse_meta(self):
        raw = self._meta_raw
        return raw if isinstance(raw, dict) else json.loads(raw)

    def _chapter_info(self):
        # All public accessors read from this nested mapping.
        return self._meta_parsed["content_metadata"]["chapter_info"]

    def count_chapters(self):
        return len(self.get_chapters())

    def get_chapters(self):
        return self._chapter_info()["chapters"]

    def get_intro_duration_ms(self):
        return self._chapter_info()["brandIntroDurationMs"]

    def get_outro_duration_ms(self):
        return self._chapter_info()["brandOutroDurationMs"]

    def get_runtime_length_ms(self):
        return self._chapter_info()["runtime_length_ms"]
class FFMeta:
    """Parse, modify and re-serialize an ffmpeg `ffmetadata` file.

    The format is INI-like: a global header section followed by optional
    named sections.  `[CHAPTER]` may repeat, so chapters are stored under a
    running 1-based index.
    """

    SECTION = re.compile(r"\[(?P<header>[^]]+)\]")
    OPTION = re.compile(r"(?P<option>.*?)\s*(?:(?P<vi>=)\s*(?P<value>.*))?$")

    def __init__(self, ffmeta_file):
        self._ffmeta_raw = pathlib.Path(ffmeta_file).read_text("utf-8")
        self._ffmeta_parsed = self._parse_ffmeta()

    def _parse_ffmeta(self):
        """Return {section: {option: value}}; key "_" holds the global header."""
        parsed_dict = {}
        start_section = "_"
        cursec = parsed_dict[start_section] = {}
        num_chap = 0
        for line in iter(self._ffmeta_raw.splitlines()):
            mo = self.SECTION.match(line)
            if mo:
                sec_name = mo.group("header")
                if sec_name == "CHAPTER":
                    # [CHAPTER] repeats; index each occurrence separately.
                    num_chap += 1
                    if sec_name not in parsed_dict:
                        parsed_dict[sec_name] = {}
                    cursec = parsed_dict[sec_name][num_chap] = {}
                else:
                    cursec = parsed_dict[sec_name] = {}
            else:
                # OPTION matches every non-section line; lines without "="
                # (e.g. the ";FFMETADATA1" header) get a value of None.
                match = self.OPTION.match(line)
                cursec.update({match.group("option"): match.group("value")})
        return parsed_dict

    def count_chapters(self):
        """Return the number of parsed [CHAPTER] sections."""
        return len(self._ffmeta_parsed["CHAPTER"])

    def set_chapter_option(self, num, option, value):
        """Set `option` to `value` on chapter `num` (1-based, existing keys only)."""
        chapter = self._ffmeta_parsed["CHAPTER"][num]
        for chapter_option in chapter:
            if chapter_option == option:
                chapter[chapter_option] = value

    def write(self, filename):
        """Serialize the parsed metadata back to `filename`.

        Bugfix: the output file handle is now closed deterministically via
        a context manager; previously it was opened and never closed.
        """
        d = "="
        with pathlib.Path(filename).open("w", encoding="utf-8") as fp:
            for section in self._ffmeta_parsed:
                if section == "_":
                    self._write_section(fp, None, self._ffmeta_parsed[section], d)
                elif section == "CHAPTER":
                    # Each chapter gets its own [CHAPTER] section on output.
                    for chapter in self._ffmeta_parsed[section]:
                        self._write_section(fp, section,
                                            self._ffmeta_parsed[section][chapter],
                                            d)
                else:
                    self._write_section(fp, section, self._ffmeta_parsed[section],
                                        d)

    def _write_section(self, fp, section_name, section_items, delimiter):
        """Write a single section to the specified `fp`."""
        if section_name is not None:
            fp.write(f"[{section_name}]\n")
        for key, value in section_items.items():
            if value is None:
                # Value-less lines (e.g. the ";FFMETADATA1" header).
                fp.write(f"{key}\n")
            else:
                fp.write(f"{key}{delimiter}{value}\n")

    def update_chapters_from_api_meta(self, api_meta, separate_intro_outro=True):
        """Replace the parsed chapters with the chapters from the API metadata.

        When `separate_intro_outro` is true, the Audible brand intro and
        outro are split out into their own "Intro"/"Outro" chapters and the
        neighboring chapters are trimmed accordingly.
        """
        if not isinstance(api_meta, ApiMeta):
            api_meta = ApiMeta(api_meta)
        # assert api_meta.count_chapters() == self.count_chapters()
        echo(f"Found {self.count_chapters()} chapters to prepare.")
        api_chapters = api_meta.get_chapters()
        if separate_intro_outro:
            echo("Separate Audible Brand Intro and Outro to own Chapter.")
            api_chapters.sort(key=operator.itemgetter("start_offset_ms"))
            # Shift the first chapter past the brand intro...
            first = api_chapters[0]
            intro_dur_ms = api_meta.get_intro_duration_ms()
            first["start_offset_ms"] = intro_dur_ms
            first["start_offset_sec"] = round(first["start_offset_ms"] / 1000)
            first["length_ms"] -= intro_dur_ms
            # ...and shorten the last chapter by the brand outro.
            last = api_chapters[-1]
            outro_dur_ms = api_meta.get_outro_duration_ms()
            last["length_ms"] -= outro_dur_ms
            api_chapters.append({
                "length_ms": intro_dur_ms,
                "start_offset_ms": 0,
                "start_offset_sec": 0,
                "title": "Intro"
            })
            api_chapters.append({
                "length_ms": outro_dur_ms,
                "start_offset_ms": api_meta.get_runtime_length_ms() - outro_dur_ms,
                "start_offset_sec": round((api_meta.get_runtime_length_ms() - outro_dur_ms) / 1000),
                "title": "Outro"
            })
            # Re-sort so the new Intro/Outro chapters land in position.
            api_chapters.sort(key=operator.itemgetter("start_offset_ms"))
        num_chap = 0
        new_chapters = {}
        for chapter in api_chapters:
            chap_start = chapter["start_offset_ms"]
            chap_end = chap_start + chapter["length_ms"]
            num_chap += 1
            new_chapters[num_chap] = {
                "TIMEBASE": "1/1000",  # chapter times are in milliseconds
                "START": chap_start,
                "END": chap_end,
                "title": chapter["title"]
            }
        self._ffmeta_parsed["CHAPTER"] = new_chapters
def decrypt_aax(files, session):
    """Decrypt `.aax` files to `.m4b` using the account's activation bytes."""
    for file in files:
        outfile = file.with_suffix(".m4b")
        metafile = file.with_suffix(".meta")
        metafile_new = file.with_suffix(".new.meta")
        # apimeta = CHAPTERFILE

        if outfile.exists():
            secho(f"file {outfile} already exists Skip.", fg="blue")
            continue

        ab = session.auth.activation_bytes

        # First pass: dump the ffmetadata so it can be rewritten.
        extract_cmd = [
            "ffmpeg",
            "-activation_bytes", ab,
            "-i", str(file),
            "-f", "ffmetadata",
            str(metafile),
        ]
        subprocess.check_output(extract_cmd, universal_newlines=True)

        ffmeta_class = FFMeta(metafile)
        # ffmeta_class.update_chapters_from_api_meta(apimeta)
        ffmeta_class.write(metafile_new)
        click.echo("Replaced all titles.")

        # Second pass: decrypt and remux with the rewritten metadata.
        remux_cmd = [
            "ffmpeg",
            "-activation_bytes", ab,
            "-i", str(file),
            "-i", str(metafile_new),
            "-map_metadata", "0",
            "-map_chapters", "1",
            "-c", "copy",
            str(outfile),
        ]
        subprocess.check_output(remux_cmd, universal_newlines=True)

        # Clean up the intermediate metadata files.
        metafile.unlink()
        metafile_new.unlink()
def decrypt_aaxc(files, session):
    """Decrypt `.aaxc` files to `.m4b` using key/iv from the voucher file."""
    for file in files:
        metafile = file.with_suffix(".meta")
        metafile_new = file.with_suffix(".new.meta")
        voucher_file = file.with_suffix(".voucher")
        voucher = json.loads(voucher_file.read_text())
        outfile = file.with_suffix(".m4b")

        if outfile.exists():
            secho(f"file {outfile} already exists Skip.", fg="blue")
            continue

        # The voucher carries the per-file AES key/iv plus the API metadata
        # used to rebuild the chapters.
        apimeta = voucher["content_license"]
        audible_key = apimeta["license_response"]["key"]
        audible_iv = apimeta["license_response"]["iv"]

        # First pass: dump the ffmetadata so the chapters can be rebuilt.
        extract_cmd = [
            "ffmpeg",
            "-audible_key", audible_key,
            "-audible_iv", audible_iv,
            "-i", str(file),
            "-f", "ffmetadata",
            str(metafile),
        ]
        subprocess.check_output(extract_cmd, universal_newlines=True)

        ffmeta_class = FFMeta(metafile)
        ffmeta_class.update_chapters_from_api_meta(apimeta)
        ffmeta_class.write(metafile_new)
        click.echo("Replaced all titles.")

        # Second pass: decrypt and remux with the rebuilt chapters.
        remux_cmd = [
            "ffmpeg",
            "-audible_key", audible_key,
            "-audible_iv", audible_iv,
            "-i", str(file),
            "-i", str(metafile_new),
            "-map_metadata", "0",
            "-map_chapters", "1",
            "-c", "copy",
            str(outfile),
        ]
        subprocess.check_output(remux_cmd, universal_newlines=True)

        # Clean up the intermediate metadata files.
        metafile.unlink()
        metafile_new.unlink()
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])


@click.command("remove-encryption", context_settings=CONTEXT_SETTINGS)
@click.option(
    "--input", "-i",
    type=click.Path(exists=True, file_okay=True),
    multiple=True,
    help="Input file")
@click.option(
    "--all",
    is_flag=True,
    help="convert all files in folder"
)
@click.option(
    "--overwrite",
    is_flag=True,
    help="overwrite existing files"
)
@pass_session
def cli(session, **options):
    """Remove the encryption from aax/aaxc files.

    Collects the requested files, groups them by suffix and hands each
    group to the matching decrypt helper.
    """
    # ffmpeg does the actual decryption; abort early when it is missing.
    if not which("ffmpeg"):
        ctx = click.get_current_context()
        ctx.fail("ffmpeg not found")

    jobs = {"aaxc": [], "aax": []}

    if options.get("all"):
        cwd = pathlib.Path.cwd()
        jobs["aaxc"].extend(list(cwd.glob('*.aaxc')))
        jobs["aax"].extend(list(cwd.glob('*.aax')))
        # Bugfix: actually store the resolved paths.  The previous loop
        # (`for i in jobs[suffix]: i = i.resolve()`) only rebound the loop
        # variable and had no effect on the job lists.
        for suffix in jobs:
            jobs[suffix] = [i.resolve() for i in jobs[suffix]]
    else:
        for file in options.get("input"):
            file = pathlib.Path(file).resolve()
            if file.match("*.aaxc"):
                jobs["aaxc"].append(file)
            elif file.match("*.aax"):
                jobs["aax"].append(file)
            else:
                secho(f"file suffix {file.suffix} not supported", fg="red")

    # NOTE(review): the --overwrite flag is accepted but not honored; the
    # decrypt helpers always skip existing output files — confirm intent.
    decrypt_aaxc(jobs["aaxc"], session)
    decrypt_aax(jobs["aax"], session)

View file

@ -1,9 +1,4 @@
import multiprocessing
from audible_cli import cli
multiprocessing.freeze_support()
if __name__ == '__main__':
from audible_cli import cli
cli.main()
cli.main()

View file

@ -46,17 +46,15 @@ setup(
],
install_requires=[
"aiofiles",
"audible>=0.8.2",
"audible==0.7.2",
"click>=8",
"colorama; platform_system=='Windows'",
"httpx>=0.23.3,<0.28.0",
"httpx==0.20.*",
"packaging",
"Pillow",
"tabulate",
"toml",
"tqdm",
"questionary",
"importlib-metadata; python_version<'3.10'",
"tqdm"
],
extras_require={
'pyi': [

View file

@ -4,7 +4,6 @@ from typing import Optional, Union
from warnings import warn
import click
from tqdm import tqdm
audible_cli_logger = logging.getLogger("audible_cli")
@ -74,6 +73,42 @@ log_helper = AudibleCliLogHelper()
# copied from https://github.com/Toilal/click-logging
def click_verbosity_option(logger=None, *names, **kwargs):
"""A decorator that adds a `--verbosity, -v` option to the decorated
command.
Name can be configured through ``*names``. Keyword arguments are passed to
the underlying ``click.option`` decorator.
"""
if not names:
names = ["--verbosity", "-v"]
kwargs.setdefault("default", "INFO")
kwargs.setdefault("metavar", "LVL")
kwargs.setdefault("expose_value", False)
kwargs.setdefault(
"help", "Either CRITICAL, ERROR, WARNING, "
"INFO or DEBUG. [default: INFO]"
)
kwargs.setdefault("is_eager", True)
logger = _normalize_logger(logger)
def decorator(f):
def _set_level(ctx, param, value):
x = getattr(logging, value.upper(), None)
if x is None:
raise click.BadParameter(
f"Must be CRITICAL, ERROR, WARNING, INFO or DEBUG, "
f"not {value}"
)
logger.setLevel(x)
return click.option(*names, callback=_set_level, **kwargs)(f)
return decorator
class ColorFormatter(logging.Formatter):
def __init__(self, style_kwargs):
self.style_kwargs = style_kwargs
@ -101,13 +136,10 @@ class ClickHandler(logging.Handler):
try:
msg = self.format(record)
level = record.levelname.lower()
# Avoid tqdm progress bar interruption by logger's output to console
with tqdm.external_write_mode():
if self.echo_kwargs.get(level):
click.echo(msg, **self.echo_kwargs[level])
else:
click.echo(msg)
if self.echo_kwargs.get(level):
click.echo(msg, **self.echo_kwargs[level])
else:
click.echo(msg)
except Exception:
self.handleError(record)

View file

@ -1,7 +1,7 @@
__title__ = "audible-cli"
__description__ = "Command line interface (cli) for the audible package."
__url__ = "https://github.com/mkb79/audible-cli"
__version__ = "0.3.2b3"
__version__ = "0.1.3"
__author__ = "mkb79"
__author_email__ = "mkb79@hackitall.de"
__license__ = "AGPL"

View file

@ -1,26 +1,20 @@
import asyncio
import logging
import sys
from pkg_resources import iter_entry_points
import click
import httpx
from packaging.version import parse
from .cmds import build_in_cmds, cmd_quickstart
from .config import get_plugin_dir
from .constants import PLUGIN_ENTRY_POINT
from .decorators import (
password_option,
profile_option,
verbosity_option,
version_option
from .config import (
get_plugin_dir,
add_param_to_session
)
from .constants import PLUGIN_ENTRY_POINT
from .exceptions import AudibleCliException
from ._logging import click_basic_config
from . import plugins
if sys.version_info >= (3, 10):
from importlib.metadata import entry_points
else: # Python < 3.10 (backport)
from importlib_metadata import entry_points
from ._logging import click_basic_config, click_verbosity_option
from . import __version__, plugins
logger = logging.getLogger("audible_cli")
@ -29,22 +23,77 @@ click_basic_config(logger)
CONTEXT_SETTINGS = dict(help_option_names=["-h", "--help"])
def version_option(**kwargs):
def callback(ctx, param, value):
if not value or ctx.resilient_parsing:
return
message = f"audible-cli, version {__version__}"
click.echo(message, color=ctx.color, nl=False)
url = "https://api.github.com/repos/mkb79/audible-cli/releases/latest"
headers = {"Accept": "application/vnd.github.v3+json"}
logger.debug(f"Requesting Github API for latest release information")
try:
response = httpx.get(url, headers=headers, follow_redirects=True)
response.raise_for_status()
except Exception as e:
logger.error(e)
click.Abort()
content = response.json()
current_version = parse(__version__)
latest_version = parse(content["tag_name"])
html_url = content["html_url"]
if latest_version > current_version:
click.echo(
f" (update available)\nVisit {html_url} "
f"for information about the new release.",
color=ctx.color
)
else:
click.echo(" (up-to-date)", color=ctx.color)
ctx.exit()
kwargs.setdefault("is_flag", True)
kwargs.setdefault("expose_value", False)
kwargs.setdefault("is_eager", True)
kwargs.setdefault("help", "Show the version and exit.")
kwargs["callback"] = callback
return click.option("--version", **kwargs)
@plugins.from_folder(get_plugin_dir())
@plugins.from_entry_point(entry_points(group=PLUGIN_ENTRY_POINT))
@build_in_cmds
@plugins.from_entry_point(iter_entry_points(PLUGIN_ENTRY_POINT))
@build_in_cmds()
@click.group(context_settings=CONTEXT_SETTINGS)
@profile_option
@password_option
@version_option
@verbosity_option(cli_logger=logger)
@click.option(
"--profile",
"-P",
callback=add_param_to_session,
expose_value=False,
help="The profile to use instead primary profile (case sensitive!)."
)
@click.option(
"--password",
"-p",
callback=add_param_to_session,
expose_value=False,
help="The password for the profile auth file."
)
@version_option()
@click_verbosity_option(logger)
def cli():
"""Entrypoint for all other subcommands and groups."""
@click.command(context_settings=CONTEXT_SETTINGS)
@click.pass_context
@version_option
@verbosity_option(cli_logger=logger)
@version_option()
@click_verbosity_option(logger)
def quickstart(ctx):
"""Entrypoint for the quickstart command"""
try:
@ -66,9 +115,6 @@ def main(*args, **kwargs):
except click.Abort:
logger.error("Aborted")
sys.exit(1)
except asyncio.CancelledError:
logger.error("Aborted with Asyncio CancelledError")
sys.exit(2)
except AudibleCliException as e:
logger.error(e)
sys.exit(2)

View file

@ -21,7 +21,7 @@ cli_cmds = [
]
def build_in_cmds(func=None):
def build_in_cmds():
"""
A decorator to register build-in CLI commands to an instance of
`click.Group()`.
@ -42,7 +42,4 @@ def build_in_cmds(func=None):
return group
if callable(func):
return decorator(func)
return decorator

View file

@ -6,7 +6,7 @@ from audible.activation_bytes import (
fetch_activation_sign_auth
)
from ..decorators import pass_session
from ..config import pass_session
logger = logging.getLogger("audible_cli.cmds.cmd_activation_bytes")

View file

@ -6,8 +6,7 @@ import sys
import click
from audible import Client
from ..constants import AVAILABLE_MARKETPLACES
from ..decorators import pass_session
from ..config import pass_session
logger = logging.getLogger("audible_cli.cmds.cmd_api")
@ -55,14 +54,16 @@ logger = logging.getLogger("audible_cli.cmds.cmd_api")
)
@click.option(
"--country-code", "-c",
type=click.Choice(AVAILABLE_MARKETPLACES),
type=click.Choice(
["us", "ca", "uk", "au", "fr", "de", "es", "jp", "it", "in"]
),
help="Requested Audible marketplace. If not set, the country code for "
"the current profile is used."
)
@pass_session
def cli(session, **options):
"""Send requests to an Audible API endpoint
Take a look at
https://audible.readthedocs.io/en/latest/misc/external_api.html for known
endpoints and parameters.
@ -95,7 +96,7 @@ def cli(session, **options):
with Client(auth=auth, country_code=country_code) as client:
r = client._request(method, endpoint, params=params, json=body)
except Exception as e:
logger.error(e)
logger.error(e)
sys.exit(1)
if output_format == "json":

File diff suppressed because it is too large Load diff

View file

@ -1,21 +1,15 @@
import asyncio
import csv
import json
import pathlib
from typing import Union
import audible
import click
from click import echo
from ..decorators import (
bunch_size_option,
end_date_option,
start_date_option,
timeout_option,
pass_client,
pass_session,
wrap_async
)
from ..config import pass_session
from ..models import Library
from ..utils import export_to_csv
@click.group("library")
@ -23,64 +17,65 @@ def cli():
"""interact with library"""
async def _get_library(session, client, resolve_podcasts):
bunch_size = session.params.get("bunch_size")
start_date = session.params.get("start_date")
end_date = session.params.get("end_date")
async def _get_library(auth, **params):
timeout = params.get("timeout")
if timeout == 0:
timeout = None
library = await Library.from_api_full_sync(
client,
response_groups=(
"contributors, media, price, product_attrs, product_desc, "
"product_extended_attrs, product_plan_details, product_plans, "
"rating, sample, sku, series, reviews, ws4v, origin, "
"relationships, review_attrs, categories, badge_types, "
"category_ladders, claim_code_url, is_downloaded, "
"is_finished, is_returnable, origin_asin, pdf_url, "
"percent_complete, provided_review"
),
bunch_size=bunch_size,
start_date=start_date,
end_date=end_date
)
if resolve_podcasts:
await library.resolve_podcasts(start_date=start_date, end_date=end_date)
bunch_size = params.get("bunch_size")
async with audible.AsyncClient(auth, timeout=timeout) as client:
library = await Library.from_api_full_sync(
client,
response_groups=(
"contributors, media, price, product_attrs, product_desc, "
"product_extended_attrs, product_plan_details, product_plans, "
"rating, sample, sku, series, reviews, ws4v, origin, "
"relationships, review_attrs, categories, badge_types, "
"category_ladders, claim_code_url, is_downloaded, "
"is_finished, is_returnable, origin_asin, pdf_url, "
"percent_complete, provided_review"
),
bunch_size=bunch_size
)
return library
@cli.command("export")
@click.option(
"--output", "-o",
type=click.Path(path_type=pathlib.Path),
default=pathlib.Path().cwd() / r"library.{format}",
show_default=True,
help="output file"
)
@timeout_option
@click.option(
"--format", "-f",
type=click.Choice(["tsv", "csv", "json"]),
default="tsv",
show_default=True,
help="Output format"
)
@bunch_size_option
@click.option(
"--resolve-podcasts",
is_flag=True,
help="Resolve podcasts to show all episodes"
)
@start_date_option
@end_date_option
@pass_session
@pass_client
async def export_library(session, client, **params):
"""export library"""
async def _list_library(auth, **params):
library = await _get_library(auth, **params)
@wrap_async
def _prepare_item(item):
books = []
for item in library:
asin = item.asin
authors = ", ".join(
sorted(a["name"] for a in item.authors) if item.authors else ""
)
series = ", ".join(
sorted(s["title"] for s in item.series) if item.series else ""
)
title = item.title
books.append((asin, authors, series, title))
for asin, authors, series, title in sorted(books):
fields = [asin]
if authors:
fields.append(authors)
if series:
fields.append(series)
fields.append(title)
echo(": ".join(fields))
def _prepare_library_for_export(library: Library):
keys_with_raw_values = (
"asin", "title", "subtitle", "runtime_length_min", "is_finished",
"percent_complete", "release_date"
)
prepared_library = []
for item in library:
data_row = {}
for key in item:
v = getattr(item, key)
@ -110,85 +105,128 @@ async def export_library(session, client, **params):
genres.append(ladder["name"])
data_row["genres"] = ", ".join(genres)
return data_row
prepared_library.append(data_row)
prepared_library.sort(key=lambda x: x["asin"])
return prepared_library
def _export_to_csv(
file: pathlib.Path,
data: list,
headers: Union[list, tuple],
dialect: str
):
with file.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=headers, dialect=dialect)
writer.writeheader()
for i in data:
writer.writerow(i)
async def _export_library(auth, **params):
output_format = params.get("format")
output_filename: pathlib.Path = params.get("output")
if output_filename.suffix == r".{format}":
suffix = "." + output_format
output_filename = output_filename.with_suffix(suffix)
resolve_podcasts = params.get("resolve_podcasts")
library = await _get_library(session, client, resolve_podcasts)
library = await _get_library(auth, **params)
keys_with_raw_values = (
"asin", "title", "subtitle", "extended_product_description", "runtime_length_min", "is_finished",
"percent_complete", "release_date", "purchase_date"
)
prepared_library = _prepare_library_for_export(library)
prepared_library = await asyncio.gather(
*[_prepare_item(i) for i in library]
headers = (
"asin", "title", "subtitle", "authors", "narrators", "series_title",
"series_sequence", "genres", "runtime_length_min", "is_finished",
"percent_complete", "rating", "num_ratings", "date_added",
"release_date", "cover_url"
)
prepared_library = [i for i in prepared_library if i is not None]
prepared_library.sort(key=lambda x: x["asin"])
if output_format in ("tsv", "csv"):
if output_format == "csv":
if output_format == csv:
dialect = "excel"
else:
dialect = "excel-tab"
_export_to_csv(output_filename, prepared_library, headers, dialect)
headers = (
"asin", "title", "subtitle", "extended_product_description", "authors", "narrators", "series_title",
"series_sequence", "genres", "runtime_length_min", "is_finished",
"percent_complete", "rating", "num_ratings", "date_added",
"release_date", "cover_url", "purchase_date"
)
export_to_csv(output_filename, prepared_library, headers, dialect)
elif output_format == "json":
if output_format == "json":
data = json.dumps(prepared_library, indent=4)
output_filename.write_text(data)
@cli.command("list")
@timeout_option
@bunch_size_option
@cli.command("export")
@click.option(
"--resolve-podcasts",
is_flag=True,
help="Resolve podcasts to show all episodes"
"--output", "-o",
type=click.Path(path_type=pathlib.Path),
default=pathlib.Path().cwd() / r"library.{format}",
show_default=True,
help="output file"
)
@start_date_option
@end_date_option
@pass_session
@pass_client
async def list_library(session, client, resolve_podcasts):
"""list titles in library"""
@wrap_async
def _prepare_item(item):
fields = [item.asin]
authors = ", ".join(
sorted(a["name"] for a in item.authors) if item.authors else ""
)
if authors:
fields.append(authors)
series = ", ".join(
sorted(s["title"] for s in item.series) if item.series else ""
)
if series:
fields.append(series)
fields.append(item.title)
return ": ".join(fields)
library = await _get_library(session, client, resolve_podcasts)
books = await asyncio.gather(
*[_prepare_item(i) for i in library]
@click.option(
"--timeout", "-t",
type=click.INT,
default=10,
show_default=True,
help=(
"Increase the timeout time if you got any TimeoutErrors. "
"Set to 0 to disable timeout."
)
[echo(i) for i in sorted(books) if len(i) > 0]
)
@click.option(
"--format", "-f",
type=click.Choice(["tsv", "csv", "json"]),
default="tsv",
show_default=True,
help="Output format"
)
@click.option(
"--bunch-size",
type=click.IntRange(10, 1000),
default=1000,
show_default=True,
help="How many library items should be requested per request. A lower "
"size results in more requests to get the full library. A higher "
"size can result in a TimeOutError on low internet connections."
)
@pass_session
def export_library(session, **params):
"""export library"""
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(_export_library(session.auth, **params))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
@cli.command("list")
@click.option(
"--timeout", "-t",
type=click.INT,
default=10,
show_default=True,
help=(
"Increase the timeout time if you got any TimeoutErrors. "
"Set to 0 to disable timeout."
)
)
@click.option(
"--bunch-size",
type=click.IntRange(10, 1000),
default=1000,
show_default=True,
help="How many library items should be requested per request. A lower "
"size results in more requests to get the full library. A higher "
"size can result in a TimeOutError on low internet connections."
)
@pass_session
def list_library(session, **params):
"""list titles in library"""
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(_list_library(session.auth, **params))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()

View file

@ -6,8 +6,7 @@ from audible import Authenticator
from click import echo, secho
from tabulate import tabulate
from ..constants import AVAILABLE_MARKETPLACES
from ..decorators import pass_session
from ..config import pass_session
from ..utils import build_auth_file
@ -46,13 +45,13 @@ def config_editor(session):
def list_profiles(session):
"""List all profiles in the config file"""
head = ["P", "Profile", "auth file", "cc"]
config = session.config
profiles = config.data.get("profile")
profiles = session.config.data.get("profile")
data = []
for profile in profiles:
auth_file = config.get_profile_option(profile, "auth_file")
country_code = config.get_profile_option(profile, "country_code")
p = profiles.get(profile)
auth_file = p.get("auth_file")
country_code = p.get("country_code")
is_primary = profile == session.config.primary_profile
data.append(
["*" if is_primary else "", profile, auth_file, country_code])
@ -73,7 +72,8 @@ def list_profiles(session):
@click.option(
"--country-code", "-cc",
prompt="Please enter the country code",
type=click.Choice(AVAILABLE_MARKETPLACES),
type=click.Choice([
"us", "ca", "uk", "au", "fr", "de", "jp", "it", "in"]),
help="The country code for the profile."
)
@click.option(
@ -92,7 +92,7 @@ def list_profiles(session):
def add_profile(ctx, session, profile, country_code, auth_file, is_primary):
"""Adds a profile to config file"""
if not (session.config.dirname / auth_file).exists():
logger.error("Auth file doesn't exists")
logger.error("Auth file doesn't exists.")
raise click.Abort()
session.config.add_profile(
@ -160,14 +160,14 @@ def check_if_auth_file_not_exists(session, ctx, param, value):
)
@click.option(
"--country-code", "-cc",
type=click.Choice(AVAILABLE_MARKETPLACES),
type=click.Choice(["us", "ca", "uk", "au", "fr", "de", "jp", "it", "in"]),
prompt="Please enter the country code",
help="The country code for the marketplace you want to authenticate."
)
@click.option(
"--external-login",
is_flag=True,
help="Authenticate using a web browser."
help="Authenticate using a webbrowser."
)
@click.option(
"--with-username",

View file

@ -1,19 +1,13 @@
import logging
import pathlib
import sys
import audible
import click
from click import echo, secho, prompt
from tabulate import tabulate
from .. import __version__
from ..config import ConfigFile
from ..constants import (
AVAILABLE_MARKETPLACES,
CONFIG_FILE,
DEFAULT_AUTH_FILE_EXTENSION
)
from ..decorators import pass_session
from ..config import Config, pass_session
from ..constants import CONFIG_FILE, DEFAULT_AUTH_FILE_EXTENSION
from ..utils import build_auth_file
@ -37,10 +31,10 @@ def tabulate_summary(d: dict) -> str:
return tabulate(data, head, tablefmt="pretty", colalign=("left", "left"))
def ask_user(config: ConfigFile):
def ask_user(config: Config):
d = {}
welcome_message = (
f"\nWelcome to the audible-cli {__version__} quickstart utility.")
f"Welcome to the audible {audible.__version__} quickstart utility.")
secho(welcome_message, bold=True)
secho(len(welcome_message) * "=", bold=True)
@ -56,11 +50,11 @@ config dir. If the auth file doesn't exists, it will be created. In this case,
an authentication to the audible server is necessary to register a new device.
"""
echo()
secho(intro)
secho(intro, bold=True)
path = config.dirname.absolute()
secho("Selected dir to proceed with:", bold=True)
echo(path)
echo(path.absolute())
echo()
echo("Please enter values for the following settings (just press Enter "
@ -71,11 +65,13 @@ an authentication to the audible server is necessary to register a new device.
"Please enter a name for your primary profile",
default="audible")
available_country_codes = [
"us", "ca", "uk", "au", "fr", "de", "es", "jp", "it", "in"]
echo()
d["country_code"] = prompt(
"Enter a country code for the profile",
show_choices=False,
type=click.Choice(AVAILABLE_MARKETPLACES)
type=click.Choice(available_country_codes)
)
echo()
@ -141,14 +137,17 @@ an authentication to the audible server is necessary to register a new device.
@click.command("quickstart")
@click.pass_context
@pass_session
def cli(session):
"""Quick setup audible"""
config_file: pathlib.Path = session.app_dir / CONFIG_FILE
config = ConfigFile(config_file, file_exists=False)
if config_file.is_file():
m = f"Config file {config_file} already exists. Quickstart will " \
def cli(session, ctx):
"""Quicksetup audible"""
session._config = Config()
config = session.config
config._config_file = session.app_dir / CONFIG_FILE
if config.file_exists():
m = f"Config file {config.filename} already exists. Quickstart will " \
f"not overwrite existing files."
logger.error(m)
raise click.Abort()
@ -158,9 +157,16 @@ def cli(session):
echo(tabulate_summary(d))
click.confirm("Do you want to continue?", abort=True)
config.add_profile(
name=d.get("profile_name"),
auth_file=d.get("auth_file"),
country_code=d.get("country_code"),
is_primary=True,
write_config=False)
if "use_existing_auth_file" not in d:
build_auth_file(
filename=session.app_dir / d.get("auth_file"),
filename=config.dirname / d.get("auth_file"),
username=d.get("audible_username"),
password=d.get("audible_password"),
country_code=d.get("country_code"),
@ -169,9 +175,4 @@ def cli(session):
with_username=d.get("with_username")
)
config.add_profile(
name=d.get("profile_name"),
auth_file=d.get("auth_file"),
country_code=d.get("country_code"),
is_primary=True,
)
config.write_config()

View file

@ -1,65 +1,70 @@
import asyncio
import csv
import json
import logging
import pathlib
from typing import Union
import audible
import click
import httpx
import questionary
from click import echo
from ..decorators import timeout_option, pass_client, wrap_async
from ..models import Catalog, Wishlist
from ..utils import export_to_csv
from ..config import pass_session
from ..models import Wishlist
logger = logging.getLogger("audible_cli.cmds.cmd_wishlist")
async def _get_wishlist(auth, **params):
timeout = params.get("timeout")
if timeout == 0:
timeout = None
# audible api raises a 500 status error when to many requests
# where made to wishlist endpoint in short time
limits = httpx.Limits(max_keepalive_connections=1, max_connections=1)
async def _get_wishlist(client):
wishlist = await Wishlist.from_api(
client,
response_groups=(
"contributors, media, price, product_attrs, product_desc, "
"product_extended_attrs, product_plan_details, product_plans, "
"rating, sample, sku, series, reviews, review_attrs, ws4v, "
"customer_rights, categories, category_ladders, claim_code_url"
async with audible.AsyncClient(auth, timeout=timeout) as client:
wishlist = await Wishlist.from_api(
client,
response_groups=(
"contributors, media, price, product_attrs, product_desc, "
"product_extended_attrs, product_plan_details, product_plans, "
"rating, sample, sku, series, reviews, review_attrs, ws4v, "
"customer_rights, categories, category_ladders, claim_code_url"
)
)
)
return wishlist
@click.group("wishlist")
def cli():
"""interact with wishlist"""
async def _list_wishlist(auth, **params):
wishlist = await _get_wishlist(auth, **params)
books = []
for item in wishlist:
asin = item.asin
authors = ", ".join(
sorted(a["name"] for a in item.authors) if item.authors else ""
)
series = ", ".join(
sorted(s["title"] for s in item.series) if item.series else ""
)
title = item.title
books.append((asin, authors, series, title))
for asin, authors, series, title in sorted(books):
fields = [asin]
if authors:
fields.append(authors)
if series:
fields.append(series)
fields.append(title)
echo(": ".join(fields))
@cli.command("export")
@click.option(
"--output", "-o",
type=click.Path(),
default=pathlib.Path().cwd() / r"wishlist.{format}",
show_default=True,
help="output file"
)
@timeout_option
@click.option(
"--format", "-f",
type=click.Choice(["tsv", "csv", "json"]),
default="tsv",
show_default=True,
help="Output format"
)
@pass_client
async def export_wishlist(client, **params):
"""export wishlist"""
def _prepare_wishlist_for_export(wishlist: dict):
keys_with_raw_values = (
"asin", "title", "subtitle", "runtime_length_min", "is_finished",
"percent_complete", "release_date"
)
@wrap_async
def _prepare_item(item):
prepared_wishlist = []
for item in wishlist:
data_row = {}
for key in item:
v = getattr(item, key)
@ -88,234 +93,116 @@ async def export_wishlist(client, **params):
for ladder in genre["ladder"]:
genres.append(ladder["name"])
data_row["genres"] = ", ".join(genres)
return data_row
prepared_wishlist.append(data_row)
prepared_wishlist.sort(key=lambda x: x["asin"])
return prepared_wishlist
def _export_to_csv(
file: pathlib.Path,
data: list,
headers: Union[list, tuple],
dialect: str
):
with file.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=headers, dialect=dialect)
writer.writeheader()
for i in data:
writer.writerow(i)
async def _export_wishlist(auth, **params):
output_format = params.get("format")
output_filename: pathlib.Path = params.get("output")
if output_filename.suffix == r".{format}":
suffix = "." + output_format
output_filename = output_filename.with_suffix(suffix)
wishlist = await _get_wishlist(client)
wishlist = await _get_wishlist(auth, **params)
keys_with_raw_values = (
"asin", "title", "subtitle", "runtime_length_min", "is_finished",
"percent_complete", "release_date"
)
prepared_wishlist = _prepare_wishlist_for_export(wishlist)
prepared_wishlist = await asyncio.gather(
*[_prepare_item(i) for i in wishlist]
headers = (
"asin", "title", "subtitle", "authors", "narrators", "series_title",
"series_sequence", "genres", "runtime_length_min", "is_finished",
"percent_complete", "rating", "num_ratings", "date_added",
"release_date", "cover_url"
)
prepared_wishlist.sort(key=lambda x: x["asin"])
if output_format in ("tsv", "csv"):
if output_format == "csv":
if output_format == csv:
dialect = "excel"
else:
dialect = "excel-tab"
_export_to_csv(output_filename, prepared_wishlist, headers, dialect)
headers = (
"asin", "title", "subtitle", "authors", "narrators", "series_title",
"series_sequence", "genres", "runtime_length_min", "is_finished",
"percent_complete", "rating", "num_ratings", "date_added",
"release_date", "cover_url"
)
export_to_csv(
output_filename, prepared_wishlist, headers, dialect
)
elif output_format == "json":
if output_format == "json":
data = json.dumps(prepared_wishlist, indent=4)
output_filename.write_text(data)
@cli.command("list")
@timeout_option
@pass_client
async def list_wishlist(client):
"""list titles in wishlist"""
@click.group("wishlist")
def cli():
"""interact with wishlist"""
@wrap_async
def _prepare_item(item):
fields = [item.asin]
authors = ", ".join(
sorted(a["name"] for a in item.authors) if item.authors else ""
)
if authors:
fields.append(authors)
series = ", ".join(
sorted(s["title"] for s in item.series) if item.series else ""
)
if series:
fields.append(series)
fields.append(item.title)
return ": ".join(fields)
wishlist = await _get_wishlist(client)
books = await asyncio.gather(
*[_prepare_item(i) for i in wishlist]
@cli.command("export")
@click.option(
"--output", "-o",
type=click.Path(),
default=pathlib.Path().cwd() / r"wishlist.{format}",
show_default=True,
help="output file"
)
@click.option(
"--timeout", "-t",
type=click.INT,
default=10,
show_default=True,
help=(
"Increase the timeout time if you got any TimeoutErrors. "
"Set to 0 to disable timeout."
)
for i in sorted(books):
echo(i)
@cli.command("add")
@click.option(
"--asin", "-a",
multiple=True,
help="asin of the audiobook"
)
@click.option(
"--title", "-t",
multiple=True,
help="tile of the audiobook (partial search)"
"--format", "-f",
type=click.Choice(["tsv", "csv", "json"]),
default="tsv",
show_default=True,
help="Output format"
)
@timeout_option
@pass_client(limits=limits)
async def add_wishlist(client, asin, title):
"""add asin(s) to wishlist
Run the command without any option for interactive mode.
"""
async def add_asin(asin):
body = {"asin": asin}
r = await client.post("wishlist", body=body)
return r
asin = list(asin)
title = list(title)
if not asin and not title:
q = await questionary.select(
"Do you want to add an item by asin or title?",
choices=[
questionary.Choice(title="by title", value="title"),
questionary.Choice(title="by asin", value="asin")
]
).unsafe_ask_async()
if q == 'asin':
q = await questionary.text("Please enter the asin").unsafe_ask_async()
asin.append(q)
else:
q = await questionary.text("Please enter the title").unsafe_ask_async()
title.append(q)
for t in title:
catalog = await Catalog.from_api(
client,
title=t,
num_results=50
)
match = catalog.search_item_by_title(t)
full_match = [i for i in match if i[1] == 100]
if match:
choices = []
for i in full_match or match:
c = questionary.Choice(title=i[0].full_title, value=i[0].asin)
choices.append(c)
answer = await questionary.checkbox(
f"Found the following matches for '{t}'. Which you want to add?",
choices=choices
).unsafe_ask_async()
if answer is not None:
[asin.append(i) for i in answer]
else:
logger.error(
f"Skip title {t}: Not found in library"
)
jobs = [add_asin(a) for a in asin]
await asyncio.gather(*jobs)
wishlist = await _get_wishlist(client)
for a in asin:
if wishlist.has_asin(a):
item = wishlist.get_item_by_asin(a)
logger.info(f"{a} ({item.full_title}) added to wishlist")
else:
logger.error(f"{a} was not added to wishlist")
@pass_session
def export_library(session, **params):
"""export wishlist"""
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(_export_wishlist(session.auth, **params))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
@cli.command("remove")
@cli.command("list")
@click.option(
"--asin", "-a",
multiple=True,
help="asin of the audiobook"
"--timeout", "-t",
type=click.INT,
default=10,
show_default=True,
help=(
"Increase the timeout time if you got any TimeoutErrors. "
"Set to 0 to disable timeout."
)
)
@click.option(
"--title", "-t",
multiple=True,
help="tile of the audiobook (partial search)"
)
@timeout_option
@pass_client(limits=limits)
async def remove_wishlist(client, asin, title):
"""remove asin(s) from wishlist
Run the command without any option for interactive mode.
"""
async def remove_asin(rasin):
r = await client.delete(f"wishlist/{rasin}")
item = wishlist.get_item_by_asin(rasin)
logger.info(f"{rasin} ({item.full_title}) removed from wishlist")
return r
asin = list(asin)
wishlist = await _get_wishlist(client)
if not asin and not title:
# interactive mode
choices = []
for i in wishlist:
c = questionary.Choice(title=i.full_title, value=i.asin)
choices.append(c)
asin = await questionary.checkbox(
"Select item(s) which you want to remove from whishlist",
choices=choices
).unsafe_ask_async()
for t in title:
match = wishlist.search_item_by_title(t)
full_match = [i for i in match if i[1] == 100]
if match:
choices = []
for i in full_match or match:
c = questionary.Choice(title=i[0].full_title, value=i[0].asin)
choices.append(c)
answer = await questionary.checkbox(
f"Found the following matches for '{t}'. Which you want to remove?",
choices=choices
).unsafe_ask_async()
if answer is not None:
[asin.append(i) for i in answer]
else:
logger.error(
f"Skip title {t}: Not found in library"
)
if asin:
jobs = []
for a in asin:
if wishlist.has_asin(a):
jobs.append(remove_asin(a))
else:
logger.error(f"{a} not in wishlist")
await asyncio.gather(*jobs)
@pass_session
def list_library(session, **params):
"""list titles in wishlist"""
loop = asyncio.get_event_loop()
try:
loop.run_until_complete(_list_wishlist(session.auth, **params))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()

View file

@ -3,10 +3,9 @@ import os
import pathlib
from typing import Any, Dict, Optional, Union
import audible
import click
import toml
from audible import AsyncClient, Authenticator
from audible import Authenticator
from audible.exceptions import FileEncryptionError
from . import __version__
@ -23,116 +22,51 @@ from .exceptions import AudibleCliException, ProfileAlreadyExists
logger = logging.getLogger("audible_cli.config")
class ConfigFile:
"""Presents an audible-cli configuration file
class Config:
"""Holds the config file data and environment."""
Instantiate a :class:`~audible_cli.config.ConfigFile` will load the file
content by default. To create a new config file, the ``file_exists``
argument must be set to ``False``.
Audible-cli configuration files are written in the toml markup language.
It has a main section named `APP` and sections for each profile named
`profile.<profile_name>`.
Args:
filename: The file path to the config file
file_exists: If ``True``, the file must exist and the file content
is loaded.
"""
def __init__(
self,
filename: Union[str, pathlib.Path],
file_exists: bool = True
) -> None:
filename = pathlib.Path(filename).resolve()
config_data = DEFAULT_CONFIG_DATA.copy()
file_data = {}
if file_exists:
if not filename.is_file():
raise AudibleCliException(
f"Config file {click.format_filename(filename)} "
f"does not exists"
)
file_data = toml.load(filename)
logger.debug(
f"Config loaded from "
f"{click.format_filename(filename, shorten=True)}"
)
config_data.update(file_data)
self._config_file = filename
self._config_data = config_data
def __init__(self) -> None:
self._config_file: Optional[pathlib.Path] = None
self._config_data: Dict[str, Union[str, Dict]] = DEFAULT_CONFIG_DATA
self._current_profile: Optional[str] = None
self._is_read: bool = False
@property
def filename(self) -> pathlib.Path:
"""Returns the path to the config file"""
def filename(self) -> Optional[pathlib.Path]:
return self._config_file
def file_exists(self) -> bool:
return self.filename.exists()
@property
def dirname(self) -> pathlib.Path:
"""Returns the path to the config file directory"""
return self.filename.parent
def dir_exists(self) -> bool:
return self.dirname.exists()
@property
def is_read(self) -> bool:
return self._is_read
@property
def data(self) -> Dict[str, Union[str, Dict]]:
"""Returns the configuration data"""
return self._config_data
@property
def app_config(self) -> Dict[str, str]:
"""Returns the configuration data for the APP section"""
return self.data["APP"]
def has_profile(self, name: str) -> bool:
"""Check if a profile with this name are in the configuration data
Args:
name: The name of the profile
"""
return name in self.data["profile"]
def get_profile(self, name: str) -> Dict[str, str]:
"""Returns the configuration data for these profile name
Args:
name: The name of the profile
"""
if not self.has_profile(name):
raise AudibleCliException(f"Profile {name} does not exists")
return self.data["profile"][name]
return self.data.get("APP", {})
@property
def primary_profile(self) -> str:
if "primary_profile" not in self.app_config:
raise AudibleCliException("No primary profile set in config")
return self.app_config["primary_profile"]
def profile_config(self) -> Dict[str, str]:
return self.data["profile"][self._current_profile]
def get_profile_option(
self,
profile: str,
option: str,
default: Optional[str] = None
) -> str:
"""Returns the value for an option for the given profile.
@property
def primary_profile(self) -> Optional[str]:
return self.app_config.get("primary_profile")
Looks first, if an option is in the ``profile`` section. If not, it
searches for the option in the ``APP`` section. If not found, it
returns the ``default``.
Args:
profile: The name of the profile
option: The name of the option to search for
default: The default value to return, if the option is not found
"""
profile = self.get_profile(profile)
if option in profile:
return profile[option]
if option in self.app_config:
return self.app_config[option]
return default
def has_profile(self, name: str) -> bool:
return name in self.data.get("profile", {})
def add_profile(
self,
@ -140,22 +74,12 @@ class ConfigFile:
auth_file: Union[str, pathlib.Path],
country_code: str,
is_primary: bool = False,
abort_on_existing_profile: bool = True,
write_config: bool = True,
**additional_options
) -> None:
"""Adds a new profile to the config
Args:
name: The name of the profile
auth_file: The name of the auth_file
country_code: The country code of the marketplace to use with
this profile
is_primary: If ``True``, this profile is set as primary in the
``APP`` section
write_config: If ``True``, save the config to file
"""
if self.has_profile(name):
if self.has_profile(name) and abort_on_existing_profile:
raise ProfileAlreadyExists(name)
profile_data = {
@ -168,41 +92,31 @@ class ConfigFile:
if is_primary:
self.data["APP"]["primary_profile"] = name
logger.info(f"Profile {name} added to config")
if write_config:
self.write_config()
def delete_profile(self, name: str, write_config: bool = True) -> None:
"""Deletes a profile from config
Args:
name: The name of the profile
write_config: If ``True``, save the config to file
Note:
Does not delete the auth file.
"""
if not self.has_profile(name):
raise AudibleCliException(f"Profile {name} does not exists")
def delete_profile(self, name: str) -> None:
del self.data["profile"][name]
logger.info(f"Profile {name} removed from config")
def read_config(
self,
filename: Optional[Union[str, pathlib.Path]] = None
) -> None:
f = pathlib.Path(filename or self.filename).resolve()
if write_config:
self.write_config()
try:
self.data.update(toml.load(f))
except FileNotFoundError:
message = f"Config file {click.format_filename(f)} not found"
raise AudibleCliException(message)
self._config_file = f
self._is_read = True
def write_config(
self,
filename: Optional[Union[str, pathlib.Path]] = None
) -> None:
"""Write the config data to file
Args:
filename: If not ``None`` the config is written to these file path
instead of ``self.filename``
"""
f = pathlib.Path(filename or self.filename).resolve()
if not f.parent.is_dir():
@ -210,99 +124,78 @@ class ConfigFile:
toml.dump(self.data, f.open("w"))
click_f = click.format_filename(f, shorten=True)
logger.info(f"Config written to {click_f}")
class Session:
"""Holds the settings for the current session"""
"""Holds the settings for the current session."""
def __init__(self) -> None:
self._auths: Dict[str, Authenticator] = {}
self._config: Optional[CONFIG_FILE] = None
self._auth: Optional[Authenticator] = None
self._config: Optional[Config] = None
self._params: Dict[str, Any] = {}
self._app_dir: pathlib.Path = get_app_dir()
self._plugin_dir: pathlib.Path = get_plugin_dir()
self._app_dir = get_app_dir()
self._plugin_dir = get_plugin_dir()
logger.debug(f"Audible-cli version: {__version__}")
logger.debug(f"App dir: {click.format_filename(self.app_dir)}")
logger.debug(f"Plugin dir: {click.format_filename(self.plugin_dir)}")
@property
def params(self):
"""Returns the parameter of the session
Parameter are usually added using the ``add_param_to_session``
callback on a click option. This way an option from a parent command
can be accessed from his subcommands.
"""
return self._params
@property
def app_dir(self):
"""Returns the path of the app dir"""
return self._app_dir
@property
def plugin_dir(self):
"""Returns the path of the plugin dir"""
return self._plugin_dir
@property
def config(self):
"""Returns the ConfigFile for this session"""
if self._config is None:
conf_file = self.app_dir / CONFIG_FILE
self._config = ConfigFile(conf_file)
self._config = Config()
logger.debug(
f"Load config from file: "
f"{click.format_filename(conf_file, shorten=True)}"
)
self._config.read_config(conf_file)
name = self.params.get("profile") or self.config.primary_profile
logger.debug(f"Selected profile: {name}")
if name is None:
message = (
"No profile provided and primary profile not set "
"properly in config."
)
try:
ctx = click.get_current_context()
ctx.fail(message)
except RuntimeError:
raise KeyError(message)
if not self.config.has_profile(name):
message = "Provided profile not found in config."
try:
ctx = click.get_current_context()
ctx.fail(message)
except RuntimeError:
raise UserWarning(message)
self.config._current_profile = name
return self._config
@property
def selected_profile(self):
"""Returns the selected config profile name for this session
The `profile` to use must be set using the ``add_param_to_session``
callback of a click option. Otherwise, the primary profile from the
config is used.
"""
profile = self.params.get("profile") or self.config.primary_profile
if profile is None:
message = (
"No profile provided and primary profile not set "
"properly in config."
)
raise AudibleCliException(message)
return profile
def get_auth_for_profile(
self,
profile: str,
password: Optional[str] = None
) -> audible.Authenticator:
"""Returns an Authenticator for a profile
If an Authenticator for this profile is already loaded, it will
return the Authenticator without reloading it. This way a session can
hold multiple Authenticators for different profiles. Commands can use
this to make API requests for more than one profile.
Args:
profile: The name of the profile
password: The password of the auth file
"""
if profile in self._auths:
return self._auths[profile]
if not self.config.has_profile(profile):
message = "Provided profile not found in config."
raise AudibleCliException(message)
auth_file = self.config.get_profile_option(profile, "auth_file")
country_code = self.config.get_profile_option(profile, "country_code")
def _set_auth(self):
profile = self.config.profile_config
auth_file = self.config.dirname / profile["auth_file"]
country_code = profile["country_code"]
password = self.params.get("password")
while True:
try:
auth = Authenticator.from_file(
filename=self.config.dirname / auth_file,
self._auth = Authenticator.from_file(
filename=auth_file,
password=password,
locale=country_code)
break
@ -311,39 +204,20 @@ class Session:
"Auth file is encrypted but no/wrong password is provided"
)
password = click.prompt(
"Please enter the auth-file password (or enter to exit)",
"Please enter the password (or enter to exit)",
hide_input=True,
default="")
if len(password) == 0:
raise click.Abort()
click_f = click.format_filename(auth_file, shorten=True)
logger.debug(f"Auth file {click_f} for profile {profile} loaded.")
self._auths[profile] = auth
return auth
@property
def auth(self):
"""Returns the Authenticator for the selected profile"""
profile = self.selected_profile
password = self.params.get("password")
return self.get_auth_for_profile(profile, password)
if self._auth is None:
self._set_auth()
return self._auth
def get_client_for_profile(
self,
profile: str,
password: Optional[str] = None,
**kwargs
) -> AsyncClient:
auth = self.get_auth_for_profile(profile, password)
kwargs.setdefault("timeout", self.params.get("timeout", 5))
return AsyncClient(auth=auth, **kwargs)
def get_client(self, **kwargs) -> AsyncClient:
profile = self.selected_profile
password = self.params.get("password")
return self.get_client_for_profile(profile, password, **kwargs)
pass_session = click.make_pass_decorator(Session, ensure=True)
def get_app_dir() -> pathlib.Path:
@ -356,3 +230,10 @@ def get_app_dir() -> pathlib.Path:
def get_plugin_dir() -> pathlib.Path:
plugin_dir = os.getenv(PLUGIN_DIR_ENV) or (get_app_dir() / PLUGIN_PATH)
return pathlib.Path(plugin_dir).resolve()
def add_param_to_session(ctx: click.Context, param, value):
"""Add a parameter to :class:`Session` `param` attribute"""
session = ctx.ensure_object(Session)
session.params[param.name] = value
return value

View file

@ -1,8 +1,3 @@
from typing import Dict
from audible.localization import LOCALE_TEMPLATES
APP_NAME: str = "Audible"
CONFIG_FILE: str = "config.toml"
CONFIG_DIR_ENV: str = "AUDIBLE_CONFIG_DIR"
@ -11,14 +6,10 @@ PLUGIN_DIR_ENV: str = "AUDIBLE_PLUGIN_DIR"
PLUGIN_ENTRY_POINT: str = "audible.cli_plugins"
DEFAULT_AUTH_FILE_EXTENSION: str = "json"
DEFAULT_AUTH_FILE_ENCRYPTION: str = "json"
DEFAULT_CONFIG_DATA: Dict[str, str] = {
DEFAULT_CONFIG_DATA = {
"title": "Audible Config File",
"APP": {},
"profile": {}
}
CODEC_HIGH_QUALITY: str = "AAX_44_128"
CODEC_NORMAL_QUALITY: str = "AAX_44_64"
AVAILABLE_MARKETPLACES = [
market["country_code"] for market in LOCALE_TEMPLATES.values()
]
CODEC_HIGH_QUALITY = "AAX_44_128"
CODEC_NORMAL_QUALITY = "AAX_44_64"

View file

@ -1,273 +0,0 @@
import asyncio
import logging
from functools import partial, wraps
import click
import httpx
from packaging.version import parse
from .config import Session
from .utils import datetime_type
from ._logging import _normalize_logger
from . import __version__
logger = logging.getLogger("audible_cli.options")
pass_session = click.make_pass_decorator(Session, ensure=True)
def run_async(f):
@wraps(f)
def wrapper(*args, **kwargs):
if hasattr(asyncio, "run"):
logger.debug("Using asyncio.run ...")
return asyncio.run(f(*args, ** kwargs))
else:
logger.debug("Using asyncio.run_until_complete ...")
loop = asyncio.get_event_loop()
if loop.is_closed():
loop = asyncio.new_event_loop()
try:
return loop.run_until_complete(f(*args, ** kwargs))
finally:
loop.run_until_complete(loop.shutdown_asyncgens())
loop.close()
return wrapper
def wrap_async(f):
"""Wrap a synchronous function and runs them in an executor"""
@wraps(f)
async def wrapper(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
partial_func = partial(f, *args, **kwargs)
return await loop.run_in_executor(executor, partial_func)
return wrapper
def pass_client(func=None, **client_kwargs):
def coro(f):
@wraps(f)
@pass_session
@run_async
async def wrapper(session, *args, **kwargs):
client = session.get_client(**client_kwargs)
async with client.session:
return await f(*args, client, **kwargs)
return wrapper
if callable(func):
return coro(func)
return coro
def add_param_to_session(ctx: click.Context, param, value):
"""Add a parameter to :class:`Session` `param` attribute
This is usually used as a callback for a click option
"""
session = ctx.ensure_object(Session)
session.params[param.name] = value
return value
def version_option(func=None, **kwargs):
def callback(ctx, param, value):
if not value or ctx.resilient_parsing:
return
message = f"audible-cli, version {__version__}"
click.echo(message, color=ctx.color, nl=False)
url = "https://api.github.com/repos/mkb79/audible-cli/releases/latest"
headers = {"Accept": "application/vnd.github.v3+json"}
logger.debug(f"Requesting Github API for latest release information")
try:
response = httpx.get(url, headers=headers, follow_redirects=True)
response.raise_for_status()
except Exception as e:
logger.error(e)
raise click.Abort()
content = response.json()
current_version = parse(__version__)
latest_version = parse(content["tag_name"])
html_url = content["html_url"]
if latest_version > current_version:
click.echo(
f" (update available)\nVisit {html_url} "
f"for information about the new release.",
color=ctx.color
)
else:
click.echo(" (up-to-date)", color=ctx.color)
ctx.exit()
kwargs.setdefault("is_flag", True)
kwargs.setdefault("expose_value", False)
kwargs.setdefault("is_eager", True)
kwargs.setdefault("help", "Show the version and exit.")
kwargs["callback"] = callback
option = click.option("--version", **kwargs)
if callable(func):
return option(func)
return option
def profile_option(func=None, **kwargs):
kwargs.setdefault("callback", add_param_to_session)
kwargs.setdefault("expose_value", False)
kwargs.setdefault(
"help",
"The profile to use instead primary profile (case sensitive!)."
)
option = click.option("--profile", "-P", **kwargs)
if callable(func):
return option(func)
return option
def password_option(func=None, **kwargs):
kwargs.setdefault("callback", add_param_to_session)
kwargs.setdefault("expose_value", False)
kwargs.setdefault("help", "The password for the profile auth file.")
option = click.option("--password", "-p", **kwargs)
if callable(func):
return option(func)
return option
def verbosity_option(func=None, *, cli_logger=None, **kwargs):
"""A decorator that adds a `--verbosity, -v` option to the decorated
command.
Keyword arguments are passed to
the underlying ``click.option`` decorator.
"""
def callback(ctx, param, value):
x = getattr(logging, value.upper(), None)
if x is None:
raise click.BadParameter(
f"Must be CRITICAL, ERROR, WARNING, INFO or DEBUG, "
f"not {value}"
)
cli_logger.setLevel(x)
kwargs.setdefault("default", "INFO")
kwargs.setdefault("metavar", "LVL")
kwargs.setdefault("expose_value", False)
kwargs.setdefault(
"help", "Either CRITICAL, ERROR, WARNING, "
"INFO or DEBUG. [default: INFO]"
)
kwargs.setdefault("is_eager", True)
kwargs.setdefault("callback", callback)
cli_logger = _normalize_logger(cli_logger)
option = click.option("--verbosity", "-v", **kwargs)
if callable(func):
return option(func)
return option
def timeout_option(func=None, **kwargs):
def callback(ctx: click.Context, param, value):
if value == 0:
value = None
session = ctx.ensure_object(Session)
session.params[param.name] = value
return value
kwargs.setdefault("type", click.INT)
kwargs.setdefault("default", 30)
kwargs.setdefault("show_default", True)
kwargs.setdefault(
"help", ("Increase the timeout time if you got any TimeoutErrors. "
"Set to 0 to disable timeout.")
)
kwargs.setdefault("callback", callback)
kwargs.setdefault("expose_value", False)
option = click.option("--timeout", **kwargs)
if callable(func):
return option(func)
return option
def bunch_size_option(func=None, **kwargs):
    """A decorator that adds a ``--bunch-size`` option to a command.

    Controls how many library items are fetched per request (10-1000);
    the value is stored in the click session. Extra keyword arguments
    are forwarded to ``click.option``.
    """
    defaults = {
        "type": click.IntRange(10, 1000),
        "default": 1000,
        "show_default": True,
        "help": ("How many library items should be requested per request. A "
                 "lower size results in more requests to get the full library. "
                 "A higher size can result in a TimeOutError on low internet "
                 "connections."),
        "callback": add_param_to_session,
        "expose_value": False,
    }
    for key, value in defaults.items():
        kwargs.setdefault(key, value)
    option = click.option("--bunch-size", **kwargs)
    return option(func) if callable(func) else option
def start_date_option(func=None, **kwargs):
    """A decorator that adds a ``--start-date`` option to a command.

    Parses a UTC date/datetime (see ``datetime_type``) and stores it in
    the click session. Extra keyword arguments are forwarded to
    ``click.option``.
    """
    defaults = {
        "type": datetime_type,
        "help":
            "Only considers books added to library on or after this UTC date.",
        "callback": add_param_to_session,
        "expose_value": False,
    }
    for key, value in defaults.items():
        kwargs.setdefault(key, value)
    option = click.option("--start-date", **kwargs)
    return option(func) if callable(func) else option
def end_date_option(func=None, **kwargs):
    """A decorator that adds an ``--end-date`` option to a command.

    Parses a UTC date/datetime (see ``datetime_type``) and stores it in
    the click session. Extra keyword arguments are forwarded to
    ``click.option``.
    """
    defaults = {
        "type": datetime_type,
        "help":
            "Only considers books added to library on or before this UTC date.",
        "callback": add_param_to_session,
        "expose_value": False,
    }
    for key, value in defaults.items():
        kwargs.setdefault(key, value)
    option = click.option("--end-date", **kwargs)
    return option(func) if callable(func) else option

View file

@ -1,563 +0,0 @@
import logging
import pathlib
import re
from enum import Enum, auto
from typing import Any, Callable, Dict, List, NamedTuple, Optional, Union
import aiofiles
import click
import httpx
import tqdm
from aiofiles.os import path, unlink
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
FileMode = Literal["ab", "wb"]
logger = logging.getLogger("audible_cli.downloader")
ACCEPT_RANGES_HEADER = "Accept-Ranges"
ACCEPT_RANGES_NONE_VALUE = "none"
CONTENT_LENGTH_HEADER = "Content-Length"
CONTENT_TYPE_HEADER = "Content-Type"
MAX_FILE_READ_SIZE = 3 * 1024 * 1024
ETAG_HEADER = "ETag"
class ETag:
    """Thin wrapper around the value of an HTTP ``ETag`` header."""

    def __init__(self, etag: str) -> None:
        self._etag = etag

    @property
    def value(self) -> str:
        """The raw header value exactly as received."""
        return self._etag

    @property
    def parsed_etag(self) -> str:
        """The opaque tag between the double quotes of the header value."""
        return re.search('"([^"]*)"', self.value).group(1)

    @property
    def is_weak(self) -> bool:
        """``True`` when the header marks a weak validator (``W/`` prefix)."""
        return self.value.startswith("W/")
class File:
    """Async-friendly wrapper around a filesystem path.

    Wraps a ``pathlib.Path`` and exposes the aiofiles-based queries and
    operations the downloader needs (size, existence, removal, reading).
    """
    def __init__(self, file: Union[pathlib.Path, str]) -> None:
        # accept plain strings for convenience, always store a Path
        if not isinstance(file, pathlib.Path):
            file = pathlib.Path(file)
        self._file = file
    @property
    def path(self) -> pathlib.Path:
        """The wrapped path object."""
        return self._file
    async def get_size(self) -> int:
        """Return the file size in bytes, or 0 if the path is not a file."""
        if await path.isfile(self.path):
            return await path.getsize(self.path)
        return 0
    async def remove(self) -> None:
        """Delete the file if it exists; silently do nothing otherwise."""
        if await path.isfile(self.path):
            await unlink(self.path)
    async def directory_exists(self) -> bool:
        """Return ``True`` if the parent directory of the path exists."""
        return await path.isdir(self.path.parent)
    async def is_file(self) -> bool:
        """Return ``True`` for a regular file that is not a symlink."""
        return await path.isfile(self.path) and not await self.is_link()
    async def is_link(self) -> bool:
        """Return ``True`` if the path is a symbolic link."""
        return await path.islink(self.path)
    async def exists(self) -> bool:
        """Return ``True`` if the path exists in any form."""
        return await path.exists(self.path)
    async def read_text_content(
        self, max_bytes: int = MAX_FILE_READ_SIZE, encoding: str = "utf-8", errors=None
    ) -> str:
        """Read up to roughly ``max_bytes`` of the file as text.

        Returns the literal string ``"Unknown"`` when the file cannot be
        opened or decoded.

        NOTE(review): the file is opened in text mode, so ``read(read_size)``
        counts characters, not bytes — ``max_bytes`` is only an
        approximation of the byte cap; confirm if an exact limit matters.
        """
        file_size = await self.get_size()
        read_size = min(max_bytes, file_size)
        try:
            async with aiofiles.open(
                file=self.path, mode="r", encoding=encoding, errors=errors
            ) as file:
                return await file.read(read_size)
        except Exception: # noqa
            return "Unknown"
class ResponseInfo:
    """Snapshot of the download-relevant metadata of an ``httpx.Response``."""

    def __init__(self, response: httpx.Response) -> None:
        self._response = response
        self.headers: httpx.Headers = response.headers
        self.status_code: int = response.status_code
        self.content_length: Optional[int] = self._get_content_length(self.headers)
        self.content_type: Optional[str] = self._get_content_type(self.headers)
        self.accept_ranges: bool = self._does_accept_ranges(self.headers)
        self.etag: Optional[ETag] = self._get_etag(self.headers)

    @property
    def response(self) -> httpx.Response:
        """The wrapped response object."""
        return self._response

    def supports_resume(self) -> bool:
        """``True`` when the server advertised support for range requests."""
        return bool(self.accept_ranges)

    @staticmethod
    def _does_accept_ranges(headers: httpx.Headers) -> bool:
        # 'Accept-Ranges' indicates if the source accepts range requests,
        # that let you retrieve a part of the response
        advertised = headers.get(ACCEPT_RANGES_HEADER, ACCEPT_RANGES_NONE_VALUE)
        return advertised != ACCEPT_RANGES_NONE_VALUE

    @staticmethod
    def _get_content_length(headers: httpx.Headers) -> Optional[int]:
        """Parse ``Content-Length`` as an int, or ``None`` when absent."""
        raw = headers.get(CONTENT_LENGTH_HEADER)
        return int(raw) if raw is not None else None

    @staticmethod
    def _get_content_type(headers: httpx.Headers) -> Optional[str]:
        """Return the raw ``Content-Type`` header value, if any."""
        return headers.get(CONTENT_TYPE_HEADER)

    @staticmethod
    def _get_etag(headers: httpx.Headers) -> Optional[ETag]:
        """Wrap the ``ETag`` header into an :class:`ETag`, if present."""
        raw = headers.get(ETAG_HEADER)
        return ETag(raw) if raw is not None else None
class Status(Enum):
    """Outcome of a download attempt or of a single pre/post-download check."""
    # download finished (or check passed) without problems
    Success = auto()
    # pre-flight failures: destination problems detected before downloading
    DestinationAlreadyExists = auto()
    DestinationFolderNotExists = auto()
    DestinationNotAFile = auto()
    # generic download failure
    DownloadError = auto()
    DownloadErrorStatusCode = auto()
    # post-download validation failures
    DownloadSizeMismatch = auto()
    DownloadContentTypeMismatch = auto()
    # server asked the client to fetch the content part by part
    DownloadIndividualParts = auto()
    SourceDoesNotSupportResume = auto()
    StatusCode = auto()
async def check_target_file_status(
    target_file: File, force_reload: bool, **kwargs: Any
) -> Status:
    """Validate the download destination before any network work.

    Returns ``Status.Success`` when the target may be (re)written,
    otherwise a status describing why the download should be skipped.
    """
    if not await target_file.directory_exists():
        logger.error(
            f"Folder {target_file.path} does not exists! Skip download."
        )
        return Status.DestinationFolderNotExists

    is_regular_file = await target_file.is_file()
    if await target_file.exists() and not is_regular_file:
        logger.error(
            f"Object {target_file.path} exists but is not a file. Skip download."
        )
        return Status.DestinationNotAFile

    if is_regular_file and not force_reload:
        logger.info(
            f"File {target_file.path} already exists. Skip download."
        )
        return Status.DestinationAlreadyExists

    return Status.Success
async def check_download_size(
    tmp_file: File, target_file: File, head_response: ResponseInfo, **kwargs: Any
) -> Status:
    """Compare the downloaded size against the advertised Content-Length.

    Skips the check when either value is unavailable.
    """
    downloaded = await tmp_file.get_size()
    expected = head_response.content_length
    if downloaded is None or expected is None:
        return Status.Success
    if downloaded != expected:
        logger.error(
            f"Error downloading {target_file.path}. File size missmatch. "
            f"Expected size: {expected}; Downloaded: {downloaded}"
        )
        return Status.DownloadSizeMismatch
    return Status.Success
async def check_status_code(
    response: ResponseInfo, tmp_file: File, target_file: File, **kwargs: Any
) -> Status:
    """Treat any HTTP status outside 200-399 as a failed download."""
    if 200 <= response.status_code < 400:
        return Status.Success
    # surface the (usually textual) error body in the log
    content = await tmp_file.read_text_content()
    logger.error(
        f"Error downloading {target_file.path}. Message: {content}"
    )
    return Status.StatusCode
async def check_content_type(
    response: ResponseInfo, target_file: File, tmp_file: File,
    expected_types: List[str], **kwargs: Any
) -> Status:
    """Verify the response Content-Type against a whitelist.

    An empty ``expected_types`` list disables the check entirely.
    """
    if not expected_types or response.content_type in expected_types:
        return Status.Success
    content = await tmp_file.read_text_content()
    logger.error(
        f"Error downloading {target_file.path}. Wrong content type. "
        f"Expected type(s): {expected_types}; "
        f"Got: {response.content_type}; Message: {content}"
    )
    return Status.DownloadContentTypeMismatch
def _status_for_message(message: str) -> Status:
    """Map known server error texts onto a dedicated status."""
    known_markers = {
        "please download individual parts": Status.DownloadIndividualParts,
    }
    for marker, status in known_markers.items():
        if marker in message:
            return status
    return Status.Success
async def check_status_for_message(
    response: ResponseInfo, tmp_file: File, **kwargs: Any
) -> Status:
    """Inspect small textual responses for known server-side error messages."""
    content_type = response.content_type
    if not content_type or "text" not in content_type:
        return Status.Success
    # only read bodies that fit the configured read limit
    length = response.content_length or await tmp_file.get_size()
    if length <= MAX_FILE_READ_SIZE:
        message = await tmp_file.read_text_content()
        return _status_for_message(message)
    return Status.Success
class DownloadResult(NamedTuple):
    """Immutable summary of a finished (or aborted) download attempt."""
    # final outcome of the attempt
    status: Status
    # the requested target file
    destination: File
    # metadata of the preliminary request, if one was made
    head_response: Optional[ResponseInfo]
    # metadata of the download request itself, if one was made
    response: Optional[ResponseInfo]
    # textual body of a failed download, if available
    message: Optional[str]
class DummyProgressBar:
    """No-op stand-in for ``tqdm`` used when no progress should be shown.

    Supports the same context-manager and ``update`` protocol but does
    nothing with either.
    """

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        return None

    def update(self, *args, **kwargs):
        return None
def get_progressbar(
    destination: pathlib.Path, total: Optional[int], start: int = 0
) -> Union[tqdm.tqdm, DummyProgressBar]:
    """Create a byte-scaled tqdm bar, or a dummy when the total is unknown.

    :param destination: path used (shortened) as the bar description
    :param total: expected total size in bytes, or ``None``
    :param start: bytes already present (resume offset) to pre-advance
    """
    if total is None:
        return DummyProgressBar()
    bar = tqdm.tqdm(
        desc=click.format_filename(destination, shorten=True),
        total=total,
        unit="B",
        unit_scale=True,
        unit_divisor=1024
    )
    if start > 0:
        # account for bytes already downloaded when resuming
        bar.update(start)
    return bar
class Downloader:
    """Download a single source URL to a local file via a shared httpx client.

    Large sources are streamed with a progress bar; smaller ones are
    fetched in one request. When the server supports range requests,
    interrupted downloads leave a ``.resume`` file behind (named after
    the ETag when available) that a later run can continue from.
    """

    MIN_STREAM_LENGTH = 10*1024*1024  # using stream mode if source is greater than
    MIN_RESUME_FILE_LENGTH = 10*1024*1024  # keep resume file if file is greater than
    RESUME_SUFFIX = ".resume"
    TMP_SUFFIX = ".tmp"

    def __init__(
        self,
        source: httpx.URL,
        client: httpx.AsyncClient,
        expected_types: Optional[Union[List[str], str]] = None,
        additional_headers: Optional[Dict[str, str]] = None
    ) -> None:
        """
        :param source: the URL to download from
        :param client: the async httpx client used for every request
        :param expected_types: acceptable Content-Type value(s); ``None``
            disables the content type check
        :param additional_headers: extra headers sent with every request
        """
        self._source = source
        self._client = client
        self._expected_types = self._normalize_expected_types(expected_types)
        self._additional_headers = self._normalize_headers(additional_headers)
        self._head_request: Optional[ResponseInfo] = None

    @staticmethod
    def _normalize_expected_types(
        expected_types: Optional[Union[List[str], str]]
    ) -> List[str]:
        """Coerce the expected types argument into a (possibly empty) list."""
        if not isinstance(expected_types, list):
            if expected_types is None:
                expected_types = []
            else:
                expected_types = [expected_types]
        return expected_types

    @staticmethod
    def _normalize_headers(headers: Optional[Dict[str, str]]) -> Dict[str, str]:
        """Coerce the headers argument into a (possibly empty) dict."""
        if headers is None:
            return {}
        return headers

    async def get_head_response(self, force_recreate: bool = False) -> ResponseInfo:
        """Fetch (and cache) the response metadata for the source URL."""
        if self._head_request is None or force_recreate:
            # switched from HEAD to GET request without loading the body
            # HEAD request to cds.audible.de will responded in 1 - 2 minutes
            # a GET request to the same URI will take ~4-6 seconds
            async with self._client.stream(
                "GET", self._source, headers=self._additional_headers,
                follow_redirects=True,
            ) as head_response:
                if head_response.request.url != self._source:
                    # remember the redirect target so later requests skip it
                    self._source = head_response.request.url
                self._head_request = ResponseInfo(head_response)
        return self._head_request

    async def _determine_resume_file(self, target_file: File) -> File:
        """Pick the resume-file path, preferring an ETag-derived name."""
        head_response = await self.get_head_response()
        etag = head_response.etag
        if etag is None:
            resume_name = target_file.path
        else:
            parsed_etag = etag.parsed_etag
            resume_name = target_file.path.with_name(parsed_etag)
        resume_file = resume_name.with_suffix(self.RESUME_SUFFIX)
        return File(resume_file)

    def _determine_tmp_file(self, target_file: File) -> File:
        """Pick the temporary download path next to the target file."""
        tmp_file = pathlib.Path(target_file.path).with_suffix(self.TMP_SUFFIX)
        return File(tmp_file)

    async def _handle_tmp_file(
        self, tmp_file: File, supports_resume: bool, response: ResponseInfo
    ) -> None:
        """Keep a sufficiently large partial file for resuming, else delete it."""
        tmp_file_size = await tmp_file.get_size()
        expected_size = response.content_length
        if (
            supports_resume and expected_size is not None
            and self.MIN_RESUME_FILE_LENGTH < tmp_file_size < expected_size
        ):
            logger.debug(f"Keep resume file {tmp_file.path}")
        else:
            await tmp_file.remove()

    @staticmethod
    async def _rename_file(
        tmp_file: File, target_file: File, force_reload: bool, response: ResponseInfo
    ) -> Status:
        """Move the finished download into place, backing up an old target."""
        target_path = target_file.path
        if await target_file.exists() and force_reload:
            # keep the previous file as ``<name>.old.<i>`` instead of clobbering
            i = 0
            while target_path.with_suffix(f"{target_path.suffix}.old.{i}").exists():
                i += 1
            target_path.rename(target_path.with_suffix(f"{target_path.suffix}.old.{i}"))
        tmp_file.path.rename(target_path)
        logger.info(
            f"File {target_path} downloaded in {response.response.elapsed}."
        )
        return Status.Success

    @staticmethod
    async def _check_and_return_download_result(
        status_check_func: Callable,
        tmp_file: File,
        target_file: File,
        response: ResponseInfo,
        head_response: ResponseInfo,
        expected_types: List[str]
    ) -> Optional[DownloadResult]:
        """Run one status check; return a failure result or ``None`` on success.

        All check functions accept ``**kwargs``, so every piece of context
        (including ``head_response``, needed by ``check_download_size``)
        is passed to each of them.
        """
        status = await status_check_func(
            response=response,
            tmp_file=tmp_file,
            target_file=target_file,
            head_response=head_response,
            expected_types=expected_types
        )
        if status != Status.Success:
            message = await tmp_file.read_text_content()
            return DownloadResult(
                status=status,
                destination=target_file,
                head_response=head_response,
                response=response,
                message=message
            )
        return None

    async def _postprocessing(
        self, tmp_file: File, target_file: File, response: ResponseInfo,
        force_reload: bool
    ) -> DownloadResult:
        """Validate a finished download and move it to its final place."""
        head_response = await self.get_head_response()

        # bugfix: the list previously contained ``check_status_code`` twice
        # while ``check_download_size`` was defined but never invoked
        status_checks = [
            check_status_for_message,
            check_status_code,
            check_download_size,
            check_content_type
        ]
        for check in status_checks:
            result = await self._check_and_return_download_result(
                check, tmp_file, target_file, response,
                head_response, self._expected_types
            )
            if result:
                return result

        await self._rename_file(
            tmp_file=tmp_file,
            target_file=target_file,
            force_reload=force_reload,
            response=response,
        )
        return DownloadResult(
            status=Status.Success,
            destination=target_file,
            head_response=head_response,
            response=response,
            message=None
        )

    async def _stream_download(
        self,
        tmp_file: File,
        target_file: File,
        start: int,
        progressbar: Union[tqdm.tqdm, DummyProgressBar],
        force_reload: bool = True
    ) -> DownloadResult:
        """Download in streaming mode, appending when resuming from ``start``."""
        headers = self._additional_headers.copy()
        if start > 0:
            # resume: request only the missing tail and append to the file
            headers.update(Range=f"bytes={start}-")
            file_mode: FileMode = "ab"
        else:
            file_mode: FileMode = "wb"
        async with self._client.stream(
            method="GET", url=self._source, follow_redirects=True, headers=headers
        ) as response:
            with progressbar:
                async with aiofiles.open(tmp_file.path, mode=file_mode) as file:
                    async for chunk in response.aiter_bytes():
                        await file.write(chunk)
                        progressbar.update(len(chunk))
            return await self._postprocessing(
                tmp_file=tmp_file,
                target_file=target_file,
                response=ResponseInfo(response=response),
                force_reload=force_reload
            )

    async def _download(
        self, tmp_file: File, target_file: File, start: int, force_reload: bool
    ) -> DownloadResult:
        """Download in one request, appending when resuming from ``start``."""
        headers = self._additional_headers.copy()
        if start > 0:
            headers.update(Range=f"bytes={start}-")
            file_mode: FileMode = "ab"
        else:
            file_mode: FileMode = "wb"
        response = await self._client.get(
            self._source, follow_redirects=True, headers=headers
        )
        async with aiofiles.open(tmp_file.path, mode=file_mode) as file:
            await file.write(response.content)
        return await self._postprocessing(
            tmp_file=tmp_file,
            target_file=target_file,
            response=ResponseInfo(response=response),
            force_reload=force_reload
        )

    async def run(
        self,
        target: pathlib.Path,
        force_reload: bool = False
    ) -> DownloadResult:
        """Download the source to ``target`` and return the result.

        :param target: destination path for the finished download
        :param force_reload: re-download even if ``target`` already exists
        """
        target_file = File(target)
        destination_status = await check_target_file_status(
            target_file, force_reload
        )
        if destination_status != Status.Success:
            return DownloadResult(
                status=destination_status,
                destination=target_file,
                head_response=None,
                response=None,
                message=None
            )

        head_response = await self.get_head_response()
        supports_resume = head_response.supports_resume()
        if supports_resume:
            # continue from a previous partial download if one exists
            tmp_file = await self._determine_resume_file(target_file=target_file)
            start = await tmp_file.get_size()
        else:
            tmp_file = self._determine_tmp_file(target_file=target_file)
            await tmp_file.remove()
            start = 0

        should_stream = False
        progressbar = None
        if (
            head_response.content_length is not None and
            head_response.content_length >= self.MIN_STREAM_LENGTH
        ):
            should_stream = True
            progressbar = get_progressbar(
                target_file.path, head_response.content_length, start
            )

        try:
            if should_stream:
                return await self._stream_download(
                    tmp_file=tmp_file,
                    target_file=target_file,
                    start=start,
                    progressbar=progressbar,
                    force_reload=force_reload
                )
            else:
                return await self._download(
                    tmp_file=tmp_file,
                    target_file=target_file,
                    start=start,
                    force_reload=force_reload
                )
        finally:
            # decide whether the partial file is worth keeping for resume
            await self._handle_tmp_file(
                tmp_file=tmp_file,
                supports_resume=supports_resume,
                response=head_response
            )

View file

@ -1,4 +1,3 @@
from datetime import datetime
from pathlib import Path
@ -10,10 +9,6 @@ class NotFoundError(AudibleCliException):
"""Raised if an item is not found"""
class NotDownloadableAsAAX(AudibleCliException):
"""Raised if an item is not downloadable in aax format"""
class FileDoesNotExists(AudibleCliException):
"""Raised if a file does not exist"""
@ -42,53 +37,3 @@ class ProfileAlreadyExists(AudibleCliException):
def __init__(self, name):
message = f"Profile {name} already exist"
super().__init__(message)
class LicenseDenied(AudibleCliException):
"""Raised if a license request is not granted"""
class NoDownloadUrl(AudibleCliException):
"""Raised if a license response does not contain a download url"""
def __init__(self, asin):
message = f"License response for {asin} does not contain a download url"
super().__init__(message)
class DownloadUrlExpired(AudibleCliException):
"""Raised if a download url is expired"""
def __init__(self, lr_file):
message = f"Download url in {lr_file} is expired."
super().__init__(message)
class VoucherNeedRefresh(AudibleCliException):
"""Raised if a voucher reached his refresh date"""
def __init__(self, lr_file):
message = f"Refresh date for voucher {lr_file} reached."
super().__init__(message)
class ItemNotPublished(AudibleCliException):
"""Raised if a voucher reached his refresh date"""
def __init__(self, asin: str, pub_date):
pub_date = datetime.strptime(pub_date, "%Y-%m-%dT%H:%M:%SZ")
now = datetime.utcnow()
published_in = pub_date - now
pub_str = ""
if published_in.days > 0:
pub_str += f"{published_in.days} days, "
seconds = published_in.seconds
hours, remainder = divmod(seconds, 3600)
minutes, seconds = divmod(remainder, 60)
hms = "{:02}h:{:02}m:{:02}s".format(int(hours), int(minutes), int(seconds))
pub_str += hms
message = f"{asin} is not published. It will be available in {pub_str}"
super().__init__(message)

View file

@ -1,27 +1,17 @@
import asyncio
import logging
import secrets
import string
import unicodedata
from datetime import datetime
from math import ceil
from typing import List, Optional, Union
from warnings import warn
import audible
import httpx
from audible.aescipher import decrypt_voucher_from_licenserequest
from audible.client import convert_response_content
from .constants import CODEC_HIGH_QUALITY, CODEC_NORMAL_QUALITY
from .exceptions import (
AudibleCliException,
LicenseDenied,
NoDownloadUrl,
NotDownloadableAsAAX,
ItemNotPublished
)
from .utils import full_response_callback, LongestSubString
from .exceptions import AudibleCliException
from .utils import LongestSubString
logger = logging.getLogger("audible_cli.models")
@ -70,7 +60,7 @@ class BaseItem:
@property
def full_title_slugify(self):
valid_chars = "-_.() " + string.ascii_letters + string.digits
cleaned_title = unicodedata.normalize("NFKD", self.full_title or "")
cleaned_title = unicodedata.normalize("NFKD", self.full_title)
cleaned_title = cleaned_title.encode("ASCII", "ignore")
cleaned_title = cleaned_title.replace(b" ", b"_")
slug_title = "".join(
@ -82,27 +72,6 @@ class BaseItem:
return slug_title
def create_base_filename(self, mode: str):
supported_modes = ("ascii", "asin_ascii", "unicode", "asin_unicode")
if mode not in supported_modes:
raise AudibleCliException(
f"Unsupported mode {mode} for name creation"
)
if "ascii" in mode:
base_filename = self.full_title_slugify
elif "unicode" in mode:
base_filename = unicodedata.normalize("NFKD", self.full_title or "")
else:
base_filename = self.asin
if "asin" in mode:
base_filename = self.asin + "_" + base_filename
return base_filename
def substring_in_title_accuracy(self, substring):
match = LongestSubString(substring, self.full_title)
return round(match.percentage, 2)
@ -118,9 +87,6 @@ class BaseItem:
return images[res]
def get_pdf_url(self):
if not self.is_published():
raise ItemNotPublished(self.asin, self.publication_datetime)
if self.pdf_url is not None:
domain = self._client.auth.locale.domain
return f"https://www.audible.{domain}/companion-file/{self.asin}"
@ -131,22 +97,6 @@ class BaseItem:
or self.content_type == "Podcast") and self.has_children:
return True
def is_published(self):
if (
self.content_delivery_type and self.content_delivery_type == "AudioPart"
and self._parent
):
publication_datetime = self._parent.publication_datetime
else:
publication_datetime = self.publication_datetime
if publication_datetime is not None:
pub_date = datetime.strptime(
publication_datetime, "%Y-%m-%dT%H:%M:%SZ"
)
now = datetime.utcnow()
return now > pub_date
class LibraryItem(BaseItem):
def _prepare_data(self, data: dict) -> dict:
@ -203,7 +153,7 @@ class LibraryItem(BaseItem):
"""
# Only items with content_delivery_type
# MultiPartBook or Periodical have child elements
# MultiPartBook or Periodical have child elemts
if not self.has_children:
return
@ -239,25 +189,23 @@ class LibraryItem(BaseItem):
def is_downloadable(self):
# customer_rights must be in response_groups
if self.customer_rights is not None:
if self.customer_rights["is_consumable_offline"]:
if not self.customer_rights["is_consumable_offline"]:
return False
else:
return True
return False
async def get_aax_url_old(self, quality: str = "high"):
if not self.is_published():
raise ItemNotPublished(self.asin, self.publication_datetime)
if not self.is_downloadable():
raise AudibleCliException(
f"{self.full_title} is not downloadable."
f"{self.full_title} is not downloadable. Skip item."
)
codec, codec_name = self._get_codec(quality)
if codec is None or self.is_ayce:
raise NotDownloadableAsAAX(
if codec is None:
raise AudibleCliException(
f"{self.full_title} is not downloadable in AAX format"
)
url = (
"https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/"
"FSDownloadContent"
@ -283,8 +231,6 @@ class LibraryItem(BaseItem):
return httpx.URL(link), codec_name
async def get_aax_url(self, quality: str = "high"):
if not self.is_published():
raise ItemNotPublished(self.asin, self.publication_datetime)
if not self.is_downloadable():
raise AudibleCliException(
@ -292,8 +238,8 @@ class LibraryItem(BaseItem):
)
codec, codec_name = self._get_codec(quality)
if codec is None or self.is_ayce:
raise NotDownloadableAsAAX(
if codec is None:
raise AudibleCliException(
f"{self.full_title} is not downloadable in AAX format"
)
@ -305,125 +251,47 @@ class LibraryItem(BaseItem):
}
return httpx.URL(url, params=params), codec_name
async def get_aaxc_url(
self,
quality: str = "high",
license_response_groups: Optional[str] = None
):
if not self.is_published():
raise ItemNotPublished(self.asin, self.publication_datetime)
async def get_aaxc_url(self, quality: str = "high"):
assert quality in ("best", "high", "normal",)
if not self.is_downloadable():
raise AudibleCliException(
f"{self.full_title} is not downloadable."
body = {
"supported_drm_types": ["Mpeg", "Adrm"],
"quality": "Extreme" if quality in ("best", "high") else "Normal",
"consumption_type": "Download",
"response_groups": (
"last_position_heard, pdf_url, content_reference, chapter_info"
)
}
lr = await self.get_license(quality, license_response_groups)
lr = await self._client.post(
f"content/{self.asin}/licenserequest",
body=body
)
content_metadata = lr["content_license"]["content_metadata"]
url = httpx.URL(content_metadata["content_url"]["offline_url"])
codec = content_metadata["content_reference"]["content_format"]
voucher = decrypt_voucher_from_licenserequest(self._client.auth, lr)
lr["content_license"]["license_response"] = voucher
return url, codec, lr
async def get_license(
self,
quality: str = "high",
response_groups: Optional[str] = None
):
async def get_content_metadata(self, quality: str = "high"):
assert quality in ("best", "high", "normal",)
if response_groups is None:
response_groups = "last_position_heard, pdf_url, content_reference"
body = {
"supported_drm_types": ["Mpeg", "Adrm"],
"quality": "High" if quality in ("best", "high") else "Normal",
"consumption_type": "Download",
"response_groups": response_groups
}
headers = {
"X-Amzn-RequestId": secrets.token_hex(20).upper(),
"X-ADP-SW": "37801821",
"X-ADP-Transport": "WIFI",
"X-ADP-LTO": "120",
"X-Device-Type-Id": "A2CZJZGLK2JJVM",
"device_idiom": "phone"
}
lr = await self._client.post(
f"content/{self.asin}/licenserequest",
body=body,
headers=headers
)
content_license = lr["content_license"]
if content_license["status_code"] == "Denied":
if "license_denial_reasons" in content_license:
for reason in content_license["license_denial_reasons"]:
message = reason.get("message", "UNKNOWN")
rejection_reason = reason.get("rejectionReason", "UNKNOWN")
validation_type = reason.get("validationType", "UNKNOWN")
logger.debug(
f"License denied message for {self.asin}: {message}."
f"Reason: {rejection_reason}."
f"Type: {validation_type}"
)
msg = content_license["message"]
raise LicenseDenied(msg)
content_url = content_license["content_metadata"]\
.get("content_url", {}).get("offline_url")
if content_url is None:
raise NoDownloadUrl(self.asin)
if "license_response" in content_license:
try:
voucher = decrypt_voucher_from_licenserequest(
self._client.auth, lr
)
except Exception:
logger.error(f"Decrypting voucher for {self.asin} failed")
else:
content_license["license_response"] = voucher
else:
logger.error(f"No voucher for {self.asin} found")
return lr
async def get_content_metadata(
self, quality: str = "high", chapter_type: str = "Tree", **request_kwargs
):
chapter_type = chapter_type.capitalize()
assert quality in ("best", "high", "normal",)
assert chapter_type in ("Flat", "Tree")
url = f"content/{self.asin}/metadata"
params = {
"response_groups": "last_position_heard, content_reference, "
"chapter_info",
"quality": "High" if quality in ("best", "high") else "Normal",
"drm_type": "Adrm",
"chapter_titles_type": chapter_type,
**request_kwargs
"quality": "Extreme" if quality in ("best", "high") else "Normal",
"drm_type": "Adrm"
}
metadata = await self._client.get(url, params=params)
return metadata
async def get_annotations(self):
url = f"https://cde-ta-g7g.amazon.com/FionaCDEServiceEngine/sidecar"
params = {
"type": "AUDI",
"key": self.asin
}
annotations = await self._client.get(url, params=params)
return annotations
class WishlistItem(BaseItem):
pass
@ -447,13 +315,9 @@ class BaseList:
def _prepare_data(self, data: Union[dict, list]) -> Union[dict, list]:
return data
@property
def data(self):
return self._data
def get_item_by_asin(self, asin):
try:
return next(i for i in self._data if asin == i.asin)
return next(i for i in self._data if asin in i.asin)
except StopIteration:
return None
@ -490,42 +354,8 @@ class Library(BaseList):
async def from_api(
cls,
api_client: audible.AsyncClient,
include_total_count_header: bool = False,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None,
**request_params
):
def filter_by_date(item):
if item.purchase_date is not None:
date_added = datetime.strptime(
item.purchase_date,
"%Y-%m-%dT%H:%M:%S.%fZ"
)
elif item.library_status.get("date_added") is not None:
date_added = datetime.strptime(
item.library_status.get("date_added"),
"%Y-%m-%dT%H:%M:%S.%fZ"
)
else:
logger.info(
f"{item.asin}: {item.full_title} can not determine date added."
)
return True
if start_date is not None and start_date > date_added:
return False
# If a new episode is added to a parent podcast, the purchase_date
# and date_added is set to this date. This can makes things
# difficult to get older podcast episodes
# the end date will be filtered by the resolve_podcasts function later
if item.is_parent_podcast():
return True
if end_date is not None and end_date < date_added:
return False
return True
if "response_groups" not in request_params:
request_params["response_groups"] = (
"contributors, customer_rights, media, price, product_attrs, "
@ -539,29 +369,8 @@ class Library(BaseList):
"periodicals, provided_review, product_details"
)
if start_date is not None:
if "purchase_date" in request_params:
raise AudibleCliException(
"Do not use purchase_date and start_date together"
)
request_params["purchased_after"] = start_date.strftime(
"%Y-%m-%dT%H:%M:%S.%fZ")
resp: httpx.Response = await api_client.get(
"library",
response_callback=full_response_callback,
**request_params
)
resp_content = convert_response_content(resp)
total_count_header = resp.headers.get("total-count")
cls_instance = cls(resp_content, api_client=api_client)
if start_date is not None or end_date is not None:
cls_instance._data = list(filter(filter_by_date, cls_instance.data))
if include_total_count_header:
return cls_instance, total_count_header
return cls_instance
resp = await api_client.get("library", **request_params)
return cls(resp, api_client=api_client)
@classmethod
async def from_api_full_sync(
@ -570,59 +379,33 @@ class Library(BaseList):
bunch_size: int = 1000,
**request_params
) -> "Library":
request_params.pop("page", None)
request_params["page"] = 1
request_params["num_results"] = bunch_size
library, total_count = await cls.from_api(
api_client,
page=1,
include_total_count_header=True,
**request_params
)
pages = ceil(int(total_count) / bunch_size)
if pages == 1:
return library
library = []
while True:
resp = await cls.from_api(api_client, params=request_params)
items = resp._data
len_items = len(items)
library.extend(items)
if len_items < bunch_size:
break
request_params["page"] += 1
additional_pages = []
for page in range(2, pages+1):
additional_pages.append(
cls.from_api(
api_client,
page=page,
**request_params
)
)
resp._data = library
return resp
additional_pages = await asyncio.gather(*additional_pages)
async def resolve_podcats(self):
podcasts = []
for i in self:
if i.is_parent_podcast():
podcasts.append(i)
for p in additional_pages:
library.data.extend(p.data)
return library
async def resolve_podcats(
self,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
):
warn(
"resolve_podcats is deprecated, use resolve_podcasts instead",
DeprecationWarning,
stacklevel=2
)
return self.resolve_podcasts(start_date, end_date)
async def resolve_podcasts(
self,
start_date: Optional[datetime] = None,
end_date: Optional[datetime] = None
):
podcast_items = await asyncio.gather(
*[i.get_child_items(start_date=start_date, end_date=end_date)
for i in self if i.is_parent_podcast()]
*[i.get_child_items() for i in podcasts]
)
for i in podcast_items:
self.data.extend(i.data)
self._data.extend(i._data)
class Catalog(BaseList):
@ -681,12 +464,17 @@ class Catalog(BaseList):
return cls(resp, api_client=api_client)
async def resolve_podcasts(self):
async def resolve_podcats(self):
podcasts = []
for i in self:
if i.is_parent_podcast():
podcasts.append(i)
podcast_items = await asyncio.gather(
*[i.get_child_items() for i in self if i.is_parent_podcast()]
*[i.get_child_items() for i in podcasts]
)
for i in podcast_items:
self.data.extend(i.data)
self._data.extend(i._data)
class Wishlist(BaseList):

View file

@ -28,49 +28,39 @@ def from_folder(plugin_dir: Union[str, pathlib.Path]):
"""
def decorator(group):
if not isinstance(group, click.Group):
raise TypeError(
"Plugins can only be attached to an instance of click.Group()"
)
raise TypeError("Plugins can only be attached to an instance of "
"click.Group()")
plugin_path = pathlib.Path(plugin_dir).resolve()
sys.path.insert(0, str(plugin_path))
pdir = pathlib.Path(plugin_dir)
cmds = [x for x in pdir.glob("cmd_*.py")]
sys.path.insert(0, str(pdir.resolve()))
for cmd_path in plugin_path.glob("cmd_*.py"):
cmd_path_stem = cmd_path.stem
for cmd in cmds:
mod_name = cmd.stem
try:
mod = import_module(cmd_path_stem)
cmd = mod.cli
if cmd.name == "cli":
# if no name given to the command, use the filename
# excl. starting cmd_ as name
cmd.name = cmd_path_stem[4:]
group.add_command(cmd)
orig_help = cmd.help or ""
new_help = (
f"(P) {orig_help}\n\nPlugin loaded from file: {str(cmd_path)}"
)
cmd.help = new_help
mod = import_module(mod_name)
name = mod_name[4:] if mod.cli.name == "cli" else mod.cli.name
group.add_command(mod.cli, name=name)
except Exception: # noqa
# Catch this so a busted plugin doesn't take down the CLI.
# Handled by registering a dummy command that does nothing
# other than explain the error.
group.add_command(BrokenCommand(cmd_path_stem[4:]))
group.add_command(BrokenCommand(mod_name[4:]))
return group
return decorator
def from_entry_point(entry_point_group):
def from_entry_point(entry_point_group: str):
"""
A decorator to register external CLI commands to an instance of
`click.Group()`.
Parameters
----------
entry_point_group : list
A list producing one `pkg_resources.EntryPoint()` per iteration.
entry_point_group : iter
An iterable producing one `pkg_resources.EntryPoint()` per iteration.
Returns
-------
@ -78,23 +68,13 @@ def from_entry_point(entry_point_group):
"""
def decorator(group):
if not isinstance(group, click.Group):
raise TypeError(
"Plugins can only be attached to an instance of click.Group()"
)
print(type(group))
raise TypeError("Plugins can only be attached to an instance of "
"click.Group()")
for entry_point in entry_point_group or ():
try:
cmd = entry_point.load()
dist_name = entry_point.dist.name
if cmd.name == "cli":
# if no name given to the command, use the filename
# excl. starting cmd_ as name
cmd.name = dist_name
group.add_command(cmd)
orig_help = cmd.help or ""
new_help = f"(P) {orig_help}\n\nPlugin loaded from package: {dist_name}"
cmd.help = new_help
group.add_command(entry_point.load())
except Exception: # noqa
# Catch this so a busted plugin doesn't take down the CLI.
# Handled by registering a dummy command that does nothing

View file

@ -1,8 +1,9 @@
import csv
import asyncio
import io
import logging
import pathlib
from difflib import SequenceMatcher
from functools import partial, wraps
from typing import List, Optional, Union
import aiofiles
@ -11,7 +12,6 @@ import httpx
import tqdm
from PIL import Image
from audible import Authenticator
from audible.client import raise_for_status
from audible.login import default_login_url_callback
from click import echo, secho, prompt
@ -21,15 +21,6 @@ from .constants import DEFAULT_AUTH_FILE_ENCRYPTION
logger = logging.getLogger("audible_cli.utils")
datetime_type = click.DateTime([
"%Y-%m-%d",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%dT%H:%M:%S.%fZ",
"%Y-%m-%dT%H:%M:%SZ"
])
def prompt_captcha_callback(captcha_url: str) -> str:
"""Helper function for handling captcha."""
@ -41,7 +32,7 @@ def prompt_captcha_callback(captcha_url: str) -> str:
img.show()
else:
echo(
"Please open the following url with a web browser "
"Please open the following url with a webbrowser "
"to get the captcha:"
)
echo(captcha_url)
@ -69,11 +60,6 @@ def prompt_external_callback(url: str) -> str:
return default_login_url_callback(url)
def full_response_callback(resp: httpx.Response) -> httpx.Response:
raise_for_status(resp)
return resp
def build_auth_file(
filename: Union[str, pathlib.Path],
username: Optional[str],
@ -156,6 +142,17 @@ def asin_in_library(asin, library):
return False
def wrap_async(func):
@wraps(func)
async def run(*args, loop=None, executor=None, **kwargs):
if loop is None:
loop = asyncio.get_event_loop()
pfunc = partial(func, *args, **kwargs)
return await loop.run_in_executor(executor, pfunc)
return run
class DummyProgressBar:
def __enter__(self):
return self
@ -259,7 +256,8 @@ class Downloader:
file.rename(file.with_suffix(f"{file.suffix}.old.{i}"))
tmp_file.rename(file)
logger.info(
f"File {self._file} downloaded in {elapsed}."
f"File {self._file} downloaded to {self._file.parent} "
f"in {elapsed}."
)
return True
@ -302,17 +300,3 @@ class Downloader:
await self._load()
finally:
self._remove_tmp_file()
def export_to_csv(
file: pathlib.Path,
data: list,
headers: Union[list, tuple],
dialect: str
) -> None:
with file.open("w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=headers, dialect=dialect)
writer.writeheader()
for i in data:
writer.writerow(i)

View file

@ -5,5 +5,5 @@ Tab completion can be provided for commands, options and choice values.
Bash, Zsh and Fish are supported.
Simply copy the activation script for your shell from this folder to your machine.
Read [here](https://click.palletsprojects.com/en/8.0.x/shell-completion/)
Read [here](https://click.palletsprojects.com/en/7.x/bashcomplete/#activation-script)
how-to activate the script in your shell.

View file

@ -1,2 +1,2 @@
_AUDIBLE_COMPLETE=bash_source audible
_AUDIBLE_QUICKSTART_COMPLETE=bash_source audible-quickstart
_AUDIBLE_COMPLETE=source_bash audible
_AUDIBLE_QUICKSTART_COMPLETE=source_bash audible-quickstart

View file

@ -1,2 +1,2 @@
_AUDIBLE_COMPLETE=zsh_source audible
_AUDIBLE_QUICKSTART_COMPLETE=zsh_source audible-quickstart
_AUDIBLE_COMPLETE=source_zsh audible
_AUDIBLE_QUICKSTART_COMPLETE=source_zsh audible-quickstart

View file

@ -1,6 +1,6 @@
"""
This script replaces the chapter titles from a ffmetadata file with the one
extracted from an API metadata/voucher file
extracted from a api metadata/voucher file
Example: