Files
Saiki/word_scraper.py
2026-05-25 07:07:13 -04:00

383 lines
12 KiB
Python
Executable File

#!/usr/bin/env python3
"""
word_scraper.py
Extract frequent words/lemmas from Anki notes via AnkiConnect.
Howto:
./word_scraper.py jp [--deck "日本語"] [--field Back] [--min-freq 2] [--outdir DIR] [--out FILE]
./word_scraper.py es [--deck "Español"] [--field Back] [--min-freq 2] [--outdir DIR] [--out FILE]
By default, this:
- chooses decks based on the lang code (jp/es) using shared deck mappings
- pulls notes from Anki via AnkiConnect (http://localhost:8765)
- reads a single field (default: Back)
- extracts the first visible line (HTML stripped) from that field
- tokenizes with spaCy and counts words
- writes "token count" lines sorted by descending count
Notes:
- spaCy may not yet support Python 3.14.
If importing spaCy or loading a model fails, create a Python 3.12 virtual environment for this script.
"""
from __future__ import annotations
import argparse
import logging
import os
import sys
from collections import Counter
from html import unescape
from typing import Callable, List
import regex as re
from anki_common import DEFAULT_WORD_OUTPUT_ROOT, DECK_TO_LANGUAGE, LANG_MAP, anki_request
# -------------------------
# Logging
# -------------------------
def setup_logging(logfile: str) -> None:
    """Route INFO-and-above log records to ``logfile``.

    The parent directory of ``logfile`` is created when it does not exist.
    The file is opened as UTF-8 explicitly because the logged queries carry
    non-ASCII deck names (e.g. ``deck:"日本語"``), which a locale-dependent
    default encoding may be unable to represent.
    """
    os.makedirs(os.path.dirname(os.path.abspath(logfile)), exist_ok=True)
    logging.basicConfig(
        filename=logfile,
        level=logging.INFO,
        format="%(asctime)s [%(levelname)s] %(message)s",
        encoding="utf-8",
    )
# -------------------------
# HTML cleanup helpers
# -------------------------
def extract_first_visible_line(text: str) -> str:
    """Strip HTML from ``text`` and return only its first visible line.

    ``<br>``, ``<div>`` and ``<p>`` tags (opening or closing) are treated as
    line breaks; every other tag is removed. ``None`` or an empty value
    yields ``""``. The returned line carries no surrounding whitespace.
    """
    text = text or ""
    text = re.sub(r"</?(br|div|p)[^>]*>", "\n", text, flags=re.IGNORECASE)
    text = re.sub(r"<[^>]+>", "", text)
    # Decode entities only after the markup is gone: escaped text such as
    # "&lt;b&gt;" is visible content and must not be stripped as a tag.
    text = unescape(text).strip()
    if not text:
        return ""
    # strip() above only trims the ends of the whole value, so the first line
    # may still end in whitespace before its line break.
    return text.splitlines()[0].rstrip()
def extract_visible_text(text: str) -> str:
    """Strip HTML from ``text`` and return all visible text as one string.

    ``<br>``, ``<div>`` and ``<p>`` tags (opening or closing) become line
    breaks; every other tag is removed. Runs of horizontal whitespace are
    collapsed to a single space and blank lines are dropped, so the result
    is the visible lines joined by single ``"\\n"`` characters. ``None`` or an
    empty value yields ``""``.
    """
    text = text or ""
    text = re.sub(r"</?(br|div|p)[^>]*>", "\n", text, flags=re.IGNORECASE)
    text = re.sub(r"<[^>]+>", "", text)
    # Decode entities only after the markup is gone: escaped text such as
    # "&lt;b&gt;" is visible content and must not be stripped as a tag.
    text = unescape(text)
    # "&nbsp;" decodes to U+00A0; treat it like any other horizontal space.
    text = re.sub(r"[ \t\xa0]+", " ", text)
    # Trim whitespace around line breaks and drop whitespace-only lines.
    text = re.sub(r"\s*\n\s*", "\n", text)
    return text.strip()
def get_notes(query: str) -> List[dict]:
    """Return the ``notesInfo`` payload for every note matching ``query``.

    Issues a ``findNotes`` request first; when it yields no note ids the
    second request is skipped and an empty list is returned. A falsy
    response from either request is likewise reported as an empty list.
    """
    matching_ids = anki_request("findNotes", query=query) or []
    if matching_ids:
        return anki_request("notesInfo", notes=matching_ids) or []
    return []
