Updated docs + added TTS backends

This commit is contained in:
2026-06-03 14:01:18 -04:00
parent 226fecbe71
commit c923f90a75
14 changed files with 916 additions and 36 deletions

113
README.md
View File

@@ -18,6 +18,7 @@ The name is a coined Japanese compound from `採` as in gathering/collecting and
- [Anki](https://apps.ankiweb.net/) with [AnkiConnect](https://github.com/amikey/anki-connect)
- `ffmpeg`
- Python dependencies from `requirements.txt`
- Optional extra TTS backend tools: `piper`, `espeak-ng`, and `kokoro-onnx`.
- spaCy models for word mining:
```shell
@@ -35,6 +36,62 @@ pip install -r requirements.txt
sudo dnf install ffmpeg
```
### Optional TTS Backends
The default `edge-tts` backend is installed by `requirements.txt`. Install only
the optional pieces you plan to test:
```shell
# Python-backed optional engines: piper, kokoro.
pip install -r requirements-tts.txt
# System package for espeak-ng.
sudo dnf install espeak-ng
```
Other package-manager names:
```shell
sudo apt-get install espeak-ng
sudo pacman -S espeak-ng
```
Backend notes:
- `edge-tts`: installed by `pip install edge-tts`; no API key, but it uses
Microsoft Edge's online TTS service.
- `gtts`: installed by `requirements.txt`; no API key, but it uses Google's
online TTS service through `gtts-cli`.
- `piper`: installed by `pip install piper-tts`; you still need a compatible
`.onnx` voice model, usually with its matching `.onnx.json` config file.
- `espeak-ng`: installed through your OS package manager, not pip.
- `kokoro`: installed by `pip install kokoro-onnx soundfile`; you still need
`kokoro-v1.0.onnx` and `voices-v1.0.bin`, plus any language-specific G2P
setup required by your Kokoro release.
Example model downloads for the README smoke tests:
```shell
mkdir -p ~/.local/share/saiki/models
# Piper Spanish voice model plus matching config.
wget -O ~/.local/share/saiki/models/es_ES-davefx-medium.onnx \
https://huggingface.co/rhasspy/piper-voices/resolve/main/es/es_ES/davefx/medium/es_ES-davefx-medium.onnx
wget -O ~/.local/share/saiki/models/es_ES-davefx-medium.onnx.json \
https://huggingface.co/rhasspy/piper-voices/resolve/main/es/es_ES/davefx/medium/es_ES-davefx-medium.onnx.json
# Kokoro ONNX model plus voices bundle.
wget -O ~/.local/share/saiki/models/kokoro-v1.0.onnx \
https://github.com/thewh1teagle/kokoro-onnx/releases/download/model-files-v1.0/kokoro-v1.0.onnx
wget -O ~/.local/share/saiki/models/voices-v1.0.bin \
https://github.com/thewh1teagle/kokoro-onnx/releases/download/model-files-v1.0/voices-v1.0.bin
```
Saiki's default `tts_model_dir` is `~/.local/share/saiki/models`. Relative
model paths such as `es_ES-davefx-medium.onnx` are resolved under that
directory. You can override it in YAML with `tts_model_dir` or for one command
with `--tts-model-dir`.
## Configuration
Defaults are built in, but you can override them with YAML:
@@ -57,6 +114,7 @@ media_dir: ~/.var/app/net.ankiweb.Anki/data/Anki2/User 1/collection.media
audio_output_root: ~/Languages/Anki/anki-audio
word_output_root: ~/Languages/Anki/anki-words
sentence_dir: ~/Languages/Anki
tts_model_dir: ~/.local/share/saiki/models
note_model: Basic
fields:
front: Front
@@ -65,8 +123,8 @@ languages:
jp:
name: japanese
transcript_code: ja
tts_code: ja
tts_tld: com
tts_backend: edge-tts
tts_voice: ja-JP-NanamiNeural
tts_tempo: 1.35
decks: ["日本語"]
field: Back
@@ -75,8 +133,8 @@ languages:
es:
name: spanish
transcript_code: es
tts_code: es
tts_tld: es
tts_backend: edge-tts
tts_voice: es-ES-ElviraNeural
tts_tempo: 1.25
decks: ["Español"]
field: Back
@@ -174,12 +232,59 @@ Generate TTS audio and add sentence cards to Anki.
./saiki.py import es
./saiki.py import jp ~/Languages/Anki/sentences_jp.txt
./saiki.py import es youtube.tsv --tags youtube,manual
./saiki.py import es --tts-voice es-MX-DaliaNeural
```
The importer accepts plain text sentence files and TSV/CSV files with a
`sentence` column. `text-to-speech` is always added as a tag. If `--tags` is not
provided, `AI-generated` is added.
TTS is configured per language with `tts_backend`. Supported backends are:
- `edge-tts`: default backend using Microsoft Edge neural voices; configure
`tts_voice`.
- `gtts`: free backend using `gtts-cli`; configure `tts_code` and
`tts_tld`.
- `piper`: local/offline neural TTS; configure `tts_model` with a model path.
The stock Piper catalog includes Spanish voices, but not Japanese.
- `espeak-ng`: local/offline lightweight TTS; configure `tts_voice`. Spanish is
supported; Japanese is documented as kana-only and is not recommended for
normal Japanese sentence cards.
- `kokoro`: local/offline neural TTS; configure `tts_model`, `tts_voices`,
`tts_voice`, and `tts_code`; some Japanese setups also need
`tts_vocab_config`. Kokoro lists Japanese and Spanish voices, but upstream
notes that non-English quality can be thin.
You can override backend settings for one import:
```shell
./saiki.py import jp sentences_jp.txt \
--tts-backend edge-tts \
--tts-voice ja-JP-KeitaNeural
```
Voice-listing helpers:
```shell
./saiki.py tts-voices jp
./saiki.py tts-voices es --backend edge-tts
```
Test a TTS backend without creating Anki cards:
```shell
./saiki.py tts-test es --out /tmp/saiki_edge_default_es.mp3
./saiki.py tts-test jp --tts-backend edge-tts --tts-voice ja-JP-NanamiNeural --out /tmp/saiki_edge_jp.mp3
./saiki.py tts-test es --tts-backend edge-tts --tts-voice es-ES-ElviraNeural --out /tmp/saiki_edge_es.mp3
./saiki.py tts-test es --tts-backend gtts --tts-code es --tts-tld es --out /tmp/saiki_gtts_es.mp3
./saiki.py tts-test es --tts-backend piper --tts-model es_ES-davefx-medium.onnx --tts-config es_ES-davefx-medium.onnx.json --out /tmp/saiki_piper_es.mp3
./saiki.py tts-test es --tts-backend espeak-ng --tts-voice es --out /tmp/saiki_espeak_es.mp3
./saiki.py tts-test es --tts-backend kokoro --tts-model kokoro-v1.0.onnx --tts-voices voices-v1.0.bin --tts-voice ef_dora --out /tmp/saiki_kokoro_es.mp3
```
For `kokoro`, put `tts_model`, `tts_voices`, and any needed `tts_vocab_config`
in your config file rather than typing every path each time.
### Known/New Words
Compare any generated word list against an existing known list:

View File

@@ -11,6 +11,7 @@ media_dir: ~/.var/app/net.ankiweb.Anki/data/Anki2/User 1/collection.media
audio_output_root: ~/Languages/Anki/anki-audio
word_output_root: ~/Languages/Anki/anki-words
sentence_dir: ~/Languages/Anki
tts_model_dir: ~/.local/share/saiki/models
note_model: Basic
fields:
@@ -21,8 +22,6 @@ languages:
jp:
name: japanese
transcript_code: ja
tts_code: ja
tts_tld: com
tts_tempo: 1.35
decks:
- 日本語
@@ -30,14 +29,69 @@ languages:
word_model: ja_core_news_lg
sentence_file: sentences_jp.txt
# --- TTS backend (pick one) ---
# edge-tts (default): Microsoft Edge neural TTS. Requires: pip install edge-tts
tts_backend: edge-tts
tts_voice: ja-JP-NanamiNeural # or ja-JP-KeitaNeural for male
# gtts: free Google TTS via gtts-cli. Requires: pip install gtts
# tts_backend: gtts
# tts_code: ja
# tts_tld: com
# piper: no stock Japanese voice is listed in Piper's official voice catalog.
# Only use this backend for JP if you have your own compatible Japanese model.
# espeak-ng: Japanese support is kana-only in the upstream docs, so this is
# not a good fit for normal Japanese sentences that include kanji.
# kokoro: offline neural TTS. Requires: pip install kokoro-onnx soundfile
# plus the Kokoro model/voice files and Japanese G2P dependencies for your
# installed Kokoro release. Kokoro lists Japanese voices, but its docs warn
# that non-English quality can be thin.
# tts_backend: kokoro
# tts_model: kokoro-v1.0.onnx
# tts_voices: voices-v1.0.bin
# tts_vocab_config: kokoro-ja-config.json
# tts_voice: jf_alpha # Japanese female; jm_kumo for male
# tts_code: ja
es:
name: spanish
transcript_code: es
tts_code: es
tts_tld: es
tts_tempo: 1.25
decks:
- Español
field: Back
word_model: es_core_news_sm
sentence_file: sentences_es.txt
# --- TTS backend (pick one) ---
# edge-tts (default): Microsoft Edge neural TTS. Requires: pip install edge-tts
tts_backend: edge-tts
tts_voice: es-ES-ElviraNeural # or es-MX-DaliaNeural for Mexican Spanish
# gtts: free Google TTS via gtts-cli. Requires: pip install gtts
# tts_backend: gtts
# tts_code: es
# tts_tld: es
# piper: offline neural TTS. Requires: piper binary + model download
# tts_backend: piper
# tts_model: es_ES-davefx-medium.onnx
# tts_config: es_ES-davefx-medium.onnx.json
# espeak-ng: offline, lightweight, robotic quality. Requires: espeak-ng package
# tts_backend: espeak-ng
# tts_voice: es
# kokoro: offline neural TTS. Requires: pip install kokoro-onnx soundfile
# plus the Kokoro model/voice files. Kokoro lists Spanish voices, but its
# docs warn that non-English quality can be thin.
# tts_backend: kokoro
# tts_model: kokoro-v1.0.onnx
# tts_voices: voices-v1.0.bin
# tts_voice: ef_dora # Spanish female
# tts_code: es

5
requirements-tts.txt Normal file
View File

@@ -0,0 +1,5 @@
# Optional free TTS backends.
# Install this only if you want to test/use non-default TTS engines.
piper-tts
kokoro-onnx
soundfile

View File

@@ -4,5 +4,6 @@ spacy
youtube-transcript-api
fugashi[unidic-lite]
gTTS
edge-tts
pyyaml
genanki

View File

@@ -6,6 +6,8 @@ from pathlib import Path
SRC_DIR = Path(__file__).resolve().parent / "src"
if str(SRC_DIR) not in sys.path:
# Let the repository checkout run directly without requiring an editable
# install first. Installed packages will still resolve normally.
sys.path.insert(0, str(SRC_DIR))
from saiki.cli import main

View File

@@ -6,6 +6,13 @@ import requests
def anki_request(action: str, url: str = "http://localhost:8765", **params):
"""Send one JSON-RPC style request to AnkiConnect.
AnkiConnect exposes all operations as an ``action`` plus a ``params``
object. This helper centralizes the protocol version, timeout, HTTP error
handling, and conversion of AnkiConnect's ``error`` field into a Python
exception.
"""
resp = requests.post(
url,
json={"action": action, "version": 6, "params": params},
@@ -16,4 +23,3 @@ def anki_request(action: str, url: str = "http://localhost:8765", **params):
if data.get("error") is not None:
raise RuntimeError(f"AnkiConnect error for {action}: {data['error']}")
return data["result"]

View File

@@ -16,6 +16,13 @@ AUDIO_EXTS = (".mp3", ".wav", ".ogg", ".m4a", ".flac")
def resolve_media_paths(media_dir: str, out_dir: str, media_name: str) -> tuple[str, str] | None:
"""Return safe source and destination paths for one Anki media filename.
Anki stores audio references as media names, not arbitrary filesystem
paths. Absolute paths and parent-directory traversal are rejected so a
malformed card cannot make the export read or write outside the configured
media/output directories.
"""
normalized = os.path.normpath(media_name)
if os.path.isabs(normalized) or normalized.startswith(".."):
return None
@@ -23,6 +30,7 @@ def resolve_media_paths(media_dir: str, out_dir: str, media_name: str) -> tuple[
def build_playlist(out_dir: str, language: str) -> str:
"""Write an M3U playlist containing exported audio files for a language."""
m3u_path = os.path.join(out_dir, f"{language}.m3u")
concat_name = f"{language}_concat.mp3"
files: list[str] = []
@@ -42,6 +50,7 @@ def build_playlist(out_dir: str, language: str) -> str:
def concat_audio_from_m3u(out_dir: str, m3u_path: str, out_path: str) -> None:
"""Concatenate playlist entries into a single MP3 with ffmpeg."""
if shutil.which("ffmpeg") is None:
raise RuntimeError("ffmpeg not found in PATH. Install ffmpeg to use --concat.")
@@ -59,6 +68,8 @@ def concat_audio_from_m3u(out_dir: str, m3u_path: str, out_path: str) -> None:
with tempfile.NamedTemporaryFile("w", delete=False, encoding="utf-8") as tmp:
concat_list_path = tmp.name
for path in abs_files:
# ffmpeg's concat demuxer uses single-quoted paths. Escape literal
# apostrophes so media filenames from Anki remain valid entries.
tmp.write(f"file '{path.replace(chr(39), chr(39) + chr(92) + chr(39) + chr(39))}'\n")
cmd = [
@@ -83,6 +94,13 @@ def extract_audio(
concat: bool = False,
request: Callable = anki_request,
) -> dict[str, object]:
"""Copy audio from configured Anki decks and build a playlist.
The return value is intentionally CLI-friendly: it reports the number of
copied files, the playlist path, the output directory, and the optional
concatenated MP3 path. ``request`` is injectable so tests can exercise the
workflow without a running Anki instance.
"""
language = config.language_name(lang)
selected_decks = config.decks_for(lang)
if not selected_decks:
@@ -123,4 +141,3 @@ def extract_audio(
concat_path = os.path.join(out_dir, f"{language}_concat.mp3")
concat_audio_from_m3u(out_dir, m3u_path, concat_path)
return {"copied": len(copied), "playlist": m3u_path, "outdir": out_dir, "concat": concat_path}

View File

@@ -7,17 +7,67 @@ import sys
from .audio import extract_audio
from .config import Config, language_choices, load_config
from .importer import import_sentences
from .importer import (
format_tts_error,
import_sentences,
list_tts_voices,
supported_tts_backends,
synthesize_tts_sample,
)
from .words import compare_word_files, extract_words
from .youtube import run_youtube
def add_config_arg(parser: argparse.ArgumentParser) -> None:
"""Attach the shared ``--config`` option to a parser."""
parser.add_argument("--config", help="Path to YAML config file.")
def add_tts_override_args(parser: argparse.ArgumentParser, tts_backends: list[str]) -> None:
"""Attach per-command TTS override flags.
These options intentionally mirror config keys so command-line overrides
can be collected mechanically and merged over the selected language.
"""
parser.add_argument("--tts-backend", choices=tts_backends, help="Override the configured TTS backend.")
parser.add_argument("--tts-voice", help="Override the configured backend voice.")
parser.add_argument("--tts-voices", help="Override the configured backend voice bundle path.")
parser.add_argument("--tts-model", help="Override the configured backend model or local model path.")
parser.add_argument("--tts-model-dir", help="Override the directory used for relative TTS model paths.")
parser.add_argument("--tts-config", help="Override the configured backend model config path.")
parser.add_argument("--tts-vocab-config", help="Override the configured backend vocab config path.")
parser.add_argument("--tts-code", help="Override the configured backend language code.")
parser.add_argument("--tts-tld", help="Override the configured gTTS top-level domain.")
parser.add_argument("--tts-tempo", type=float, help="Override the post-processing tempo multiplier.")
parser.add_argument("--tts-speed", type=float, help="Override backend-native speech speed when supported.")
def collect_tts_overrides(args: argparse.Namespace) -> dict[str, object]:
"""Collect TTS override attributes from an argparse namespace."""
return {
"tts_backend": getattr(args, "tts_backend", None),
"tts_voice": getattr(args, "tts_voice", None),
"tts_voices": getattr(args, "tts_voices", None),
"tts_model": getattr(args, "tts_model", None),
"tts_model_dir": getattr(args, "tts_model_dir", None),
"tts_config": getattr(args, "tts_config", None),
"tts_vocab_config": getattr(args, "tts_vocab_config", None),
"tts_code": getattr(args, "tts_code", None),
"tts_tld": getattr(args, "tts_tld", None),
"tts_tempo": getattr(args, "tts_tempo", None),
"tts_speed": getattr(args, "tts_speed", None),
}
def build_parser(config: Config | None = None) -> argparse.ArgumentParser:
"""Build the full CLI parser.
Passing a loaded config lets argparse choices reflect user-defined language
codes. When no config is supplied, defaults are loaded so the parser remains
usable in tests and help-generation contexts.
"""
choices = language_choices(config or load_config())
tts_backends = supported_tts_backends()
parser = argparse.ArgumentParser(description="Saiki: sentence mining and listening tools for Anki.")
add_config_arg(parser)
sub = parser.add_subparsers(dest="command", required=True)
@@ -61,11 +111,25 @@ def build_parser(config: Config | None = None) -> argparse.ArgumentParser:
importer.add_argument("lang", choices=choices)
importer.add_argument("sentence_file", nargs="?")
importer.add_argument("--tags", help="Comma-separated tags. text-to-speech is always included.")
add_tts_override_args(importer, tts_backends)
test_tts = sub.add_parser("tts-test", help="Synthesize one TTS sample without importing into Anki.")
test_tts.add_argument("lang", choices=choices)
test_tts.add_argument("text", nargs="?")
test_tts.add_argument("--out", help="Output MP3 path. Defaults to ./tts_test_<lang>_<backend>.mp3.")
add_tts_override_args(test_tts, tts_backends)
voices = sub.add_parser("tts-voices", help="List voices or voice-listing hints for a TTS backend.")
voices.add_argument("lang", nargs="?", choices=choices)
voices.add_argument("--backend", choices=tts_backends, help="Backend to list instead of the language default.")
return parser
def main(argv: list[str] | None = None) -> int:
"""Run the CLI and return a process exit status."""
# Parse --config first so subcommand language choices can come from the
# user's config file instead of only the built-in defaults.
pre = argparse.ArgumentParser(add_help=False)
add_config_arg(pre)
known, _ = pre.parse_known_args(argv)
@@ -114,10 +178,27 @@ def main(argv: list[str] | None = None) -> int:
return 0
if args.command == "import":
result = import_sentences(config, args.lang, args.sentence_file, args.tags)
tts_overrides = collect_tts_overrides(args)
result = import_sentences(config, args.lang, args.sentence_file, args.tags, tts_overrides=tts_overrides)
print(f"Done. Added {result.added}/{result.processed} cards. Failed: {result.failed}")
for error in result.errors:
print(f"Error: {error}", file=sys.stderr)
return 0 if result.failed == 0 else 1
if args.command == "tts-test":
try:
output = synthesize_tts_sample(config, args.lang, args.text, args.out, collect_tts_overrides(args))
print(f"Wrote TTS sample: {output}")
return 0
except Exception as exc:
print(f"Error: {format_tts_error(exc)}", file=sys.stderr)
return 1
if args.command == "tts-voices":
for line in list_tts_voices(config, args.lang, args.backend):
print(line)
return 0
parser.print_help()
return 2

View File

@@ -23,14 +23,15 @@ DEFAULT_CONFIG: dict[str, Any] = {
"audio_output_root": "~/Languages/Anki/anki-audio",
"word_output_root": "~/Languages/Anki/anki-words",
"sentence_dir": "~/Languages/Anki",
"tts_model_dir": "~/.local/share/saiki/models",
"note_model": "Basic",
"fields": {"front": "Front", "back": "Back"},
"languages": {
"jp": {
"name": "japanese",
"transcript_code": "ja",
"tts_code": "ja",
"tts_tld": "com",
"tts_backend": "edge-tts",
"tts_voice": "ja-JP-NanamiNeural",
"tts_tempo": 1.35,
"decks": ["日本語"],
"word_model": "ja_core_news_lg",
@@ -40,8 +41,8 @@ DEFAULT_CONFIG: dict[str, Any] = {
"es": {
"name": "spanish",
"transcript_code": "es",
"tts_code": "es",
"tts_tld": "es",
"tts_backend": "edge-tts",
"tts_voice": "es-ES-ElviraNeural",
"tts_tempo": 1.25,
"decks": ["Español"],
"word_model": "es_core_news_sm",
@@ -54,73 +55,108 @@ DEFAULT_CONFIG: dict[str, Any] = {
@dataclass(frozen=True)
class Config:
"""Typed convenience wrapper around the merged YAML configuration.
The underlying ``data`` mapping remains available for simple serialization
and tests, while properties and helpers provide normalized paths and common
language-specific lookups for the rest of the application.
"""
data: dict[str, Any]
@property
def anki_connect_url(self) -> str:
"""URL for the local AnkiConnect HTTP server."""
return str(self.data["anki_connect_url"])
@property
def media_dir(self) -> str:
"""Expanded path to Anki's collection.media directory."""
return expand_path(str(self.data["media_dir"]))
@property
def audio_output_root(self) -> str:
"""Expanded root directory for exported listening audio."""
return expand_path(str(self.data["audio_output_root"]))
@property
def word_output_root(self) -> str:
"""Expanded root directory for generated vocabulary lists."""
return expand_path(str(self.data["word_output_root"]))
@property
def sentence_dir(self) -> str:
"""Expanded directory used for relative sentence import files."""
return expand_path(str(self.data["sentence_dir"]))
@property
def tts_model_dir(self) -> str:
"""Expanded directory used to resolve local TTS model paths."""
return expand_path(str(self.data["tts_model_dir"]))
@property
def note_model(self) -> str:
"""Anki note type used when importing generated sentence cards."""
return str(self.data.get("note_model", "Basic"))
@property
def fields(self) -> dict[str, str]:
"""Configured logical field names, currently front and back."""
return dict(self.data.get("fields", {}))
@property
def languages(self) -> dict[str, dict[str, Any]]:
"""Language configurations keyed by CLI language code."""
return dict(self.data.get("languages", {}))
def language(self, lang: str) -> dict[str, Any]:
"""Return one language config with shared TTS defaults applied.
A fresh dict is returned so callers may layer CLI overrides onto it
without mutating the loaded configuration.
"""
try:
return dict(self.languages[lang])
language = dict(self.languages[lang])
language.setdefault("tts_model_dir", self.tts_model_dir)
return language
except KeyError as e:
available = ", ".join(sorted(self.languages))
raise ValueError(f"Unsupported language '{lang}'. Available: {available}") from e
def language_name(self, lang: str) -> str:
"""Return the long language bucket name for output directories."""
return str(self.language(lang)["name"])
def transcript_code(self, lang: str) -> str:
"""Return the language code expected by transcript providers."""
return str(self.language(lang)["transcript_code"])
def decks_for(self, lang: str) -> list[str]:
"""Return configured Anki deck names for a language."""
return list(self.language(lang).get("decks", []))
def field_for(self, lang: str) -> str:
"""Return the Anki field to mine for vocabulary."""
return str(self.language(lang).get("field", self.fields.get("back", "Back")))
def sentence_file_for(self, lang: str) -> str:
"""Resolve the sentence import file for a language."""
value = str(self.language(lang).get("sentence_file", f"sentences_{lang}.txt"))
return expand_path(value if os.path.isabs(value) or value.startswith("~") else os.path.join(self.sentence_dir, value))
def expand_path(path: str) -> str:
"""Expand ``~`` and environment variables in a configured path."""
return os.path.expanduser(os.path.expandvars(path))
def default_config_path() -> str:
"""Return the conventional user config path."""
return expand_path("~/.config/saiki/config.yaml")
def deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]:
"""Recursively merge a user config mapping over default config values."""
result = copy.deepcopy(base)
for key, value in override.items():
if isinstance(value, dict) and isinstance(result.get(key), dict):
@@ -131,6 +167,7 @@ def deep_merge(base: dict[str, Any], override: dict[str, Any]) -> dict[str, Any]
def load_config(path: str | None = None) -> Config:
"""Load defaults plus an optional YAML config file."""
config = copy.deepcopy(DEFAULT_CONFIG)
config_path = expand_path(path) if path else default_config_path()
if os.path.exists(config_path):
@@ -145,4 +182,5 @@ def load_config(path: str | None = None) -> Config:
def language_choices(config: Config) -> list[str]:
"""Return sorted language codes suitable for argparse choices."""
return sorted(config.languages.keys())

View File

@@ -1,28 +1,57 @@
"""Generate TTS audio and add sentence notes to Anki."""
"""Generate TTS audio and add sentence notes to Anki.
This module owns the TTS backend abstraction used by both ``import`` and
``tts-test``. Backends synthesize their native output format first, then ffmpeg
normalizes the result to MP3 and applies the configured tempo multiplier.
"""
from __future__ import annotations
import os
import csv
import os
import shutil
import subprocess
import tempfile
import time
from dataclasses import dataclass
from typing import Callable
from dataclasses import dataclass, field
from typing import Any, Callable, Mapping
from .ankiconnect import anki_request
from .config import Config
from .config import Config, expand_path
@dataclass(frozen=True)
class ImportResult:
"""Summary of one sentence import run."""
processed: int
added: int
failed: int
errors: list[str] = field(default_factory=list)
@dataclass(frozen=True)
class PreparedTtsBackend:
"""Runtime-ready TTS backend callable plus its native audio extension."""
name: str
raw_ext: str
synthesize: Callable[[str, str], None]
@dataclass(frozen=True)
class TtsBackendSpec:
"""Static metadata needed to validate and build a TTS backend."""
raw_ext: str
build: Callable[[dict[str, Any]], Callable[[str, str], None]]
required_keys: tuple[str, ...] = ()
command: str | None = None
list_voices: Callable[[dict[str, Any]], list[str]] | None = None
def parse_tags(value: str | None) -> list[str]:
"""Parse comma-separated tag text and add Saiki's default TTS tags."""
tags = ["text-to-speech"]
if value:
tags.extend(tag.strip() for tag in value.split(",") if tag.strip())
@@ -32,15 +61,407 @@ def parse_tags(value: str | None) -> list[str]:
def require_command(name: str) -> None:
"""Raise a friendly error if an external command is not on PATH."""
if shutil.which(name) is None:
raise RuntimeError(f"Required command not found: {name}")
def generate_tts(sentence: str, raw_output: str, lang_code: str, tld: str) -> None:
subprocess.run(["gtts-cli", sentence, "--lang", lang_code, "--tld", tld, "--output", raw_output], check=True)
_TTS_PATH_KEYS = ("tts_model", "tts_voices", "tts_vocab_config", "tts_config")
_MAX_ERROR_DETAILS = 5
_DEFAULT_TEST_TEXT = {
"jp": "これはテストです。",
"es": "Esta es una prueba.",
}
def _generate_gtts(cfg: dict[str, Any]) -> Callable[[str, str], None]:
"""Build a gTTS synthesizer using the command-line wrapper."""
lang_code = str(cfg["tts_code"])
tld = str(cfg["tts_tld"])
def synthesize(sentence: str, output: str) -> None:
subprocess.run(
["gtts-cli", sentence, "--lang", lang_code, "--tld", tld, "--output", output],
stdin=subprocess.DEVNULL,
check=True,
)
return synthesize
def _generate_edge_tts(cfg: dict[str, Any]) -> Callable[[str, str], None]:
"""Build an edge-tts synthesizer for a configured neural voice."""
voice = str(cfg["tts_voice"])
def synthesize(sentence: str, output: str) -> None:
subprocess.run(
["edge-tts", "--voice", voice, "--text", sentence, "--write-media", output],
stdin=subprocess.DEVNULL,
check=True,
)
return synthesize
def _generate_piper(cfg: dict[str, Any]) -> Callable[[str, str], None]:
"""Build a Piper synthesizer around a local ONNX voice model."""
model = str(cfg["tts_model"])
_require_file("piper", "tts_model", model)
config = str(cfg["tts_config"]) if cfg.get("tts_config") else None
if config:
_require_file("piper", "tts_config", config)
def synthesize(sentence: str, output: str) -> None:
command = ["piper", "--model", model]
if config:
command.extend(["--config", config])
command.extend(["--output_file", output])
subprocess.run(
command,
input=f"{sentence}\n".encode("utf-8"),
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
check=True,
)
return synthesize
def _generate_espeak(cfg: dict[str, Any]) -> Callable[[str, str], None]:
"""Build an espeak-ng synthesizer for a configured voice code."""
voice = str(cfg["tts_voice"])
def synthesize(sentence: str, output: str) -> None:
subprocess.run(["espeak-ng", "-v", voice, "-w", output, sentence], stdin=subprocess.DEVNULL, check=True)
return synthesize
def _generate_kokoro(cfg: dict[str, Any]) -> Callable[[str, str], None]:
"""Build a Kokoro ONNX synthesizer from local model and voice files."""
_require_file("kokoro", "tts_model", str(cfg["tts_model"]))
_require_file("kokoro", "tts_voices", str(cfg["tts_voices"]))
if cfg.get("tts_vocab_config"):
_require_file("kokoro", "tts_vocab_config", str(cfg["tts_vocab_config"]))
try:
from kokoro_onnx import Kokoro # type: ignore
import soundfile as sf # type: ignore
except ImportError as exc:
raise RuntimeError(
"kokoro backend requires 'kokoro-onnx' and 'soundfile'. Install them first."
) from exc
kokoro_kwargs = {}
if cfg.get("tts_vocab_config"):
kokoro_kwargs["vocab_config"] = str(cfg["tts_vocab_config"])
kokoro = Kokoro(str(cfg["tts_model"]), str(cfg["tts_voices"]), **kokoro_kwargs)
voice = str(cfg["tts_voice"])
lang_code = str(cfg["tts_code"])
speed = _optional_float(cfg, "tts_speed", 1.0)
def synthesize(sentence: str, output: str) -> None:
samples, sample_rate = kokoro.create(sentence, voice=voice, speed=speed, lang=lang_code)
sf.write(output, samples, sample_rate)
return synthesize
def _list_gtts_voices(cfg: dict[str, Any]) -> list[str]:
"""Return voice-listing guidance for gTTS."""
return [
"gtts does not expose named voices.",
f"Current settings: tts_code={cfg.get('tts_code', '<unset>')}, tts_tld={cfg.get('tts_tld', '<unset>')}",
]
def _list_edge_voices(cfg: dict[str, Any]) -> list[str]:
"""Return the configured edge-tts voice or ask the CLI to list online voices."""
if cfg.get("tts_voice"):
return [
f"Configured edge-tts voice: {cfg['tts_voice']}",
"Run `edge-tts --list-voices` directly to browse the full online voice catalog.",
]
return _run_voice_command(["edge-tts", "--list-voices"])
def _list_espeak_voices(cfg: dict[str, Any]) -> list[str]:
"""List espeak-ng voices, narrowed by the configured language when possible."""
voice_filter = str(cfg.get("tts_voice") or cfg.get("tts_code") or "")
arg = f"--voices={voice_filter}" if voice_filter else "--voices"
return _run_voice_command(["espeak-ng", arg])
def _list_piper_voices(cfg: dict[str, Any]) -> list[str]:
"""Return Piper model guidance instead of pretending it has a voice catalog."""
model = cfg.get("tts_model")
if model:
return [f"Configured Piper model: {model}"]
return ["Piper voices are model files. Set tts_model to a downloaded .onnx voice model."]
def _list_kokoro_voices(cfg: dict[str, Any]) -> list[str]:
"""Return Kokoro voice-bundle guidance from the configured files."""
voice = cfg.get("tts_voice")
voices = cfg.get("tts_voices")
if voice or voices:
return [f"Configured Kokoro voice: {voice or '<unset>'}", f"Voice bundle: {voices or '<unset>'}"]
return ["Kokoro voices come from the configured tts_voices bundle. Set tts_voice to one voice from it."]
# Registry entries describe validation, dependency checks, synthesis, and
# optional voice-listing behavior in one place so new free backends can be added
# without changing the CLI or import workflows.
_TTS_BACKENDS: dict[str, TtsBackendSpec] = {
"gtts": TtsBackendSpec(
raw_ext=".mp3",
command="gtts-cli",
required_keys=("tts_code", "tts_tld"),
build=_generate_gtts,
list_voices=_list_gtts_voices,
),
"edge-tts": TtsBackendSpec(
raw_ext=".mp3",
command="edge-tts",
required_keys=("tts_voice",),
build=_generate_edge_tts,
list_voices=_list_edge_voices,
),
"piper": TtsBackendSpec(
raw_ext=".wav",
command="piper",
required_keys=("tts_model",),
build=_generate_piper,
list_voices=_list_piper_voices,
),
"espeak-ng": TtsBackendSpec(
raw_ext=".wav",
command="espeak-ng",
required_keys=("tts_voice",),
build=_generate_espeak,
list_voices=_list_espeak_voices,
),
"kokoro": TtsBackendSpec(
raw_ext=".wav",
required_keys=("tts_model", "tts_voices", "tts_voice", "tts_code"),
build=_generate_kokoro,
list_voices=_list_kokoro_voices,
),
}
def supported_tts_backends() -> list[str]:
"""Return supported backend names for argparse choices and error messages."""
return sorted(_TTS_BACKENDS)
def prepare_tts_backend(lang_cfg: dict[str, Any]) -> PreparedTtsBackend:
"""Validate config and return a callable backend for one language.
Path-like config values are expanded before validation, required keys are
checked per backend, and external command dependencies are verified when a
backend shells out to a CLI tool.
"""
backend = str(lang_cfg.get("tts_backend", "gtts")).strip()
spec = _TTS_BACKENDS.get(backend)
if spec is None:
raise ValueError(_unknown_backend_message(backend))
cfg = _expand_tts_paths(lang_cfg)
_require_backend_keys(backend, cfg, spec.required_keys)
if spec.command:
require_command(spec.command)
return PreparedTtsBackend(name=backend, raw_ext=spec.raw_ext, synthesize=spec.build(cfg))
def list_tts_voices(config: Config, lang: str | None = None, backend: str | None = None) -> list[str]:
"""Return voice names or backend-specific hints for the selected TTS backend."""
lang_cfg = config.language(lang) if lang else {}
if backend:
lang_cfg = {**lang_cfg, "tts_backend": backend}
name = str(lang_cfg.get("tts_backend", "gtts")).strip()
spec = _TTS_BACKENDS.get(name)
if spec is None:
raise ValueError(_unknown_backend_message(name))
cfg = _expand_tts_paths(lang_cfg)
if spec.list_voices is None:
return [f"{name} does not support voice listing."]
return spec.list_voices(cfg)
def default_tts_test_text(lang: str) -> str:
"""Return a short built-in phrase for ``tts-test``."""
return _DEFAULT_TEST_TEXT.get(lang, "This is a test.")
def synthesize_tts_sample(
config: Config,
lang: str,
text: str | None = None,
output: str | None = None,
tts_overrides: Mapping[str, Any] | None = None,
) -> str:
"""Generate one TTS sample without touching Anki.
This is the safest way to verify backend configuration. It uses the same
backend preparation and ffmpeg normalization path as real imports.
"""
language = _language_config(config, lang, tts_overrides)
backend = prepare_tts_backend(language)
tempo = _tts_tempo(language)
require_command("ffmpeg")
output_path = expand_path(output) if output else _default_tts_output(lang, backend.name)
output_dir = os.path.dirname(output_path)
if output_dir:
os.makedirs(output_dir, exist_ok=True)
sentence = text or default_tts_test_text(lang)
with tempfile.TemporaryDirectory() as temp_dir:
raw_output = os.path.join(temp_dir, f"tts_test_original{backend.raw_ext}")
backend.synthesize(sentence, raw_output)
speed_audio(raw_output, output_path, tempo)
return output_path
def _raw_ext(backend: str) -> str:
"""Return a backend's raw extension, defaulting to MP3 for unknown names."""
spec = _TTS_BACKENDS.get(backend)
return spec.raw_ext if spec else ".mp3"
def _language_config(
config: Config,
lang: str,
tts_overrides: Mapping[str, Any] | None = None,
) -> dict[str, Any]:
"""Return a language config with any non-None CLI overrides applied."""
language = config.language(lang)
if tts_overrides:
language.update({key: value for key, value in tts_overrides.items() if value is not None})
return language
def _default_tts_output(lang: str, backend: str) -> str:
"""Return the default sample output path for ``tts-test``."""
safe_backend = backend.replace(os.sep, "_")
return os.path.abspath(f"tts_test_{lang}_{safe_backend}.mp3")
def _expand_tts_paths(lang_cfg: Mapping[str, Any]) -> dict[str, Any]:
"""Expand TTS paths and resolve relative model files under tts_model_dir."""
cfg = dict(lang_cfg)
if isinstance(cfg.get("tts_model_dir"), str):
cfg["tts_model_dir"] = expand_path(str(cfg["tts_model_dir"]))
for key in _TTS_PATH_KEYS:
if isinstance(cfg.get(key), str):
path = expand_path(str(cfg[key]))
if not os.path.isabs(path) and cfg.get("tts_model_dir"):
path = os.path.join(str(cfg["tts_model_dir"]), path)
cfg[key] = path
return cfg
def _require_backend_keys(backend: str, cfg: Mapping[str, Any], keys: tuple[str, ...]) -> None:
"""Ensure backend-specific required config keys are present and non-empty."""
missing = [key for key in keys if cfg.get(key) is None or (isinstance(cfg.get(key), str) and not cfg[key].strip())]
if missing:
raise RuntimeError(f"{backend} backend requires config key(s): {', '.join(missing)}")
def _require_file(backend: str, key: str, path: str) -> None:
"""Ensure a configured model path exists before calling a backend."""
if not os.path.isfile(path):
raise RuntimeError(f"{backend} backend config key {key} points to a missing file: {path}")
def format_tts_error(exc: Exception) -> str:
"""Format backend and ffmpeg failures for concise CLI output."""
return _error_message(exc)
def _unknown_backend_message(backend: str) -> str:
"""Build a consistent unknown-backend error message."""
return f"Unknown TTS backend: {backend!r}. Choose from: {', '.join(supported_tts_backends())}"
def _optional_float(cfg: Mapping[str, Any], key: str, default: float | None) -> float | None:
"""Parse an optional numeric config value."""
if cfg.get(key) is None:
return default
try:
return float(cfg[key])
except (TypeError, ValueError) as exc:
raise RuntimeError(f"{key} must be a number.") from exc
def _run_voice_command(command: list[str], timeout: float = 30.0) -> list[str]:
"""Run an external voice-listing command and return printable lines."""
if shutil.which(command[0]) is None:
return [f"Required command not found: {command[0]}"]
try:
result = subprocess.run(
command,
text=True,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
timeout=timeout,
check=True,
)
except subprocess.CalledProcessError as exc:
return [f"{' '.join(command)} failed: {_subprocess_detail(exc)}"]
except subprocess.TimeoutExpired:
return [f"{' '.join(command)} timed out after {timeout:g}s."]
lines = [line for line in result.stdout.splitlines() if line.strip()]
return lines or ["No voices returned."]
def _short_text(value: str, limit: int = 500) -> str:
"""Collapse and truncate long subprocess output for display."""
text = " ".join(value.strip().split())
if len(text) <= limit:
return text
return f"{text[:limit - 3]}..."
def _sentence_label(sentence: str, limit: int = 80) -> str:
"""Return a compact representation of a sentence for error lists."""
text = " ".join(sentence.split())
if len(text) > limit:
text = f"{text[:limit - 3]}..."
return repr(text)
def _subprocess_detail(exc: subprocess.CalledProcessError) -> str:
"""Extract useful stdout/stderr context from a failed subprocess."""
stderr = exc.stderr
stdout = exc.stdout
detail = stderr if stderr else stdout
if isinstance(detail, bytes):
detail = detail.decode("utf-8", errors="replace")
text = _short_text(str(detail or ""))
command = exc.cmd if isinstance(exc.cmd, str) else " ".join(str(part) for part in exc.cmd)
suffix = f": {text}" if text else ""
return f"{command} exited with status {exc.returncode}{suffix}"
def _error_message(exc: Exception) -> str:
"""Convert an exception into a short user-facing string."""
if isinstance(exc, subprocess.CalledProcessError):
return _subprocess_detail(exc)
return _short_text(str(exc) or exc.__class__.__name__)
def _tts_tempo(cfg: Mapping[str, Any]) -> float:
"""Validate and return the post-processing tempo multiplier."""
tempo = _optional_float(cfg, "tts_tempo", 1.0)
if tempo is None or tempo <= 0:
raise RuntimeError("tts_tempo must be greater than 0.")
return tempo
def speed_audio(raw_output: str, output_path: str, tempo: float) -> None:
"""Convert backend output to an MP3 and apply ffmpeg's atempo filter."""
subprocess.run(
["ffmpeg", "-loglevel", "error", "-i", raw_output, "-filter:a", f"atempo={tempo}", "-y", output_path],
stdin=subprocess.DEVNULL,
@@ -49,6 +470,11 @@ def speed_audio(raw_output: str, output_path: str, tempo: float) -> None:
def read_sentences(path: str) -> list[str]:
"""Read sentences from plain text, CSV, or TSV input.
CSV and TSV imports must contain a ``sentence`` header so exports from the
YouTube sentence-mining command can be imported directly.
"""
expanded = os.path.expanduser(path)
if expanded.lower().endswith((".tsv", ".csv")):
delimiter = "\t" if expanded.lower().endswith(".tsv") else ","
@@ -68,11 +494,19 @@ def import_sentences(
sentence_file: str | None = None,
tags_value: str | None = None,
request: Callable = anki_request,
tts_overrides: Mapping[str, Any] | None = None,
) -> ImportResult:
require_command("gtts-cli")
"""Generate TTS for each sentence and add cards through AnkiConnect.
The first configured deck for the language is used as the destination.
Audio is attached to the front field so Anki imports the temporary MP3 into
its media collection before the temporary directory is removed.
"""
language = _language_config(config, lang, tts_overrides)
backend = prepare_tts_backend(language)
tempo = _tts_tempo(language)
require_command("ffmpeg")
language = config.language(lang)
decks = list(language.get("decks", []))
if not decks:
raise RuntimeError(f"No deck configured for language: {lang}")
@@ -85,15 +519,16 @@ def import_sentences(
back_field = config.fields.get("back", "Back")
added = 0
failed = 0
errors: list[str] = []
with tempfile.TemporaryDirectory() as temp_dir:
for sentence in sentences:
basename = f"tts_{time.strftime('%Y%m%d_%H%M%S')}_{lang}_{os.getpid()}_{added + failed}"
raw_output = os.path.join(temp_dir, f"{basename}_original.mp3")
raw_output = os.path.join(temp_dir, f"{basename}_original{backend.raw_ext}")
output_path = os.path.join(temp_dir, f"{basename}.mp3")
try:
generate_tts(sentence, raw_output, str(language["tts_code"]), str(language["tts_tld"]))
speed_audio(raw_output, output_path, float(language["tts_tempo"]))
backend.synthesize(sentence, raw_output)
speed_audio(raw_output, output_path, tempo)
request(
"addNote",
url=config.anki_connect_url,
@@ -107,6 +542,8 @@ def import_sentences(
},
)
added += 1
except Exception:
except Exception as exc:
failed += 1
return ImportResult(processed=len(sentences), added=added, failed=failed)
if len(errors) < _MAX_ERROR_DETAILS:
errors.append(f"{_sentence_label(sentence)}: {_error_message(exc)}")
return ImportResult(processed=len(sentences), added=added, failed=failed, errors=errors)

View File

@@ -8,6 +8,11 @@ import regex as re
def extract_first_visible_line(text: str) -> str:
"""Strip simple HTML and return the first visible line of text.
This is useful for card fields where the first line is the sentence and
subsequent lines may contain notes, hints, or generated markup.
"""
text = unescape(text or "")
text = re.sub(r"</?(br|div|p)[^>]*>", "\n", text, flags=re.IGNORECASE)
text = re.sub(r"<[^>]+>", "", text)
@@ -16,6 +21,7 @@ def extract_first_visible_line(text: str) -> str:
def extract_visible_text(text: str) -> str:
"""Strip simple HTML while preserving paragraph-like line breaks."""
text = unescape(text or "")
text = re.sub(r"</?(br|div|p)[^>]*>", "\n", text, flags=re.IGNORECASE)
text = re.sub(r"<[^>]+>", "", text)
@@ -25,5 +31,5 @@ def extract_visible_text(text: str) -> str:
def normalize_word_key(value: str) -> str:
"""Normalize a word for set membership and comparison."""
return re.sub(r"\s+", " ", value.strip().lower())

View File

@@ -26,15 +26,22 @@ JAPANESE_ALLOWED_POS = {"NOUN", "PROPN", "VERB", "ADJ"}
def setup_logging(logfile: str) -> None:
"""Configure file logging for word extraction scripts."""
os.makedirs(os.path.dirname(os.path.abspath(logfile)), exist_ok=True)
logging.basicConfig(filename=logfile, level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s")
def build_query_from_decks(decks: list[str]) -> str:
"""Build an Anki search query that matches any configured deck."""
return " OR ".join(f'deck:"{d}"' for d in decks)
def japanese_filter(token) -> bool:
"""Return whether a spaCy token is useful Japanese vocabulary.
The filter is intentionally conservative: it keeps content words and drops
common particles, helper grammar, stop words, URLs, and obvious HTML debris.
"""
text = (token.text or "").strip()
lemma = (token.lemma_ or "").strip()
if not text or not JAPANESE_CHAR_RE.fullmatch(text):
@@ -51,14 +58,17 @@ def japanese_filter(token) -> bool:
def spanish_filter(token) -> bool:
"""Return whether a spaCy token is useful Spanish vocabulary."""
return bool(getattr(token, "is_alpha", False)) and not bool(getattr(token, "is_stop", False))
def spanish_format(token) -> str:
"""Normalize a Spanish token to its lowercase lemma."""
return (token.lemma_ or token.text or "").lower().strip()
def japanese_format(token) -> str:
"""Format a Japanese token as lemma plus surface form when they differ."""
lemma = (token.lemma_ or "").strip()
surface = (token.text or "").strip()
if lemma and surface and lemma != surface:
@@ -73,6 +83,7 @@ LANGUAGE_PROFILES = {
def load_spacy_model(model_name: str):
"""Load a spaCy model with installation-oriented error messages."""
try:
import spacy # type: ignore
except Exception as e:
@@ -84,6 +95,7 @@ def load_spacy_model(model_name: str):
def get_notes(query: str, config: Config, request: Callable = anki_request) -> list[dict]:
"""Fetch Anki note details matching a search query."""
note_ids = request("findNotes", url=config.anki_connect_url, query=query) or []
if not note_ids:
return []
@@ -98,6 +110,7 @@ def extract_counts(
output_format: Callable,
use_full_field: bool,
) -> Counter:
"""Count formatted vocabulary items across Anki notes."""
counter: Counter = Counter()
for note in notes:
fields = note.get("fields", {}) or {}
@@ -114,6 +127,7 @@ def extract_counts(
def write_counts(counter: Counter, out_path: str, min_freq: int) -> int:
"""Write a sorted ``word frequency`` list and return the number of rows."""
items = [(w, c) for (w, c) in counter.items() if c >= min_freq]
items.sort(key=lambda x: (-x[1], x[0]))
os.makedirs(os.path.dirname(os.path.abspath(out_path)), exist_ok=True)
@@ -124,6 +138,7 @@ def write_counts(counter: Counter, out_path: str, min_freq: int) -> int:
def read_word_file(path: str) -> set[str]:
"""Read a ``word frequency`` file into normalized word keys."""
words: set[str] = set()
with open(os.path.expanduser(path), "r", encoding="utf-8") as fh:
for line in fh:
@@ -136,6 +151,7 @@ def read_word_file(path: str) -> set[str]:
def compare_word_files(source_path: str, known_path: str) -> list[str]:
"""Return source rows whose normalized word is not in the known list."""
known = read_word_file(known_path)
new_words: list[str] = []
with open(os.path.expanduser(source_path), "r", encoding="utf-8") as fh:
@@ -162,6 +178,12 @@ def extract_words(
spacy_model: str | None = None,
request: Callable = anki_request,
) -> dict[str, object]:
"""Extract frequent vocabulary from configured Anki cards.
The function accepts explicit query/deck/field overrides for CLI use, but
defaults to the selected language config. Its dictionary return value keeps
the CLI output simple and gives tests stable fields to assert against.
"""
language_bucket = config.language_name(lang)
profile = LANGUAGE_PROFILES[language_bucket]
search_query = query or build_query_from_decks(decks or config.decks_for(lang))
@@ -180,4 +202,3 @@ def extract_words(
counter = extract_counts(notes, field_name, nlp, profile["token_filter"], profile["output_format"], full_field)
written = write_counts(counter, out_path, min_freq)
return {"query": search_query, "notes": len(notes), "unique": len(counter), "written": written, "out": out_path}

View File

@@ -27,11 +27,14 @@ STOPWORDS = {
@dataclass(frozen=True)
class TranscriptLine:
"""One cleaned transcript line with its start timestamp in seconds."""
start: float
text: str
def extract_video_id(url_or_id: str) -> str:
"""Extract a YouTube video id from a URL, or pass an id through unchanged."""
if "youtube" in url_or_id or "youtu.be" in url_or_id:
query = urlparse(url_or_id)
if query.hostname == "youtu.be":
@@ -44,11 +47,13 @@ def extract_video_id(url_or_id: str) -> str:
def video_url(video_or_id: str) -> str:
"""Return a canonical watch URL for a YouTube id or URL."""
video_id = extract_video_id(video_or_id)
return f"https://www.youtube.com/watch?v={video_id}"
def fetch_transcript(video_id: str, lang_code: str):
"""Fetch a transcript while supporting old and new library APIs."""
if hasattr(YouTubeTranscriptApi, "fetch"):
api = YouTubeTranscriptApi()
return api.fetch(video_id, languages=[lang_code])
@@ -58,18 +63,21 @@ def fetch_transcript(video_id: str, lang_code: str):
def snippet_text(entry) -> str:
"""Read transcript text from either dict-like or object-like entries."""
if isinstance(entry, dict):
return entry.get("text", "") or ""
return getattr(entry, "text", "") or ""
def snippet_start(entry) -> float:
"""Read transcript start time from either dict-like or object-like entries."""
if isinstance(entry, dict):
return float(entry.get("start", 0.0) or 0.0)
return float(getattr(entry, "start", 0.0) or 0.0)
def transcript_lines(entries) -> list[TranscriptLine]:
"""Normalize raw transcript entries into non-empty transcript lines."""
lines: list[TranscriptLine] = []
for entry in entries:
text = snippet_text(entry).replace("\n", " ").strip()
@@ -79,6 +87,7 @@ def transcript_lines(entries) -> list[TranscriptLine]:
def tokenize_japanese(text: str) -> list[str]:
"""Tokenize Japanese text with fugashi."""
try:
from fugashi import Tagger
except ImportError as e:
@@ -88,15 +97,18 @@ def tokenize_japanese(text: str) -> list[str]:
def tokenize_spanish(text: str, raw: bool = False) -> list[str]:
"""Tokenize Spanish-ish text with a lightweight word regex."""
tokens = re.findall(r"\b[\wáéíóúñü]+\b", text)
return tokens if raw else [t.lower() for t in tokens]
def tokenize_text(text: str, lang_code: str, raw: bool = False) -> list[str]:
"""Dispatch transcript tokenization by language code."""
return tokenize_japanese(text) if lang_code == "ja" else tokenize_spanish(text, raw=raw)
def count_words(tokens: list[str], lang_code: str, remove_stopwords: bool = True) -> Counter:
"""Count tokens, optionally excluding the built-in stopword list."""
if remove_stopwords:
stopwords = STOPWORDS.get(lang_code, set())
tokens = [t for t in tokens if t not in stopwords]
@@ -104,6 +116,7 @@ def count_words(tokens: list[str], lang_code: str, remove_stopwords: bool = True
def sentence_vocab(sentence: str, lang_code: str, known_words: set[str] | None = None) -> list[str]:
"""Guess distinct useful vocabulary for one transcript sentence."""
words: list[str] = []
seen: set[str] = set()
for token in tokenize_text(sentence, lang_code):
@@ -126,6 +139,7 @@ def write_sentence_export(
known_words_path: str | None = None,
only_new: bool = False,
) -> int:
"""Write transcript lines as Anki-importable sentence rows."""
known = read_word_file(known_words_path) if known_words_path else None
os.makedirs(os.path.dirname(os.path.abspath(out_path)), exist_ok=True)
written = 0
@@ -154,6 +168,7 @@ def run_youtube(
known_words: str | None = None,
only_new: bool = False,
) -> dict[str, object]:
"""Run transcript mining in either vocabulary or sentence-export mode."""
lang_code = config.transcript_code(lang)
video_id = extract_video_id(video)
entries = fetch_transcript(video_id, lang_code)
@@ -176,4 +191,3 @@ def run_youtube(
for word, count in items:
fh.write(f"{word} {count}\n")
return {"mode": mode, "items": items, "out": out}

View File

@@ -4,15 +4,26 @@ import os
import sys
import tempfile
import unittest
from copy import deepcopy
from pathlib import Path
from unittest.mock import patch
SRC_DIR = Path(__file__).resolve().parents[1] / "src"
if str(SRC_DIR) not in sys.path:
sys.path.insert(0, str(SRC_DIR))
from saiki.audio import build_playlist, resolve_media_paths
from saiki.config import DEFAULT_CONFIG, deep_merge
from saiki.importer import parse_tags, read_sentences
from saiki.config import Config, DEFAULT_CONFIG, deep_merge
from saiki.importer import (
PreparedTtsBackend,
import_sentences,
list_tts_voices,
parse_tags,
prepare_tts_backend,
read_sentences,
synthesize_tts_sample,
supported_tts_backends,
)
from saiki.text import extract_first_visible_line, extract_visible_text
from saiki.words import build_query_from_decks, compare_word_files, read_word_file
from saiki.youtube import TranscriptLine, extract_video_id, sentence_vocab, write_sentence_export
@@ -23,6 +34,7 @@ class ConfigTests(unittest.TestCase):
merged = deep_merge(DEFAULT_CONFIG, {"languages": {"es": {"decks": ["Spanish"]}}})
self.assertEqual(merged["languages"]["es"]["decks"], ["Spanish"])
self.assertEqual(merged["languages"]["es"]["transcript_code"], "es")
self.assertEqual(merged["tts_model_dir"], "~/.local/share/saiki/models")
self.assertIn("jp", merged["languages"])
@@ -107,6 +119,87 @@ class ImporterTests(unittest.TestCase):
fh.write("Hola mundo\t1.00\thttps://example.test\tmundo\n")
self.assertEqual(read_sentences(path), ["Hola mundo"])
def test_supported_tts_backends_are_free_options(self):
self.assertEqual(supported_tts_backends(), ["edge-tts", "espeak-ng", "gtts", "kokoro", "piper"])
def test_list_gtts_voice_hint(self):
config = Config(deep_merge(deepcopy(DEFAULT_CONFIG), {"languages": {"es": {"tts_code": "es", "tts_tld": "es"}}}))
voices = list_tts_voices(config, "es", backend="gtts")
self.assertIn("gtts does not expose named voices.", voices)
self.assertIn("tts_code=es", voices[1])
def test_list_edge_voice_uses_configured_voice(self):
voices = list_tts_voices(Config(deepcopy(DEFAULT_CONFIG)), "es")
self.assertEqual(voices[0], "Configured edge-tts voice: es-ES-ElviraNeural")
def test_prepare_tts_backend_validates_required_keys(self):
with self.assertRaisesRegex(RuntimeError, "tts_voice"):
prepare_tts_backend({"tts_backend": "edge-tts"})
def test_prepare_tts_backend_expands_model_paths(self):
with tempfile.TemporaryDirectory() as tmp:
model = os.path.join(tmp, "voice.onnx")
config = os.path.join(tmp, "voice.onnx.json")
for path in [model, config]:
with open(path, "w", encoding="utf-8") as fh:
fh.write("{}")
with patch("saiki.importer.require_command"):
backend = prepare_tts_backend(
{
"tts_backend": "piper",
"tts_model_dir": tmp,
"tts_model": "voice.onnx",
"tts_config": "voice.onnx.json",
}
)
with patch("saiki.importer.subprocess.run") as run:
backend.synthesize("Hola", "/tmp/out.wav")
args = run.call_args.args[0]
self.assertEqual(args[2], model)
self.assertEqual(args[4], config)
self.assertEqual(run.call_args.kwargs["input"], b"Hola\n")
def test_synthesize_tts_sample_uses_backend_and_speed_audio(self):
seen: dict[str, str] = {}
def synthesize(sentence: str, output: str) -> None:
seen["sentence"] = sentence
seen["raw_output"] = output
with tempfile.TemporaryDirectory() as tmp:
output = os.path.join(tmp, "sample.mp3")
with patch("saiki.importer.prepare_tts_backend") as prepare, patch(
"saiki.importer.require_command"
), patch("saiki.importer.speed_audio") as speed:
prepare.return_value = PreparedTtsBackend("fake", ".wav", synthesize)
result = synthesize_tts_sample(Config(deepcopy(DEFAULT_CONFIG)), "es", output=output)
self.assertEqual(result, output)
self.assertEqual(seen["sentence"], "Esta es una prueba.")
self.assertTrue(seen["raw_output"].endswith(".wav"))
speed.assert_called_once()
def test_import_sentences_returns_error_details(self):
def fail_synthesis(sentence: str, output: str) -> None:
raise RuntimeError("tts broke")
with tempfile.TemporaryDirectory() as tmp:
path = os.path.join(tmp, "sentences.txt")
with open(path, "w", encoding="utf-8") as fh:
fh.write("Hola mundo\n")
with patch("saiki.importer.prepare_tts_backend") as prepare, patch("saiki.importer.require_command"):
prepare.return_value = PreparedTtsBackend("fake", ".mp3", fail_synthesis)
result = import_sentences(Config(deepcopy(DEFAULT_CONFIG)), "es", path, request=lambda *a, **k: None)
self.assertEqual(result.processed, 1)
self.assertEqual(result.added, 0)
self.assertEqual(result.failed, 1)
self.assertEqual(result.errors, ["'Hola mundo': tts broke"])
if __name__ == "__main__":
unittest.main()