From c923f90a753a5ab5291e06156c0591739155f9e4 Mon Sep 17 00:00:00 2001 From: Pawel Date: Wed, 3 Jun 2026 14:01:18 -0400 Subject: [PATCH] Updated docs + added TTS backends --- README.md | 113 +++++++++- examples/config.yaml | 62 +++++- requirements-tts.txt | 5 + requirements.txt | 1 + saiki.py | 2 + src/saiki/ankiconnect.py | 8 +- src/saiki/audio.py | 19 +- src/saiki/cli.py | 85 ++++++- src/saiki/config.py | 48 +++- src/saiki/importer.py | 465 +++++++++++++++++++++++++++++++++++++-- src/saiki/text.py | 8 +- src/saiki/words.py | 23 +- src/saiki/youtube.py | 16 +- tests/test_core.py | 97 +++++++- 14 files changed, 916 insertions(+), 36 deletions(-) create mode 100644 requirements-tts.txt diff --git a/README.md b/README.md index b47c195..78542a8 100644 --- a/README.md +++ b/README.md @@ -18,6 +18,7 @@ The name is a coined Japanese compound from `採` as in gathering/collecting and - [Anki](https://apps.ankiweb.net/) with [AnkiConnect](https://github.com/amikey/anki-connect) - `ffmpeg` - Python dependencies from `requirements.txt` +- Optional extra TTS backend tools: `piper`, `espeak-ng`, and `kokoro-onnx`. - spaCy models for word mining: ```shell @@ -35,6 +36,62 @@ pip install -r requirements.txt sudo dnf install ffmpeg ``` +### Optional TTS Backends + +The default `edge-tts` backend is installed by `requirements.txt`. Install only +the optional pieces you plan to test: + +```shell +# Python-backed optional engines: piper, kokoro. +pip install -r requirements-tts.txt + +# System package for espeak-ng. +sudo dnf install espeak-ng +``` + +Other package-manager names: + +```shell +sudo apt-get install espeak-ng +sudo pacman -S espeak-ng +``` + +Backend notes: + +- `edge-tts`: installed by `pip install edge-tts`; no API key, but it uses + Microsoft Edge's online TTS service. +- `gtts`: installed by `requirements.txt`; no API key, but it uses Google's + online TTS service through `gtts-cli`. 
+- `piper`: installed by `pip install piper-tts`; you still need a compatible + `.onnx` voice model, usually with its matching `.onnx.json` config file. +- `espeak-ng`: installed through your OS package manager, not pip. +- `kokoro`: installed by `pip install kokoro-onnx soundfile`; you still need + `kokoro-v1.0.onnx` and `voices-v1.0.bin`, plus any language-specific G2P + setup required by your Kokoro release. + +Example model downloads for the README smoke tests: + +```shell +mkdir -p ~/.local/share/saiki/models + +# Piper Spanish voice model plus matching config. +wget -O ~/.local/share/saiki/models/es_ES-davefx-medium.onnx \ + https://huggingface.co/rhasspy/piper-voices/resolve/main/es/es_ES/davefx/medium/es_ES-davefx-medium.onnx +wget -O ~/.local/share/saiki/models/es_ES-davefx-medium.onnx.json \ + https://huggingface.co/rhasspy/piper-voices/resolve/main/es/es_ES/davefx/medium/es_ES-davefx-medium.onnx.json + +# Kokoro ONNX model plus voices bundle. +wget -O ~/.local/share/saiki/models/kokoro-v1.0.onnx \ + https://github.com/thewh1teagle/kokoro-onnx/releases/download/model-files-v1.0/kokoro-v1.0.onnx +wget -O ~/.local/share/saiki/models/voices-v1.0.bin \ + https://github.com/thewh1teagle/kokoro-onnx/releases/download/model-files-v1.0/voices-v1.0.bin +``` + +Saiki's default `tts_model_dir` is `~/.local/share/saiki/models`. Relative +model paths such as `es_ES-davefx-medium.onnx` are resolved under that +directory. You can override it in YAML with `tts_model_dir` or for one command +with `--tts-model-dir`. 
+ ## Configuration Defaults are built in, but you can override them with YAML: @@ -57,6 +114,7 @@ media_dir: ~/.var/app/net.ankiweb.Anki/data/Anki2/User 1/collection.media audio_output_root: ~/Languages/Anki/anki-audio word_output_root: ~/Languages/Anki/anki-words sentence_dir: ~/Languages/Anki +tts_model_dir: ~/.local/share/saiki/models note_model: Basic fields: front: Front @@ -65,8 +123,8 @@ languages: jp: name: japanese transcript_code: ja - tts_code: ja - tts_tld: com + tts_backend: edge-tts + tts_voice: ja-JP-NanamiNeural tts_tempo: 1.35 decks: ["日本語"] field: Back @@ -75,8 +133,8 @@ languages: es: name: spanish transcript_code: es - tts_code: es - tts_tld: es + tts_backend: edge-tts + tts_voice: es-ES-ElviraNeural tts_tempo: 1.25 decks: ["Español"] field: Back @@ -174,12 +232,59 @@ Generate TTS audio and add sentence cards to Anki. ./saiki.py import es ./saiki.py import jp ~/Languages/Anki/sentences_jp.txt ./saiki.py import es youtube.tsv --tags youtube,manual +./saiki.py import es --tts-voice es-MX-DaliaNeural ``` The importer accepts plain text sentence files and TSV/CSV files with a `sentence` column. `text-to-speech` is always added as a tag. If `--tags` is not provided, `AI-generated` is added. +TTS is configured per language with `tts_backend`. Supported backends are: + +- `edge-tts`: default backend using Microsoft Edge neural voices; configure + `tts_voice`. +- `gtts`: free backend using `gtts-cli`; configure `tts_code` and + `tts_tld`. +- `piper`: local/offline neural TTS; configure `tts_model` with a model path. + The stock Piper catalog includes Spanish voices, but not Japanese. +- `espeak-ng`: local/offline lightweight TTS; configure `tts_voice`. Spanish is + supported; Japanese is documented as kana-only and is not recommended for + normal Japanese sentence cards. +- `kokoro`: local/offline neural TTS; configure `tts_model`, `tts_voices`, + `tts_voice`, and `tts_code`; some Japanese setups also need + `tts_vocab_config`. 
Kokoro lists Japanese and Spanish voices, but upstream + notes that non-English quality can be thin. + +You can override backend settings for one import: + +```shell +./saiki.py import jp sentences_jp.txt \ + --tts-backend edge-tts \ + --tts-voice ja-JP-KeitaNeural +``` + +Voice-listing helpers: + +```shell +./saiki.py tts-voices jp +./saiki.py tts-voices es --backend edge-tts +``` + +Test a TTS backend without creating Anki cards: + +```shell +./saiki.py tts-test es --out /tmp/saiki_edge_default_es.mp3 +./saiki.py tts-test jp --tts-backend edge-tts --tts-voice ja-JP-NanamiNeural --out /tmp/saiki_edge_jp.mp3 +./saiki.py tts-test es --tts-backend edge-tts --tts-voice es-ES-ElviraNeural --out /tmp/saiki_edge_es.mp3 +./saiki.py tts-test es --tts-backend gtts --tts-code es --tts-tld es --out /tmp/saiki_gtts_es.mp3 +./saiki.py tts-test es --tts-backend piper --tts-model es_ES-davefx-medium.onnx --tts-config es_ES-davefx-medium.onnx.json --out /tmp/saiki_piper_es.mp3 +./saiki.py tts-test es --tts-backend espeak-ng --tts-voice es --out /tmp/saiki_espeak_es.mp3 +./saiki.py tts-test es --tts-backend kokoro --tts-model kokoro-v1.0.onnx --tts-voices voices-v1.0.bin --tts-voice ef_dora --out /tmp/saiki_kokoro_es.mp3 +``` + +For `kokoro`, put `tts_model`, `tts_voices`, and any needed `tts_vocab_config` +in your config file rather than typing every path each time. 
+ ### Known/New Words Compare any generated word list against an existing known list: diff --git a/examples/config.yaml b/examples/config.yaml index 17360ae..537fa8c 100644 --- a/examples/config.yaml +++ b/examples/config.yaml @@ -11,6 +11,7 @@ media_dir: ~/.var/app/net.ankiweb.Anki/data/Anki2/User 1/collection.media audio_output_root: ~/Languages/Anki/anki-audio word_output_root: ~/Languages/Anki/anki-words sentence_dir: ~/Languages/Anki +tts_model_dir: ~/.local/share/saiki/models note_model: Basic fields: @@ -21,8 +22,6 @@ languages: jp: name: japanese transcript_code: ja - tts_code: ja - tts_tld: com tts_tempo: 1.35 decks: - 日本語 @@ -30,14 +29,69 @@ languages: word_model: ja_core_news_lg sentence_file: sentences_jp.txt + # --- TTS backend (pick one) --- + + # edge-tts (default): Microsoft Edge neural TTS. Requires: pip install edge-tts + tts_backend: edge-tts + tts_voice: ja-JP-NanamiNeural # or ja-JP-KeitaNeural for male + + # gtts: free Google TTS via gtts-cli. Requires: pip install gtts + # tts_backend: gtts + # tts_code: ja + # tts_tld: com + + # piper: no stock Japanese voice is listed in Piper's official voice catalog. + # Only use this backend for JP if you have your own compatible Japanese model. + + # espeak-ng: Japanese support is kana-only in the upstream docs, so this is + # not a good fit for normal Japanese sentences that include kanji. + + # kokoro: offline neural TTS. Requires: pip install kokoro-onnx soundfile + # plus the Kokoro model/voice files and Japanese G2P dependencies for your + # installed Kokoro release. Kokoro lists Japanese voices, but its docs warn + # that non-English quality can be thin. 
+ # tts_backend: kokoro + # tts_model: kokoro-v1.0.onnx + # tts_voices: voices-v1.0.bin + # tts_vocab_config: kokoro-ja-config.json + # tts_voice: jf_alpha # Japanese female; jm_kumo for male + # tts_code: ja + es: name: spanish transcript_code: es - tts_code: es - tts_tld: es tts_tempo: 1.25 decks: - Español field: Back word_model: es_core_news_sm sentence_file: sentences_es.txt + + # --- TTS backend (pick one) --- + + # edge-tts (default): Microsoft Edge neural TTS. Requires: pip install edge-tts + tts_backend: edge-tts + tts_voice: es-ES-ElviraNeural # or es-MX-DaliaNeural for Mexican Spanish + + # gtts: free Google TTS via gtts-cli. Requires: pip install gtts + # tts_backend: gtts + # tts_code: es + # tts_tld: es + + # piper: offline neural TTS. Requires: piper binary + model download + # tts_backend: piper + # tts_model: es_ES-davefx-medium.onnx + # tts_config: es_ES-davefx-medium.onnx.json + + # espeak-ng: offline, lightweight, robotic quality. Requires: espeak-ng package + # tts_backend: espeak-ng + # tts_voice: es + + # kokoro: offline neural TTS. Requires: pip install kokoro-onnx soundfile + # plus the Kokoro model/voice files. Kokoro lists Spanish voices, but its + # docs warn that non-English quality can be thin. + # tts_backend: kokoro + # tts_model: kokoro-v1.0.onnx + # tts_voices: voices-v1.0.bin + # tts_voice: ef_dora # Spanish female + # tts_code: es diff --git a/requirements-tts.txt b/requirements-tts.txt new file mode 100644 index 0000000..8886f42 --- /dev/null +++ b/requirements-tts.txt @@ -0,0 +1,5 @@ +# Optional free TTS backends. +# Install this only if you want to test/use non-default TTS engines. 
+piper-tts +kokoro-onnx +soundfile diff --git a/requirements.txt b/requirements.txt index aa83b50..cbd50c8 100644 --- a/requirements.txt +++ b/requirements.txt @@ -4,5 +4,6 @@ spacy youtube-transcript-api fugashi[unidic-lite] gTTS +edge-tts pyyaml genanki diff --git a/saiki.py b/saiki.py index 6ef493d..eb4d068 100755 --- a/saiki.py +++ b/saiki.py @@ -6,6 +6,8 @@ from pathlib import Path SRC_DIR = Path(__file__).resolve().parent / "src" if str(SRC_DIR) not in sys.path: + # Let the repository checkout run directly without requiring an editable + # install first. Installed packages will still resolve normally. sys.path.insert(0, str(SRC_DIR)) from saiki.cli import main diff --git a/src/saiki/ankiconnect.py b/src/saiki/ankiconnect.py index a65bd37..ef47637 100644 --- a/src/saiki/ankiconnect.py +++ b/src/saiki/ankiconnect.py @@ -6,6 +6,13 @@ import requests def anki_request(action: str, url: str = "http://localhost:8765", **params): + """Send one JSON-RPC style request to AnkiConnect. + + AnkiConnect exposes all operations as an ``action`` plus a ``params`` + object. This helper centralizes the protocol version, timeout, HTTP error + handling, and conversion of AnkiConnect's ``error`` field into a Python + exception. + """ resp = requests.post( url, json={"action": action, "version": 6, "params": params}, @@ -16,4 +23,3 @@ def anki_request(action: str, url: str = "http://localhost:8765", **params): if data.get("error") is not None: raise RuntimeError(f"AnkiConnect error for {action}: {data['error']}") return data["result"] - diff --git a/src/saiki/audio.py b/src/saiki/audio.py index 67bc905..7d30a5f 100644 --- a/src/saiki/audio.py +++ b/src/saiki/audio.py @@ -16,6 +16,13 @@ AUDIO_EXTS = (".mp3", ".wav", ".ogg", ".m4a", ".flac") def resolve_media_paths(media_dir: str, out_dir: str, media_name: str) -> tuple[str, str] | None: + """Return safe source and destination paths for one Anki media filename. 
+ + Anki stores audio references as media names, not arbitrary filesystem + paths. Absolute paths and parent-directory traversal are rejected so a + malformed card cannot make the export read or write outside the configured + media/output directories. + """ normalized = os.path.normpath(media_name) if os.path.isabs(normalized) or normalized.startswith(".."): return None @@ -23,6 +30,7 @@ def resolve_media_paths(media_dir: str, out_dir: str, media_name: str) -> tuple[ def build_playlist(out_dir: str, language: str) -> str: + """Write an M3U playlist containing exported audio files for a language.""" m3u_path = os.path.join(out_dir, f"{language}.m3u") concat_name = f"{language}_concat.mp3" files: list[str] = [] @@ -42,6 +50,7 @@ def build_playlist(out_dir: str, language: str) -> str: def concat_audio_from_m3u(out_dir: str, m3u_path: str, out_path: str) -> None: + """Concatenate playlist entries into a single MP3 with ffmpeg.""" if shutil.which("ffmpeg") is None: raise RuntimeError("ffmpeg not found in PATH. Install ffmpeg to use --concat.") @@ -59,6 +68,8 @@ def concat_audio_from_m3u(out_dir: str, m3u_path: str, out_path: str) -> None: with tempfile.NamedTemporaryFile("w", delete=False, encoding="utf-8") as tmp: concat_list_path = tmp.name for path in abs_files: + # ffmpeg's concat demuxer uses single-quoted paths. Escape literal + # apostrophes so media filenames from Anki remain valid entries. tmp.write(f"file '{path.replace(chr(39), chr(39) + chr(92) + chr(39) + chr(39))}'\n") cmd = [ @@ -83,6 +94,13 @@ def extract_audio( concat: bool = False, request: Callable = anki_request, ) -> dict[str, object]: + """Copy audio from configured Anki decks and build a playlist. + + The return value is intentionally CLI-friendly: it reports the number of + copied files, the playlist path, the output directory, and the optional + concatenated MP3 path. ``request`` is injectable so tests can exercise the + workflow without a running Anki instance. 
+ """ language = config.language_name(lang) selected_decks = config.decks_for(lang) if not selected_decks: @@ -123,4 +141,3 @@ def extract_audio( concat_path = os.path.join(out_dir, f"{language}_concat.mp3") concat_audio_from_m3u(out_dir, m3u_path, concat_path) return {"copied": len(copied), "playlist": m3u_path, "outdir": out_dir, "concat": concat_path} - diff --git a/src/saiki/cli.py b/src/saiki/cli.py index 761d4c7..2c92d96 100644 --- a/src/saiki/cli.py +++ b/src/saiki/cli.py @@ -7,17 +7,67 @@ import sys from .audio import extract_audio from .config import Config, language_choices, load_config -from .importer import import_sentences +from .importer import ( + format_tts_error, + import_sentences, + list_tts_voices, + supported_tts_backends, + synthesize_tts_sample, +) from .words import compare_word_files, extract_words from .youtube import run_youtube def add_config_arg(parser: argparse.ArgumentParser) -> None: + """Attach the shared ``--config`` option to a parser.""" parser.add_argument("--config", help="Path to YAML config file.") +def add_tts_override_args(parser: argparse.ArgumentParser, tts_backends: list[str]) -> None: + """Attach per-command TTS override flags. + + These options intentionally mirror config keys so command-line overrides + can be collected mechanically and merged over the selected language. 
+ """ + parser.add_argument("--tts-backend", choices=tts_backends, help="Override the configured TTS backend.") + parser.add_argument("--tts-voice", help="Override the configured backend voice.") + parser.add_argument("--tts-voices", help="Override the configured backend voice bundle path.") + parser.add_argument("--tts-model", help="Override the configured backend model or local model path.") + parser.add_argument("--tts-model-dir", help="Override the directory used for relative TTS model paths.") + parser.add_argument("--tts-config", help="Override the configured backend model config path.") + parser.add_argument("--tts-vocab-config", help="Override the configured backend vocab config path.") + parser.add_argument("--tts-code", help="Override the configured backend language code.") + parser.add_argument("--tts-tld", help="Override the configured gTTS top-level domain.") + parser.add_argument("--tts-tempo", type=float, help="Override the post-processing tempo multiplier.") + parser.add_argument("--tts-speed", type=float, help="Override backend-native speech speed when supported.") + + +def collect_tts_overrides(args: argparse.Namespace) -> dict[str, object]: + """Collect TTS override attributes from an argparse namespace.""" + return { + "tts_backend": getattr(args, "tts_backend", None), + "tts_voice": getattr(args, "tts_voice", None), + "tts_voices": getattr(args, "tts_voices", None), + "tts_model": getattr(args, "tts_model", None), + "tts_model_dir": getattr(args, "tts_model_dir", None), + "tts_config": getattr(args, "tts_config", None), + "tts_vocab_config": getattr(args, "tts_vocab_config", None), + "tts_code": getattr(args, "tts_code", None), + "tts_tld": getattr(args, "tts_tld", None), + "tts_tempo": getattr(args, "tts_tempo", None), + "tts_speed": getattr(args, "tts_speed", None), + } + + def build_parser(config: Config | None = None) -> argparse.ArgumentParser: + """Build the full CLI parser. 
+ + Passing a loaded config lets argparse choices reflect user-defined language + codes. When no config is supplied, defaults are loaded so the parser remains + usable in tests and help-generation contexts. + """ choices = language_choices(config or load_config()) + tts_backends = supported_tts_backends() parser = argparse.ArgumentParser(description="Saiki: sentence mining and listening tools for Anki.") add_config_arg(parser) sub = parser.add_subparsers(dest="command", required=True) @@ -61,11 +111,25 @@ def build_parser(config: Config | None = None) -> argparse.ArgumentParser: importer.add_argument("lang", choices=choices) importer.add_argument("sentence_file", nargs="?") importer.add_argument("--tags", help="Comma-separated tags. text-to-speech is always included.") + add_tts_override_args(importer, tts_backends) + + test_tts = sub.add_parser("tts-test", help="Synthesize one TTS sample without importing into Anki.") + test_tts.add_argument("lang", choices=choices) + test_tts.add_argument("text", nargs="?") + test_tts.add_argument("--out", help="Output MP3 path. Defaults to ./tts_test__.mp3.") + add_tts_override_args(test_tts, tts_backends) + + voices = sub.add_parser("tts-voices", help="List voices or voice-listing hints for a TTS backend.") + voices.add_argument("lang", nargs="?", choices=choices) + voices.add_argument("--backend", choices=tts_backends, help="Backend to list instead of the language default.") return parser def main(argv: list[str] | None = None) -> int: + """Run the CLI and return a process exit status.""" + # Parse --config first so subcommand language choices can come from the + # user's config file instead of only the built-in defaults. 
pre = argparse.ArgumentParser(add_help=False) add_config_arg(pre) known, _ = pre.parse_known_args(argv) @@ -114,10 +178,27 @@ def main(argv: list[str] | None = None) -> int: return 0 if args.command == "import": - result = import_sentences(config, args.lang, args.sentence_file, args.tags) + tts_overrides = collect_tts_overrides(args) + result = import_sentences(config, args.lang, args.sentence_file, args.tags, tts_overrides=tts_overrides) print(f"Done. Added {result.added}/{result.processed} cards. Failed: {result.failed}") + for error in result.errors: + print(f"Error: {error}", file=sys.stderr) return 0 if result.failed == 0 else 1 + if args.command == "tts-test": + try: + output = synthesize_tts_sample(config, args.lang, args.text, args.out, collect_tts_overrides(args)) + print(f"Wrote TTS sample: {output}") + return 0 + except Exception as exc: + print(f"Error: {format_tts_error(exc)}", file=sys.stderr) + return 1 + + if args.command == "tts-voices": + for line in list_tts_voices(config, args.lang, args.backend): + print(line) + return 0 + parser.print_help() return 2 diff --git a/src/saiki/config.py b/src/saiki/config.py index 05dd433..bbb4bae 100644 --- a/src/saiki/config.py +++ b/src/saiki/config.py @@ -23,14 +23,15 @@ DEFAULT_CONFIG: dict[str, Any] = { "audio_output_root": "~/Languages/Anki/anki-audio", "word_output_root": "~/Languages/Anki/anki-words", "sentence_dir": "~/Languages/Anki", + "tts_model_dir": "~/.local/share/saiki/models", "note_model": "Basic", "fields": {"front": "Front", "back": "Back"}, "languages": { "jp": { "name": "japanese", "transcript_code": "ja", - "tts_code": "ja", - "tts_tld": "com", + "tts_backend": "edge-tts", + "tts_voice": "ja-JP-NanamiNeural", "tts_tempo": 1.35, "decks": ["日本語"], "word_model": "ja_core_news_lg", @@ -40,8 +41,8 @@ DEFAULT_CONFIG: dict[str, Any] = { "es": { "name": "spanish", "transcript_code": "es", - "tts_code": "es", - "tts_tld": "es", + "tts_backend": "edge-tts", + "tts_voice": "es-ES-ElviraNeural", 
"tts_tempo": 1.25, "decks": ["Español"], "word_model": "es_core_news_sm", @@ -54,73 +55,108 @@ DEFAULT_CONFIG: dict[str, Any] = { @dataclass(frozen=True) class Config: + """Typed convenience wrapper around the merged YAML configuration. + + The underlying ``data`` mapping remains available for simple serialization + and tests, while properties and helpers provide normalized paths and common + language-specific lookups for the rest of the application. + """ + data: dict[str, Any] @property def anki_connect_url(self) -> str: + """URL for the local AnkiConnect HTTP server.""" return str(self.data["anki_connect_url"]) @property def media_dir(self) -> str: + """Expanded path to Anki's collection.media directory.""" return expand_path(str(self.data["media_dir"])) @property def audio_output_root(self) -> str: + """Expanded root directory for exported listening audio.""" return expand_path(str(self.data["audio_output_root"])) @property def word_output_root(self) -> str: + """Expanded root directory for generated vocabulary lists.""" return expand_path(str(self.data["word_output_root"])) @property def sentence_dir(self) -> str: + """Expanded directory used for relative sentence import files.""" return expand_path(str(self.data["sentence_dir"])) + @property + def tts_model_dir(self) -> str: + """Expanded directory used to resolve local TTS model paths.""" + return expand_path(str(self.data["tts_model_dir"])) + @property def note_model(self) -> str: + """Anki note type used when importing generated sentence cards.""" return str(self.data.get("note_model", "Basic")) @property def fields(self) -> dict[str, str]: + """Configured logical field names, currently front and back.""" return dict(self.data.get("fields", {})) @property def languages(self) -> dict[str, dict[str, Any]]: + """Language configurations keyed by CLI language code.""" return dict(self.data.get("languages", {})) def language(self, lang: str) -> dict[str, Any]: + """Return one language config with shared TTS 
defaults applied. + + A fresh dict is returned so callers may layer CLI overrides onto it + without mutating the loaded configuration. + """ try: - return dict(self.languages[lang]) + language = dict(self.languages[lang]) + language.setdefault("tts_model_dir", self.tts_model_dir) + return language except KeyError as e: available = ", ".join(sorted(self.languages)) raise ValueError(f"Unsupported language '{lang}'. Available: {available}") from e def language_name(self, lang: str) -> str: + """Return the long language bucket name for output directories.""" return str(self.language(lang)["name"]) def transcript_code(self, lang: str) -> str: + """Return the language code expected by transcript providers.""" return str(self.language(lang)["transcript_code"]) def decks_for(self, lang: str) -> list[str]: + """Return configured Anki deck names for a language.""" return list(self.language(lang).get("decks", [])) def field_for(self, lang: str) -> str: + """Return the Anki field to mine for vocabulary.""" return str(self.language(lang).get("field", self.fields.get("back", "Back"))) def sentence_file_for(self, lang: str) -> str: + """Resolve the sentence import file for a language.""" value = str(self.language(lang).get("sentence_file", f"sentences_{lang}.txt")) return expand_path(value if os.path.isabs(value) or value.startswith("~") else os.path.join(self.sentence_dir, value)) def expand_path(path: str) -> str: + """Expand ``~`` and environment variables in a configured path.""" return os.path.expanduser(os.path.expandvars(path)) def default_config_path() -> str: + """Return the conventional user config path.""" return expand_path("~/.config/saiki/config.yaml") def deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]: + """Recursively merge a user config mapping over default config values.""" result = copy.deepcopy(base) for key, value in override.items(): if isinstance(value, dict) and isinstance(result.get(key), dict): @@ -131,6 +167,7 @@ def 
deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any] def load_config(path: str | None = None) -> Config: + """Load defaults plus an optional YAML config file.""" config = copy.deepcopy(DEFAULT_CONFIG) config_path = expand_path(path) if path else default_config_path() if os.path.exists(config_path): @@ -145,4 +182,5 @@ def load_config(path: str | None = None) -> Config: def language_choices(config: Config) -> list[str]: + """Return sorted language codes suitable for argparse choices.""" return sorted(config.languages.keys()) diff --git a/src/saiki/importer.py b/src/saiki/importer.py index 31e8d39..d89397a 100644 --- a/src/saiki/importer.py +++ b/src/saiki/importer.py @@ -1,28 +1,57 @@ -"""Generate TTS audio and add sentence notes to Anki.""" +"""Generate TTS audio and add sentence notes to Anki. + +This module owns the TTS backend abstraction used by both ``import`` and +``tts-test``. Backends synthesize their native output format first, then ffmpeg +normalizes the result to MP3 and applies the configured tempo multiplier. 
+""" from __future__ import annotations -import os import csv +import os import shutil import subprocess import tempfile import time -from dataclasses import dataclass -from typing import Callable +from dataclasses import dataclass, field +from typing import Any, Callable, Mapping from .ankiconnect import anki_request -from .config import Config +from .config import Config, expand_path @dataclass(frozen=True) class ImportResult: + """Summary of one sentence import run.""" + processed: int added: int failed: int + errors: list[str] = field(default_factory=list) + + +@dataclass(frozen=True) +class PreparedTtsBackend: + """Runtime-ready TTS backend callable plus its native audio extension.""" + + name: str + raw_ext: str + synthesize: Callable[[str, str], None] + + +@dataclass(frozen=True) +class TtsBackendSpec: + """Static metadata needed to validate and build a TTS backend.""" + + raw_ext: str + build: Callable[[dict[str, Any]], Callable[[str, str], None]] + required_keys: tuple[str, ...] 
= () + command: str | None = None + list_voices: Callable[[dict[str, Any]], list[str]] | None = None def parse_tags(value: str | None) -> list[str]: + """Parse comma-separated tag text and add Saiki's default TTS tags.""" tags = ["text-to-speech"] if value: tags.extend(tag.strip() for tag in value.split(",") if tag.strip()) @@ -32,15 +61,407 @@ def parse_tags(value: str | None) -> list[str]: def require_command(name: str) -> None: + """Raise a friendly error if an external command is not on PATH.""" if shutil.which(name) is None: raise RuntimeError(f"Required command not found: {name}") -def generate_tts(sentence: str, raw_output: str, lang_code: str, tld: str) -> None: - subprocess.run(["gtts-cli", sentence, "--lang", lang_code, "--tld", tld, "--output", raw_output], check=True) +_TTS_PATH_KEYS = ("tts_model", "tts_voices", "tts_vocab_config", "tts_config") +_MAX_ERROR_DETAILS = 5 +_DEFAULT_TEST_TEXT = { + "jp": "これはテストです。", + "es": "Esta es una prueba.", +} + + +def _generate_gtts(cfg: dict[str, Any]) -> Callable[[str, str], None]: + """Build a gTTS synthesizer using the command-line wrapper.""" + lang_code = str(cfg["tts_code"]) + tld = str(cfg["tts_tld"]) + + def synthesize(sentence: str, output: str) -> None: + subprocess.run( + ["gtts-cli", sentence, "--lang", lang_code, "--tld", tld, "--output", output], + stdin=subprocess.DEVNULL, + check=True, + ) + + return synthesize + + +def _generate_edge_tts(cfg: dict[str, Any]) -> Callable[[str, str], None]: + """Build an edge-tts synthesizer for a configured neural voice.""" + voice = str(cfg["tts_voice"]) + + def synthesize(sentence: str, output: str) -> None: + subprocess.run( + ["edge-tts", "--voice", voice, "--text", sentence, "--write-media", output], + stdin=subprocess.DEVNULL, + check=True, + ) + + return synthesize + + +def _generate_piper(cfg: dict[str, Any]) -> Callable[[str, str], None]: + """Build a Piper synthesizer around a local ONNX voice model.""" + model = str(cfg["tts_model"]) + 
_require_file("piper", "tts_model", model) + config = str(cfg["tts_config"]) if cfg.get("tts_config") else None + if config: + _require_file("piper", "tts_config", config) + + def synthesize(sentence: str, output: str) -> None: + command = ["piper", "--model", model] + if config: + command.extend(["--config", config]) + command.extend(["--output_file", output]) + subprocess.run( + command, + input=f"{sentence}\n".encode("utf-8"), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + check=True, + ) + + return synthesize + + +def _generate_espeak(cfg: dict[str, Any]) -> Callable[[str, str], None]: + """Build an espeak-ng synthesizer for a configured voice code.""" + voice = str(cfg["tts_voice"]) + + def synthesize(sentence: str, output: str) -> None: + subprocess.run(["espeak-ng", "-v", voice, "-w", output, sentence], stdin=subprocess.DEVNULL, check=True) + + return synthesize + + +def _generate_kokoro(cfg: dict[str, Any]) -> Callable[[str, str], None]: + """Build a Kokoro ONNX synthesizer from local model and voice files.""" + _require_file("kokoro", "tts_model", str(cfg["tts_model"])) + _require_file("kokoro", "tts_voices", str(cfg["tts_voices"])) + if cfg.get("tts_vocab_config"): + _require_file("kokoro", "tts_vocab_config", str(cfg["tts_vocab_config"])) + + try: + from kokoro_onnx import Kokoro # type: ignore + import soundfile as sf # type: ignore + except ImportError as exc: + raise RuntimeError( + "kokoro backend requires 'kokoro-onnx' and 'soundfile'. Install them first." 
+ ) from exc + + kokoro_kwargs = {} + if cfg.get("tts_vocab_config"): + kokoro_kwargs["vocab_config"] = str(cfg["tts_vocab_config"]) + kokoro = Kokoro(str(cfg["tts_model"]), str(cfg["tts_voices"]), **kokoro_kwargs) + voice = str(cfg["tts_voice"]) + lang_code = str(cfg["tts_code"]) + speed = _optional_float(cfg, "tts_speed", 1.0) + + def synthesize(sentence: str, output: str) -> None: + samples, sample_rate = kokoro.create(sentence, voice=voice, speed=speed, lang=lang_code) + sf.write(output, samples, sample_rate) + + return synthesize + + +def _list_gtts_voices(cfg: dict[str, Any]) -> list[str]: + """Return voice-listing guidance for gTTS.""" + return [ + "gtts does not expose named voices.", + f"Current settings: tts_code={cfg.get('tts_code', '')}, tts_tld={cfg.get('tts_tld', '')}", + ] + + +def _list_edge_voices(cfg: dict[str, Any]) -> list[str]: + """Return the configured edge-tts voice or ask the CLI to list online voices.""" + if cfg.get("tts_voice"): + return [ + f"Configured edge-tts voice: {cfg['tts_voice']}", + "Run `edge-tts --list-voices` directly to browse the full online voice catalog.", + ] + return _run_voice_command(["edge-tts", "--list-voices"]) + + +def _list_espeak_voices(cfg: dict[str, Any]) -> list[str]: + """List espeak-ng voices, narrowed by the configured language when possible.""" + voice_filter = str(cfg.get("tts_voice") or cfg.get("tts_code") or "") + arg = f"--voices={voice_filter}" if voice_filter else "--voices" + return _run_voice_command(["espeak-ng", arg]) + + +def _list_piper_voices(cfg: dict[str, Any]) -> list[str]: + """Return Piper model guidance instead of pretending it has a voice catalog.""" + model = cfg.get("tts_model") + if model: + return [f"Configured Piper model: {model}"] + return ["Piper voices are model files. 
Set tts_model to a downloaded .onnx voice model."] + + +def _list_kokoro_voices(cfg: dict[str, Any]) -> list[str]: + """Return Kokoro voice-bundle guidance from the configured files.""" + voice = cfg.get("tts_voice") + voices = cfg.get("tts_voices") + if voice or voices: + return [f"Configured Kokoro voice: {voice or ''}", f"Voice bundle: {voices or ''}"] + return ["Kokoro voices come from the configured tts_voices bundle. Set tts_voice to one voice from it."] + + +# Registry entries describe validation, dependency checks, synthesis, and +# optional voice-listing behavior in one place so new free backends can be added +# without changing the CLI or import workflows. +_TTS_BACKENDS: dict[str, TtsBackendSpec] = { + "gtts": TtsBackendSpec( + raw_ext=".mp3", + command="gtts-cli", + required_keys=("tts_code", "tts_tld"), + build=_generate_gtts, + list_voices=_list_gtts_voices, + ), + "edge-tts": TtsBackendSpec( + raw_ext=".mp3", + command="edge-tts", + required_keys=("tts_voice",), + build=_generate_edge_tts, + list_voices=_list_edge_voices, + ), + "piper": TtsBackendSpec( + raw_ext=".wav", + command="piper", + required_keys=("tts_model",), + build=_generate_piper, + list_voices=_list_piper_voices, + ), + "espeak-ng": TtsBackendSpec( + raw_ext=".wav", + command="espeak-ng", + required_keys=("tts_voice",), + build=_generate_espeak, + list_voices=_list_espeak_voices, + ), + "kokoro": TtsBackendSpec( + raw_ext=".wav", + required_keys=("tts_model", "tts_voices", "tts_voice", "tts_code"), + build=_generate_kokoro, + list_voices=_list_kokoro_voices, + ), +} + + +def supported_tts_backends() -> list[str]: + """Return supported backend names for argparse choices and error messages.""" + return sorted(_TTS_BACKENDS) + + +def prepare_tts_backend(lang_cfg: dict[str, Any]) -> PreparedTtsBackend: + """Validate config and return a callable backend for one language. 
def list_tts_voices(config: Config, lang: str | None = None, backend: str | None = None) -> list[str]:
    """Return voice names or backend-specific hints for one TTS backend.

    The settings come from ``config.language(lang)`` when ``lang`` is given
    and are otherwise empty. A non-empty ``backend`` overrides the configured
    ``tts_backend``; when no backend is named anywhere, ``gtts`` is used.

    Raises:
        ValueError: If the resolved backend name is not registered.
    """
    settings = {}
    if lang:
        settings = config.language(lang)
    if backend:
        settings = {**settings, "tts_backend": backend}

    backend_name = str(settings.get("tts_backend", "gtts")).strip()
    if backend_name not in _TTS_BACKENDS:
        raise ValueError(_unknown_backend_message(backend_name))
    spec = _TTS_BACKENDS[backend_name]

    expanded = _expand_tts_paths(settings)
    lister = spec.list_voices
    if lister is None:
        return [f"{backend_name} does not support voice listing."]
    return lister(expanded)
+ """ + language = _language_config(config, lang, tts_overrides) + backend = prepare_tts_backend(language) + tempo = _tts_tempo(language) + require_command("ffmpeg") + + output_path = expand_path(output) if output else _default_tts_output(lang, backend.name) + output_dir = os.path.dirname(output_path) + if output_dir: + os.makedirs(output_dir, exist_ok=True) + + sentence = text or default_tts_test_text(lang) + with tempfile.TemporaryDirectory() as temp_dir: + raw_output = os.path.join(temp_dir, f"tts_test_original{backend.raw_ext}") + backend.synthesize(sentence, raw_output) + speed_audio(raw_output, output_path, tempo) + return output_path + + +def _raw_ext(backend: str) -> str: + """Return a backend's raw extension, defaulting to MP3 for unknown names.""" + spec = _TTS_BACKENDS.get(backend) + return spec.raw_ext if spec else ".mp3" + + +def _language_config( + config: Config, + lang: str, + tts_overrides: Mapping[str, Any] | None = None, +) -> dict[str, Any]: + """Return a language config with any non-None CLI overrides applied.""" + language = config.language(lang) + if tts_overrides: + language.update({key: value for key, value in tts_overrides.items() if value is not None}) + return language + + +def _default_tts_output(lang: str, backend: str) -> str: + """Return the default sample output path for ``tts-test``.""" + safe_backend = backend.replace(os.sep, "_") + return os.path.abspath(f"tts_test_{lang}_{safe_backend}.mp3") + + +def _expand_tts_paths(lang_cfg: Mapping[str, Any]) -> dict[str, Any]: + """Expand TTS paths and resolve relative model files under tts_model_dir.""" + cfg = dict(lang_cfg) + if isinstance(cfg.get("tts_model_dir"), str): + cfg["tts_model_dir"] = expand_path(str(cfg["tts_model_dir"])) + for key in _TTS_PATH_KEYS: + if isinstance(cfg.get(key), str): + path = expand_path(str(cfg[key])) + if not os.path.isabs(path) and cfg.get("tts_model_dir"): + path = os.path.join(str(cfg["tts_model_dir"]), path) + cfg[key] = path + return cfg + + +def 
_require_backend_keys(backend: str, cfg: Mapping[str, Any], keys: tuple[str, ...]) -> None: + """Ensure backend-specific required config keys are present and non-empty.""" + missing = [key for key in keys if cfg.get(key) is None or (isinstance(cfg.get(key), str) and not cfg[key].strip())] + if missing: + raise RuntimeError(f"{backend} backend requires config key(s): {', '.join(missing)}") + + +def _require_file(backend: str, key: str, path: str) -> None: + """Ensure a configured model path exists before calling a backend.""" + if not os.path.isfile(path): + raise RuntimeError(f"{backend} backend config key {key} points to a missing file: {path}") + + +def format_tts_error(exc: Exception) -> str: + """Format backend and ffmpeg failures for concise CLI output.""" + return _error_message(exc) + + +def _unknown_backend_message(backend: str) -> str: + """Build a consistent unknown-backend error message.""" + return f"Unknown TTS backend: {backend!r}. Choose from: {', '.join(supported_tts_backends())}" + + +def _optional_float(cfg: Mapping[str, Any], key: str, default: float | None) -> float | None: + """Parse an optional numeric config value.""" + if cfg.get(key) is None: + return default + try: + return float(cfg[key]) + except (TypeError, ValueError) as exc: + raise RuntimeError(f"{key} must be a number.") from exc + + +def _run_voice_command(command: list[str], timeout: float = 30.0) -> list[str]: + """Run an external voice-listing command and return printable lines.""" + if shutil.which(command[0]) is None: + return [f"Required command not found: {command[0]}"] + try: + result = subprocess.run( + command, + text=True, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + timeout=timeout, + check=True, + ) + except subprocess.CalledProcessError as exc: + return [f"{' '.join(command)} failed: {_subprocess_detail(exc)}"] + except subprocess.TimeoutExpired: + return [f"{' '.join(command)} timed out after {timeout:g}s."] + lines = [line for line in 
result.stdout.splitlines() if line.strip()] + return lines or ["No voices returned."] + + +def _short_text(value: str, limit: int = 500) -> str: + """Collapse and truncate long subprocess output for display.""" + text = " ".join(value.strip().split()) + if len(text) <= limit: + return text + return f"{text[:limit - 3]}..." + + +def _sentence_label(sentence: str, limit: int = 80) -> str: + """Return a compact representation of a sentence for error lists.""" + text = " ".join(sentence.split()) + if len(text) > limit: + text = f"{text[:limit - 3]}..." + return repr(text) + + +def _subprocess_detail(exc: subprocess.CalledProcessError) -> str: + """Extract useful stdout/stderr context from a failed subprocess.""" + stderr = exc.stderr + stdout = exc.stdout + detail = stderr if stderr else stdout + if isinstance(detail, bytes): + detail = detail.decode("utf-8", errors="replace") + text = _short_text(str(detail or "")) + command = exc.cmd if isinstance(exc.cmd, str) else " ".join(str(part) for part in exc.cmd) + suffix = f": {text}" if text else "" + return f"{command} exited with status {exc.returncode}{suffix}" + + +def _error_message(exc: Exception) -> str: + """Convert an exception into a short user-facing string.""" + if isinstance(exc, subprocess.CalledProcessError): + return _subprocess_detail(exc) + return _short_text(str(exc) or exc.__class__.__name__) + + +def _tts_tempo(cfg: Mapping[str, Any]) -> float: + """Validate and return the post-processing tempo multiplier.""" + tempo = _optional_float(cfg, "tts_tempo", 1.0) + if tempo is None or tempo <= 0: + raise RuntimeError("tts_tempo must be greater than 0.") + return tempo def speed_audio(raw_output: str, output_path: str, tempo: float) -> None: + """Convert backend output to an MP3 and apply ffmpeg's atempo filter.""" subprocess.run( ["ffmpeg", "-loglevel", "error", "-i", raw_output, "-filter:a", f"atempo={tempo}", "-y", output_path], stdin=subprocess.DEVNULL, @@ -49,6 +470,11 @@ def speed_audio(raw_output: str, 
output_path: str, tempo: float) -> None: def read_sentences(path: str) -> list[str]: + """Read sentences from plain text, CSV, or TSV input. + + CSV and TSV imports must contain a ``sentence`` header so exports from the + YouTube sentence-mining command can be imported directly. + """ expanded = os.path.expanduser(path) if expanded.lower().endswith((".tsv", ".csv")): delimiter = "\t" if expanded.lower().endswith(".tsv") else "," @@ -68,11 +494,19 @@ def import_sentences( sentence_file: str | None = None, tags_value: str | None = None, request: Callable = anki_request, + tts_overrides: Mapping[str, Any] | None = None, ) -> ImportResult: - require_command("gtts-cli") + """Generate TTS for each sentence and add cards through AnkiConnect. + + The first configured deck for the language is used as the destination. + Audio is attached to the front field so Anki imports the temporary MP3 into + its media collection before the temporary directory is removed. + """ + language = _language_config(config, lang, tts_overrides) + backend = prepare_tts_backend(language) + tempo = _tts_tempo(language) require_command("ffmpeg") - language = config.language(lang) decks = list(language.get("decks", [])) if not decks: raise RuntimeError(f"No deck configured for language: {lang}") @@ -85,15 +519,16 @@ def import_sentences( back_field = config.fields.get("back", "Back") added = 0 failed = 0 + errors: list[str] = [] with tempfile.TemporaryDirectory() as temp_dir: for sentence in sentences: basename = f"tts_{time.strftime('%Y%m%d_%H%M%S')}_{lang}_{os.getpid()}_{added + failed}" - raw_output = os.path.join(temp_dir, f"{basename}_original.mp3") + raw_output = os.path.join(temp_dir, f"{basename}_original{backend.raw_ext}") output_path = os.path.join(temp_dir, f"{basename}.mp3") try: - generate_tts(sentence, raw_output, str(language["tts_code"]), str(language["tts_tld"])) - speed_audio(raw_output, output_path, float(language["tts_tempo"])) + backend.synthesize(sentence, raw_output) + 
def normalize_word_key(value: str) -> str:
    """Return *value* trimmed, lowercased, and with whitespace runs collapsed.

    The result serves as a canonical key for set membership and comparison.
    """
    trimmed = value.strip()
    lowered = trimmed.lower()
    return re.sub(r"\s+", " ", lowered)
def build_query_from_decks(decks: list[str]) -> str:
    """Join deck names into one Anki search that matches any of them.

    Each name becomes a quoted ``deck:"..."`` term and the terms are combined
    with ``OR``. An empty list yields an empty string.
    """
    terms = []
    for deck in decks:
        terms.append(f'deck:"{deck}"')
    return " OR ".join(terms)
or {} @@ -114,6 +127,7 @@ def extract_counts( def write_counts(counter: Counter, out_path: str, min_freq: int) -> int: + """Write a sorted ``word frequency`` list and return the number of rows.""" items = [(w, c) for (w, c) in counter.items() if c >= min_freq] items.sort(key=lambda x: (-x[1], x[0])) os.makedirs(os.path.dirname(os.path.abspath(out_path)), exist_ok=True) @@ -124,6 +138,7 @@ def write_counts(counter: Counter, out_path: str, min_freq: int) -> int: def read_word_file(path: str) -> set[str]: + """Read a ``word frequency`` file into normalized word keys.""" words: set[str] = set() with open(os.path.expanduser(path), "r", encoding="utf-8") as fh: for line in fh: @@ -136,6 +151,7 @@ def read_word_file(path: str) -> set[str]: def compare_word_files(source_path: str, known_path: str) -> list[str]: + """Return source rows whose normalized word is not in the known list.""" known = read_word_file(known_path) new_words: list[str] = [] with open(os.path.expanduser(source_path), "r", encoding="utf-8") as fh: @@ -162,6 +178,12 @@ def extract_words( spacy_model: str | None = None, request: Callable = anki_request, ) -> dict[str, object]: + """Extract frequent vocabulary from configured Anki cards. + + The function accepts explicit query/deck/field overrides for CLI use, but + defaults to the selected language config. Its dictionary return value keeps + the CLI output simple and gives tests stable fields to assert against. 
def snippet_text(entry) -> str:
    """Return the text of a transcript entry, or ``""`` when it has none.

    Dict entries are read through their ``"text"`` key and any other object
    through its ``text`` attribute. A missing or falsy value becomes an empty
    string.
    """
    raw = entry.get("text", "") if isinstance(entry, dict) else getattr(entry, "text", "")
    return raw or ""
def tokenize_spanish(text: str, raw: bool = False) -> list[str]:
    """Split text into word tokens using a lightweight regex.

    Tokens are returned in order of appearance. They are lowercased unless
    ``raw`` is true, in which case the original casing is kept.
    """
    words = re.findall(r"\b[\wáéíóúñü]+\b", text)
    if raw:
        return words
    return list(map(str.lower, words))
os.makedirs(os.path.dirname(os.path.abspath(out_path)), exist_ok=True) written = 0 @@ -154,6 +168,7 @@ def run_youtube( known_words: str | None = None, only_new: bool = False, ) -> dict[str, object]: + """Run transcript mining in either vocabulary or sentence-export mode.""" lang_code = config.transcript_code(lang) video_id = extract_video_id(video) entries = fetch_transcript(video_id, lang_code) @@ -176,4 +191,3 @@ def run_youtube( for word, count in items: fh.write(f"{word} {count}\n") return {"mode": mode, "items": items, "out": out} - diff --git a/tests/test_core.py b/tests/test_core.py index 81bf4ff..001c648 100644 --- a/tests/test_core.py +++ b/tests/test_core.py @@ -4,15 +4,26 @@ import os import sys import tempfile import unittest +from copy import deepcopy from pathlib import Path +from unittest.mock import patch SRC_DIR = Path(__file__).resolve().parents[1] / "src" if str(SRC_DIR) not in sys.path: sys.path.insert(0, str(SRC_DIR)) from saiki.audio import build_playlist, resolve_media_paths -from saiki.config import DEFAULT_CONFIG, deep_merge -from saiki.importer import parse_tags, read_sentences +from saiki.config import Config, DEFAULT_CONFIG, deep_merge +from saiki.importer import ( + PreparedTtsBackend, + import_sentences, + list_tts_voices, + parse_tags, + prepare_tts_backend, + read_sentences, + synthesize_tts_sample, + supported_tts_backends, +) from saiki.text import extract_first_visible_line, extract_visible_text from saiki.words import build_query_from_decks, compare_word_files, read_word_file from saiki.youtube import TranscriptLine, extract_video_id, sentence_vocab, write_sentence_export @@ -23,6 +34,7 @@ class ConfigTests(unittest.TestCase): merged = deep_merge(DEFAULT_CONFIG, {"languages": {"es": {"decks": ["Spanish"]}}}) self.assertEqual(merged["languages"]["es"]["decks"], ["Spanish"]) self.assertEqual(merged["languages"]["es"]["transcript_code"], "es") + self.assertEqual(merged["tts_model_dir"], "~/.local/share/saiki/models") 
self.assertIn("jp", merged["languages"]) @@ -107,6 +119,87 @@ class ImporterTests(unittest.TestCase): fh.write("Hola mundo\t1.00\thttps://example.test\tmundo\n") self.assertEqual(read_sentences(path), ["Hola mundo"]) + def test_supported_tts_backends_are_free_options(self): + self.assertEqual(supported_tts_backends(), ["edge-tts", "espeak-ng", "gtts", "kokoro", "piper"]) + + def test_list_gtts_voice_hint(self): + config = Config(deep_merge(deepcopy(DEFAULT_CONFIG), {"languages": {"es": {"tts_code": "es", "tts_tld": "es"}}})) + voices = list_tts_voices(config, "es", backend="gtts") + self.assertIn("gtts does not expose named voices.", voices) + self.assertIn("tts_code=es", voices[1]) + + def test_list_edge_voice_uses_configured_voice(self): + voices = list_tts_voices(Config(deepcopy(DEFAULT_CONFIG)), "es") + self.assertEqual(voices[0], "Configured edge-tts voice: es-ES-ElviraNeural") + + def test_prepare_tts_backend_validates_required_keys(self): + with self.assertRaisesRegex(RuntimeError, "tts_voice"): + prepare_tts_backend({"tts_backend": "edge-tts"}) + + def test_prepare_tts_backend_expands_model_paths(self): + with tempfile.TemporaryDirectory() as tmp: + model = os.path.join(tmp, "voice.onnx") + config = os.path.join(tmp, "voice.onnx.json") + for path in [model, config]: + with open(path, "w", encoding="utf-8") as fh: + fh.write("{}") + + with patch("saiki.importer.require_command"): + backend = prepare_tts_backend( + { + "tts_backend": "piper", + "tts_model_dir": tmp, + "tts_model": "voice.onnx", + "tts_config": "voice.onnx.json", + } + ) + + with patch("saiki.importer.subprocess.run") as run: + backend.synthesize("Hola", "/tmp/out.wav") + + args = run.call_args.args[0] + self.assertEqual(args[2], model) + self.assertEqual(args[4], config) + self.assertEqual(run.call_args.kwargs["input"], b"Hola\n") + + def test_synthesize_tts_sample_uses_backend_and_speed_audio(self): + seen: dict[str, str] = {} + + def synthesize(sentence: str, output: str) -> None: + 
seen["sentence"] = sentence + seen["raw_output"] = output + + with tempfile.TemporaryDirectory() as tmp: + output = os.path.join(tmp, "sample.mp3") + with patch("saiki.importer.prepare_tts_backend") as prepare, patch( + "saiki.importer.require_command" + ), patch("saiki.importer.speed_audio") as speed: + prepare.return_value = PreparedTtsBackend("fake", ".wav", synthesize) + result = synthesize_tts_sample(Config(deepcopy(DEFAULT_CONFIG)), "es", output=output) + + self.assertEqual(result, output) + self.assertEqual(seen["sentence"], "Esta es una prueba.") + self.assertTrue(seen["raw_output"].endswith(".wav")) + speed.assert_called_once() + + def test_import_sentences_returns_error_details(self): + def fail_synthesis(sentence: str, output: str) -> None: + raise RuntimeError("tts broke") + + with tempfile.TemporaryDirectory() as tmp: + path = os.path.join(tmp, "sentences.txt") + with open(path, "w", encoding="utf-8") as fh: + fh.write("Hola mundo\n") + + with patch("saiki.importer.prepare_tts_backend") as prepare, patch("saiki.importer.require_command"): + prepare.return_value = PreparedTtsBackend("fake", ".mp3", fail_synthesis) + result = import_sentences(Config(deepcopy(DEFAULT_CONFIG)), "es", path, request=lambda *a, **k: None) + + self.assertEqual(result.processed, 1) + self.assertEqual(result.added, 0) + self.assertEqual(result.failed, 1) + self.assertEqual(result.errors, ["'Hola mundo': tts broke"]) + if __name__ == "__main__": unittest.main()