"""Removes encryption of aax and aaxc files. This is a proof-of-concept and for testing purposes only. No error handling. Need further work. Some options do not work or options are missing. Needs at least ffmpeg 4.4 """ import json import operator import pathlib import re import subprocess # noqa: S404 import tempfile import typing as t from enum import Enum from functools import reduce from glob import glob from shutil import which import click from click import echo, secho from audible_cli.decorators import pass_session from audible_cli.exceptions import AudibleCliException class ChapterError(AudibleCliException): """Base class for all chapter errors.""" class SupportedFiles(Enum): AAX = ".aax" AAXC = ".aaxc" @classmethod def get_supported_list(cls): return list(set(item.value for item in cls)) @classmethod def is_supported_suffix(cls, value): return value in cls.get_supported_list() @classmethod def is_supported_file(cls, value): return pathlib.PurePath(value).suffix in cls.get_supported_list() def _get_input_files( files: t.Union[t.Tuple[str], t.List[str]], recursive: bool = True ) -> t.List[pathlib.Path]: filenames = [] for filename in files: # if the shell does not do filename globbing expanded = list(glob(filename, recursive=recursive)) if ( len(expanded) == 0 and '*' not in filename and not SupportedFiles.is_supported_file(filename) ): raise click.BadParameter("{filename}: file not found or supported.") expanded_filter = filter( lambda x: SupportedFiles.is_supported_file(x), expanded ) expanded = list(map(lambda x: pathlib.Path(x).resolve(), expanded_filter)) filenames.extend(expanded) return filenames def recursive_lookup_dict(key: str, dictionary: t.Dict[str, t.Any]) -> t.Any: if key in dictionary: return dictionary[key] for value in dictionary.values(): if isinstance(value, dict): try: item = recursive_lookup_dict(key, value) except KeyError: continue else: return item raise KeyError def get_aaxc_credentials(voucher_file: pathlib.Path): if not voucher_file.exists() or not voucher_file.is_file(): raise AudibleCliException(f"Voucher file {voucher_file} not found.") voucher_dict = json.loads(voucher_file.read_text()) try: key = recursive_lookup_dict("key", voucher_dict) iv = recursive_lookup_dict("iv", voucher_dict) except KeyError: raise AudibleCliException(f"No key/iv found in file {voucher_file}.") from None return key, iv class ApiChapterInfo: def __init__(self, content_metadata: t.Dict[str, t.Any]) -> None: chapter_info = self._parse(content_metadata) self._chapter_info = chapter_info @classmethod def from_file(cls, file: t.Union[pathlib.Path, str]) -> "ApiChapterInfo": file = pathlib.Path(file) if not file.exists() or not file.is_file(): raise ChapterError(f"Chapter file {file} not found.") content_string = pathlib.Path(file).read_text("utf-8") content_json = json.loads(content_string) return cls(content_json) @staticmethod def _parse(content_metadata: t.Dict[str, t.Any]) -> t.Dict[str, t.Any]: if "chapters" in content_metadata: return content_metadata try: return recursive_lookup_dict("chapter_info", content_metadata) except KeyError: raise ChapterError("No chapter info found.") from None def count_chapters(self): return len(self.get_chapters()) def get_chapters(self, separate_intro_outro=False, remove_intro_outro=False): def extract_chapters(initial, current): if "chapters" in current: return initial + [current] + current["chapters"] else: return initial + [current] chapters = list( reduce( extract_chapters, self._chapter_info["chapters"], [], ) ) if separate_intro_outro: return self._separate_intro_outro(chapters) elif remove_intro_outro: return self._remove_intro_outro(chapters) return chapters def get_intro_duration_ms(self): return self._chapter_info["brandIntroDurationMs"] def get_outro_duration_ms(self): return self._chapter_info["brandOutroDurationMs"] def get_runtime_length_ms(self): return self._chapter_info["runtime_length_ms"] def is_accurate(self): return self._chapter_info["is_accurate"] def _separate_intro_outro(self, chapters): echo("Separate Audible Brand Intro and Outro to own Chapter.") chapters.sort(key=operator.itemgetter("start_offset_ms")) first = chapters[0] intro_dur_ms = self.get_intro_duration_ms() first["start_offset_ms"] = intro_dur_ms first["start_offset_sec"] = round(first["start_offset_ms"] / 1000) first["length_ms"] -= intro_dur_ms last = chapters[-1] outro_dur_ms = self.get_outro_duration_ms() last["length_ms"] -= outro_dur_ms chapters.append( { "length_ms": intro_dur_ms, "start_offset_ms": 0, "start_offset_sec": 0, "title": "Intro", } ) chapters.append( { "length_ms": outro_dur_ms, "start_offset_ms": self.get_runtime_length_ms() - outro_dur_ms, "start_offset_sec": round( (self.get_runtime_length_ms() - outro_dur_ms) / 1000 ), "title": "Outro", } ) chapters.sort(key=operator.itemgetter("start_offset_ms")) return chapters def _remove_intro_outro(self, chapters): echo("Delete Audible Brand Intro and Outro.") chapters.sort(key=operator.itemgetter("start_offset_ms")) intro_dur_ms = self.get_intro_duration_ms() outro_dur_ms = self.get_outro_duration_ms() first = chapters[0] first["length_ms"] -= intro_dur_ms for chapter in chapters[1:]: chapter["start_offset_ms"] -= intro_dur_ms chapter["start_offset_sec"] -= round(chapter["start_offset_ms"] / 1000) last = chapters[-1] last["length_ms"] -= outro_dur_ms return chapters class FFMeta: SECTION = re.compile(r"\[(?P
[^]]+)\]") OPTION = re.compile(r"(?P