# -------------------------
# Language-specific token rules (spaCy-based)
# -------------------------
# Matches a run of one or more Hiragana, Katakana or Han characters, or the
# long-vowel mark "ー". The \p{Script=...} classes are a feature of the
# third-party `regex` package, which this file imports under the name `re`.
JAPANESE_CHAR_RE = re.compile(r"[\p{Script=Hiragana}\p{Script=Katakana}\p{Script=Han}ー]+")
# Surface forms (token.text) that japanese_filter rejects outright.
# NOTE(review): many entries here are empty strings; presumably they were
# single-character particles lost somewhere in transit — confirm against the
# canonical copy of this file. As written, "" never matches a non-empty token.
JAPANESE_PARTICLES = {
"", "", "", "", "", "", "", "", "", "から", "まで", "より", "", "なら",
"", "", "", "", "", "", "", "", "", "って", "とき", "ってば", "けど", "けれど",
"しかし", "でも", "ながら", "ほど", "", "もの", "こと", "ところ", "よう", "らしい", "られる",
}
# Lemmas (token.lemma_) that japanese_filter rejects outright.
# NOTE(review): the empty-string entries look like lost single-character
# lemmas — confirm. As written, they cause tokens with an empty lemma to be
# rejected.
JAPANESE_GRAMMAR_EXCLUDE = {
"", "", "ます", "れる", "てる", "", "", "しまう", "いる", "ない", "なる", "ある", "", "です",
}
# Part-of-speech tags (token.pos_) that japanese_filter keeps; tokens with any
# other tag are dropped.
JAPANESE_ALLOWED_POS = {"NOUN", "PROPN", "VERB", "ADJ"}
def japanese_filter(token) -> bool:
    """
    Decide whether a Japanese token should be counted.

    A token is kept only when its stripped surface form is non-empty and made
    up entirely of Japanese script, it is not a listed particle or excluded
    grammar lemma, its part of speech is in the allowed set, and the model has
    not flagged it as a stopword, URL or e-mail address. Relies on the spaCy
    model supplying usable ``lemma_`` and ``pos_`` values.
    """
    surface = (token.text or "").strip()
    base_form = (token.lemma_ or "").strip()

    if not surface:
        return False

    # The checks run in the same order as a chain of guard clauses would;
    # `or` short-circuits on the first reason to reject.
    rejected = (
        # Must look like Japanese script (hiragana/katakana/kanji/ー)
        not JAPANESE_CHAR_RE.fullmatch(surface)
        # Obvious grammar glue / particles
        or base_form in JAPANESE_GRAMMAR_EXCLUDE
        or surface in JAPANESE_PARTICLES
        # Only selected parts of speech survive
        or getattr(token, "pos_", None) not in JAPANESE_ALLOWED_POS
        # Stopwords / URLs / e-mails, when the model flags them
        or any(getattr(token, flag, False) for flag in ("is_stop", "like_url", "like_email"))
        # Defensive: HTML fragments or other garbage
        or any(ch in surface for ch in "<>=/\\:&%")
        or surface in {"ruby", "rt", "div", "br", "nbsp", "href", "strong", "a"}
    )
    return not rejected
def spanish_filter(token) -> bool:
    """
    Accept a token only if it is alphabetic and not a stopword.

    Missing ``is_alpha`` / ``is_stop`` attributes are treated as False.
    """
    alphabetic = bool(getattr(token, "is_alpha", False))
    stopword = bool(getattr(token, "is_stop", False))
    return alphabetic and not stopword
def spanish_format(token) -> str:
    """Return the token's lemma (or its text when the lemma is empty), lower-cased and stripped."""
    chosen = token.lemma_ or token.text or ""
    lowered = chosen.lower()
    return lowered.strip()
def japanese_format(token) -> str:
    """
    Build the output key for a Japanese token.

    When the stripped lemma and surface form are both present and differ, the
    key is ``"lemma (surface)"``, which keeps the surface form visible when
    lemma normalization is aggressive. Otherwise the key is whichever of the
    two is non-empty, or ``""`` when both are empty.
    """
    base_form = (token.lemma_ or "").strip()
    shown = (token.text or "").strip()
    if base_form and shown:
        return base_form if base_form == shown else f"{base_form} ({shown})"
    return base_form or shown
# Per-language settings, keyed by language bucket name: the default spaCy
# model to load, the predicate deciding which tokens are counted, and the
# function turning a kept token into its output key.
LANGUAGE_PROFILES = {
    "spanish": dict(
        spacy_model="es_core_news_sm",
        token_filter=spanish_filter,
        output_format=spanish_format,
    ),
    "japanese": dict(
        spacy_model="ja_core_news_lg",
        token_filter=japanese_filter,
        output_format=japanese_format,
    ),
}
def load_spacy_model(model_name: str):
    """
    Load and return the spaCy pipeline called ``model_name``.

    spaCy is imported here rather than at module import time so that a
    missing or broken installation produces a readable message.

    Raises:
        RuntimeError: if spaCy cannot be imported, or if the model cannot be
            loaded. The original exception is chained as the cause.
    """
    try:
        import spacy  # type: ignore
    except Exception as import_error:
        hint = (
            "Failed to import spaCy. If you're on Python 3.14, spaCy may not be compatible yet.\n"
            "Use a Python 3.12 venv for this script."
        )
        raise RuntimeError(hint) from import_error

    try:
        pipeline = spacy.load(model_name)
    except Exception as load_error:
        hint = (
            f"Failed to load spaCy model '{model_name}'.\n"
            f"Try: python -m spacy download {model_name}"
        )
        raise RuntimeError(hint) from load_error
    return pipeline
# -------------------------
# Core extraction
# -------------------------
def extract_counts(
    notes: List[dict],
    field_name: str,
    nlp,
    token_filter: Callable,
    output_format: Callable,
    use_full_field: bool,
) -> Counter:
    """
    Count the formatted tokens found in one field across ``notes``.

    Args:
        notes: ``notesInfo``-style dicts; each is read as
            ``note["fields"][field_name]["value"]``. Missing or empty
            pieces are treated as empty text and skipped.
        field_name: Name of the note field to read.
        nlp: spaCy pipeline, or any callable mapping a string to an iterable
            of tokens.
        token_filter: Predicate; only tokens it accepts are counted.
        output_format: Maps a kept token to its counter key; empty keys are
            ignored.
        use_full_field: When true, use all visible text of the field;
            otherwise only its first visible line.

    Returns:
        A ``Counter`` mapping each output key to its number of occurrences.
    """
    to_text = extract_visible_text if use_full_field else extract_first_visible_line

    def iter_texts():
        # Yield the non-empty visible text of each note's target field.
        for note in notes:
            fields = note.get("fields", {}) or {}
            raw_val = (fields.get(field_name, {}) or {}).get("value", "") or ""
            text = to_text(raw_val)
            if text:
                yield text

    # spaCy pipelines process a stream of texts in batches through pipe(),
    # which avoids per-call overhead; plain callables are applied one by one.
    pipe = getattr(nlp, "pipe", None)
    docs = pipe(iter_texts()) if callable(pipe) else map(nlp, iter_texts())

    counter: Counter = Counter()
    for doc in docs:
        for token in doc:
            if not token_filter(token):
                continue
            key = output_format(token)
            if key:
                counter[key] += 1
    return counter
def write_counts(counter: Counter, out_path: str, min_freq: int) -> int:
    """
    Write one "token count" line per entry whose count is at least ``min_freq``.

    Lines are ordered by descending count, ties broken by ascending token.
    The parent directory of ``out_path`` is created when missing and the file
    is written as UTF-8, replacing any existing content.

    Returns:
        The number of entries written.
    """
    kept = sorted(
        ((word, count) for word, count in counter.items() if count >= min_freq),
        key=lambda pair: (-pair[1], pair[0]),
    )
    target_dir = os.path.dirname(os.path.abspath(out_path))
    os.makedirs(target_dir, exist_ok=True)
    with open(out_path, "w", encoding="utf-8") as handle:
        handle.writelines(f"{word} {count}\n" for word, count in kept)
    return len(kept)
def build_query_from_decks(decks: List[str]) -> str:
    """
    Build an Anki search query that ORs one ``deck:"..."`` clause per deck.

    Double quotes inside a deck name are backslash-escaped so the name cannot
    terminate the quoted clause early. An empty ``decks`` list yields ``""``.
    """
    # deck:"日本語" OR deck:"日本語::subdeck" is possible but we keep it simple.
    clauses = []
    for deck in decks:
        escaped = deck.replace('"', '\\"')
        clauses.append(f'deck:"{escaped}"')
    return " OR ".join(clauses)
# -------------------------
# Main CLI
# -------------------------
def _build_arg_parser() -> argparse.ArgumentParser:
    """Create the command-line parser: a positional language code plus options."""
    parser = argparse.ArgumentParser(
        description="Extract frequent words from Anki notes (CLI resembles other toolkit scripts)."
    )
    # Positional "lang" argument (jp/es).
    parser.add_argument("lang", choices=sorted(LANG_MAP.keys()), help="Language code (jp or es).")
    # Deck selection can be overridden, with sane defaults:
    # - if --query is provided, it is used exactly
    # - else if --deck is provided (repeatable), those decks are used
    # - else decks are inferred from the DECK_TO_LANGUAGE mapping
    group = parser.add_mutually_exclusive_group()
    group.add_argument(
        "--query",
        help='Full Anki search query (e.g. \'deck:"Español" tag:foo\'). Overrides --deck.',
    )
    group.add_argument(
        "--deck",
        action="append",
        help='Deck name (repeatable). Example: --deck "日本語" --deck "日本語::Subdeck"',
    )
    parser.add_argument("--field", default="Back", help="Which note field to read (default: Back).")
    parser.add_argument("--min-freq", type=int, default=2, help="Minimum frequency to include (default: 2).")
    parser.add_argument("--outdir", help="Output directory (default: ~/Languages/Anki/anki-words/<language>).")
    parser.add_argument("--out", help="Output file path (default: <outdir>/words_<lang>.txt).")
    parser.add_argument(
        "--full-field",
        action="store_true",
        help="Use the full field text (HTML stripped) instead of only the first visible line.",
    )
    parser.add_argument(
        "--spacy-model",
        help="Override the spaCy model name (advanced).",
    )
    parser.add_argument(
        "--logfile",
        default=os.path.join(DEFAULT_WORD_OUTPUT_ROOT, "extract_words.log"),
        help="Log file path.",
    )
    return parser


def main() -> int:
    """
    Run the word scraper from the command line.

    Resolves the Anki query (explicit --query, explicit --deck values, or the
    decks mapped to the chosen language), loads the spaCy model, fetches the
    matching notes, counts tokens in the selected field and writes the result.

    Returns:
        0 on success (including when no notes match), 1 on any failure:
        unsupported language, no mapped decks, spaCy load error, AnkiConnect
        error, or the requested field being absent from every fetched note.
    """
    args = _build_arg_parser().parse_args()
    # Expand "~" here as well, consistent with --outdir / --out below.
    setup_logging(os.path.expanduser(args.logfile))

    language_bucket = LANG_MAP[args.lang]
    profile = LANGUAGE_PROFILES.get(language_bucket)
    if not profile:
        print(f"❌ Unsupported language bucket: {language_bucket}", file=sys.stderr)
        return 1

    # Resolve query / decks
    if args.query:
        query = args.query
    else:
        if args.deck:
            decks = args.deck
        else:
            decks = [d for d, lang in DECK_TO_LANGUAGE.items() if lang == language_bucket]
        if not decks:
            print(f"❌ No decks mapped for language: {language_bucket}", file=sys.stderr)
            return 1
        query = build_query_from_decks(decks)

    # Output paths ("~" is expanded for the default root too, not only for
    # paths given on the command line)
    out_dir = os.path.expanduser(
        args.outdir if args.outdir else os.path.join(DEFAULT_WORD_OUTPUT_ROOT, language_bucket)
    )
    default_outfile = os.path.join(out_dir, f"words_{args.lang}.txt")
    out_path = os.path.expanduser(args.out) if args.out else default_outfile

    logging.info("lang=%s bucket=%s query=%s field=%s", args.lang, language_bucket, query, args.field)
    print(f"🔎 Query: {query}")
    print(f"🧾 Field: {args.field}")

    # Load spaCy model
    model_name = args.spacy_model or profile["spacy_model"]
    try:
        nlp = load_spacy_model(model_name)
    except Exception as e:
        print(f"{e}", file=sys.stderr)
        logging.exception("spaCy load failed")
        return 1

    # Fetch notes
    try:
        notes = get_notes(query)
    except Exception as e:
        print(f"❌ Failed to query AnkiConnect: {e}", file=sys.stderr)
        logging.exception("AnkiConnect query failed")
        return 1

    print(f"✅ Found {len(notes)} notes.")
    if not notes:
        print("⚠️ No notes found. Check your query/deck names.")
        return 0

    # Validate the field exists on at least one note. A query may return notes
    # of several note types, so checking only the first note could reject a
    # field that other notes do have.
    if not any(args.field in (note.get("fields", {}) or {}) for note in notes):
        available = sorted({name for note in notes for name in (note.get("fields", {}) or {})})
        print(f"❌ Field '{args.field}' not found on any fetched note.", file=sys.stderr)
        print(f" Available fields: {available}", file=sys.stderr)
        return 1

    # Extract + write
    counter = extract_counts(
        notes=notes,
        field_name=args.field,
        nlp=nlp,
        token_filter=profile["token_filter"],
        output_format=profile["output_format"],
        use_full_field=args.full_field,
    )
    print(f"🧠 Extracted {len(counter)} unique entries (before min-freq filter).")
    written = write_counts(counter, out_path, args.min_freq)
    print(f"📄 Wrote {written} entries to: {out_path}")
    logging.info("wrote=%s out=%s", written, out_path)
    return 0
# Script entry point: run the CLI and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    sys.exit(main())