initial commit

This commit is contained in:
Pawel
2026-05-21 07:21:13 -04:00
commit d510d07ed9
50 changed files with 5359 additions and 0 deletions

View File

@@ -0,0 +1,3 @@
"""ClimbingBoardGPT: unified TB2/Kilter route modeling."""
__version__ = "0.2.0"

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -0,0 +1,150 @@
"""
Board configuration management for ClimbingBoardGPT.
This module handles loading and parsing board-specific configuration from
JSON files. Each board (TB2, Kilter) has different:
- Layout IDs
- Role ID mappings (start/middle/finish/foot)
- Angle cutoffs
- Database paths
- Token prefixes
The config-driven approach means adding a new board only requires
creating a new JSON file, not modifying code.
"""
from __future__ import annotations
import json
from dataclasses import dataclass
from pathlib import Path
from .paths import find_project_root
@dataclass(frozen=True)
class BoardConfig:
"""Configuration for a single climbing board.
This dataclass stores all board-specific settings needed for
data loading, tokenization, and model training.
Attributes:
board_key: Short identifier (e.g., "tb2", "kilter")
display_name: Human-readable name (e.g., "Tension Board 2 Mirror")
token_prefix: Namespace for hold tokens (e.g., "TB2", "KILTER")
db_path: Path to the SQLite database
layout_id: Which layout in the database to use
max_angle: Filter out routes steeper than this (None = no filter)
min_fa_date: Filter out routes first ascended before this date
placement_y_max: Filter out placements above this Y coordinate
include_mirror_placement_id: Whether to include mirror info (TB2 only)
role_definitions: Maps semantic role names to numeric IDs
boardlib_database_command: Command to download the database
boardlib_images_command: Command to download board images
notes: Additional notes about the configuration
"""
board_key: str
display_name: str
token_prefix: str
db_path: Path
layout_id: int
max_angle: float | None
min_fa_date: str | None
placement_y_max: float | None
include_mirror_placement_id: bool
role_definitions: dict[str, int]
boardlib_database_command: str | None = None
boardlib_images_command: str | None = None
notes: tuple[str, ...] = ()
@property
def role_id_to_name(self) -> dict[int, str]:
"""Reverse mapping from numeric role IDs to semantic role names.
Example: {5: 'start', 6: 'middle', 7: 'finish', 8: 'foot'} for TB2
"""
return {int(role_id): name for name, role_id in self.role_definitions.items()}
@property
def board_token(self) -> str:
"""The special token representing this board.
Example: "<BOARD_TB2>" or "<BOARD_KILTER>"
"""
return f"<BOARD_{self.token_prefix}>"
def resolve_db_path(self, project_root: Path | None = None) -> Path:
"""Resolve the database path relative to the project root.
If db_path is absolute, return it as-is.
Otherwise, resolve it relative to the project root.
"""
project_root = project_root or find_project_root()
return self.db_path if self.db_path.is_absolute() else project_root / self.db_path
def load_board_config(board_key: str, config_dir: str | Path | None = None) -> BoardConfig:
"""Load a single board configuration from a JSON file.
Args:
board_key: Board identifier (e.g., "tb2", "kilter")
config_dir: Directory containing config JSON files
Returns:
BoardConfig dataclass with all board settings
Raises:
FileNotFoundError: If the config file doesn't exist
"""
project_root = find_project_root()
config_dir = Path(config_dir) if config_dir is not None else project_root / "configs"
path = config_dir / f"{board_key}.json"
if not path.exists():
available = sorted(p.stem for p in config_dir.glob("*.json"))
raise FileNotFoundError(
f"Unknown board config '{board_key}'. Available: {available}"
)
payload = json.loads(path.read_text(encoding="utf-8"))
return BoardConfig(
board_key=str(payload["board_key"]),
display_name=str(payload["display_name"]),
token_prefix=str(payload["token_prefix"]),
db_path=Path(payload["db_path"]),
layout_id=int(payload["layout_id"]),
max_angle=None if payload.get("max_angle") is None else float(payload["max_angle"]),
min_fa_date=payload.get("min_fa_date"),
placement_y_max=None if payload.get("placement_y_max") is None else float(payload["placement_y_max"]),
include_mirror_placement_id=bool(payload.get("include_mirror_placement_id", False)),
role_definitions={str(k): int(v) for k, v in payload["role_definitions"].items()},
boardlib_database_command=payload.get("boardlib_database_command"),
boardlib_images_command=payload.get("boardlib_images_command"),
notes=tuple(payload.get("notes", [])),
)
def load_board_configs(board_keys: list[str] | tuple[str, ...]) -> list[BoardConfig]:
"""Load multiple board configurations.
Args:
board_keys: List of board identifiers
Returns:
List of BoardConfig dataclasses
"""
return [load_board_config(board_key) for board_key in board_keys]
def parse_board_keys(value: str | None, default: tuple[str, ...] = ("tb2", "kilter")) -> list[str]:
"""Parse a comma-separated string of board keys.
Args:
value: Comma-separated string (e.g., "tb2,kilter") or None
default: Default board keys if value is None or empty
Returns:
List of board key strings
"""
if value is None or not value.strip():
return list(default)
return [part.strip() for part in value.split(",") if part.strip()]

View File

@@ -0,0 +1,202 @@
"""
Database loading for ClimbingBoardGPT.
This module queries SQLite databases for climb and placement data,
applying board-specific filters defined in the configuration.
"""
from __future__ import annotations
import sqlite3
from pathlib import Path
import pandas as pd
from .config import BoardConfig
from .paths import find_project_root
def build_climbs_query(config: BoardConfig) -> tuple[str, list]:
"""Build a SQL query for climbs data with board-specific filters.
The query joins climbs, layouts, products, climb_stats, and difficulty_grades
tables, applying filters for:
- layout_id: Which board layout to use
- max_angle: Exclude routes steeper than this
- min_fa_date: Exclude routes first ascended before this date
- display_difficulty IS NOT NULL: Only routes with difficulty ratings
- is_listed = 1: Only publicly listed routes
Args:
config: Board configuration
Returns:
Tuple of (SQL query string, list of query parameters)
"""
conditions = [
"cs.display_difficulty IS NOT NULL",
"c.is_listed = 1",
"c.layout_id = ?",
]
params: list = [config.layout_id]
if config.max_angle is not None:
conditions.append("cs.angle <= ?")
params.append(config.max_angle)
if config.min_fa_date is not None:
conditions.append("cs.fa_at > ?")
params.append(config.min_fa_date)
query = f"""
SELECT
c.uuid,
c.name AS climb_name,
c.setter_username,
c.layout_id AS layout_id,
c.description,
c.is_nomatch,
c.is_listed,
l.name AS layout_name,
p.name AS board_name,
c.frames,
cs.angle,
cs.display_difficulty,
dg.boulder_name AS boulder_grade,
cs.ascensionist_count,
cs.quality_average,
cs.fa_at
FROM climbs c
JOIN layouts l ON c.layout_id = l.id
JOIN products p ON l.product_id = p.id
JOIN climb_stats cs ON c.uuid = cs.climb_uuid
JOIN difficulty_grades dg ON ROUND(cs.display_difficulty) = dg.difficulty
WHERE {' AND '.join(conditions)}
"""
return query, params
def build_placements_query(config: BoardConfig) -> tuple[str, list]:
"""Build a SQL query for placement data with board-specific filters.
The query retrieves hold positions, default roles, material types,
and (optionally) mirror placement IDs for symmetric holds.
Args:
config: Board configuration
Returns:
Tuple of (SQL query string, list of query parameters)
"""
params: list = [config.layout_id]
y_condition = ""
if config.placement_y_max is not None:
y_condition = " AND h.y <= ?"
params.append(config.placement_y_max)
if config.include_mirror_placement_id:
# TB2 has mirrored holds — include the mirror placement ID
query = f"""
SELECT
p.id AS placement_id,
h.x,
h.y,
p.default_placement_role_id AS default_role_id,
p.set_id AS set_id,
s.name AS set_name,
p_mirror.id AS mirror_placement_id
FROM placements p
JOIN holes h ON p.hole_id = h.id
JOIN sets s ON p.set_id = s.id
LEFT JOIN holes h_mirror ON h.mirrored_hole_id = h_mirror.id
LEFT JOIN placements p_mirror
ON p_mirror.hole_id = h_mirror.id
AND p_mirror.layout_id = p.layout_id
WHERE p.layout_id = ?{y_condition}
"""
else:
# Kilter doesn't have mirrored holds
query = f"""
SELECT
p.id AS placement_id,
h.x,
h.y,
p.default_placement_role_id AS default_role_id,
p.set_id AS set_id,
s.name AS set_name,
NULL AS mirror_placement_id
FROM placements p
JOIN holes h ON p.hole_id = h.id
JOIN sets s ON p.set_id = s.id
WHERE p.layout_id = ?{y_condition}
"""
return query, params
def load_board_data(
config: BoardConfig,
project_root: str | Path | None = None,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Load climbs and placements data for a single board.
Args:
config: Board configuration
project_root: Path to project root (for resolving db_path)
Returns:
Tuple of (climbs DataFrame, placements DataFrame)
"""
project_root = Path(project_root) if project_root is not None else find_project_root()
db_path = config.resolve_db_path(project_root)
if not db_path.exists():
raise FileNotFoundError(
f"Could not find database for board '{config.board_key}': {db_path}"
)
climbs_query, climbs_params = build_climbs_query(config)
placements_query, placements_params = build_placements_query(config)
with sqlite3.connect(db_path) as conn:
df_climbs = pd.read_sql_query(climbs_query, conn, params=climbs_params)
df_placements = pd.read_sql_query(placements_query, conn, params=placements_params)
# Add board identifiers for multi-board processing
df_climbs["board_key"] = config.board_key
df_climbs["board_token_prefix"] = config.token_prefix
df_climbs["board_display_name"] = config.display_name
df_placements["board_key"] = config.board_key
df_placements["board_token_prefix"] = config.token_prefix
df_placements["board_display_name"] = config.display_name
return df_climbs, df_placements
def load_multi_board_data(
configs: list[BoardConfig],
project_root: str | Path | None = None,
) -> tuple[pd.DataFrame, pd.DataFrame]:
"""Load and concatenate data from multiple boards.
This function loads data from each board's database and concatenates
them into unified DataFrames. Board identifiers are preserved in
the board_key column.
Args:
configs: List of board configurations
project_root: Path to project root
Returns:
Tuple of (combined climbs DataFrame, combined placements DataFrame)
"""
climb_frames = []
placement_frames = []
for config in configs:
climbs, placements = load_board_data(config, project_root=project_root)
climb_frames.append(climbs)
placement_frames.append(placements)
return (
pd.concat(climb_frames, ignore_index=True),
pd.concat(placement_frames, ignore_index=True),
)

View File

@@ -0,0 +1,53 @@
from __future__ import annotations
import torch
from torch.utils.data import Dataset
class RouteGradeDataset(Dataset):
def __init__(self, df, max_len: int, pad_id: int):
self.ids = df["model_ids"].tolist()
self.targets = df["display_difficulty"].astype(float).values
self.uuids = df["uuid"].tolist()
self.boards = df["board_key"].astype(str).tolist()
self.max_len = int(max_len)
self.pad_id = int(pad_id)
def __len__(self) -> int:
return len(self.ids)
def __getitem__(self, idx: int):
ids = list(self.ids[idx])[: self.max_len]
mask = [1] * len(ids)
if len(ids) < self.max_len:
pad_n = self.max_len - len(ids)
ids += [self.pad_id] * pad_n
mask += [0] * pad_n
return {
"input_ids": torch.tensor(ids, dtype=torch.long),
"attention_mask": torch.tensor(mask, dtype=torch.bool),
"target": torch.tensor(self.targets[idx], dtype=torch.float32),
"uuid": self.uuids[idx],
"board_key": self.boards[idx],
}
class RouteGPTDataset(Dataset):
def __init__(self, df, max_len: int, pad_id: int):
self.ids = df["gpt_ids"].tolist()
self.max_len = int(max_len)
self.pad_id = int(pad_id)
def __len__(self) -> int:
return len(self.ids)
def __getitem__(self, idx: int):
ids = list(self.ids[idx])[: self.max_len]
if len(ids) < self.max_len:
ids += [self.pad_id] * (self.max_len - len(ids))
return {
"input_ids": torch.tensor(ids[:-1], dtype=torch.long),
"target_ids": torch.tensor(ids[1:], dtype=torch.long),
}

View File

@@ -0,0 +1,195 @@
from __future__ import annotations
import ast
import re
from typing import Iterable
import numpy as np
import pandas as pd
from scipy.spatial.distance import pdist
HOLD_TOKEN_PATTERN = re.compile(r"^<([A-Z0-9_]+)_p(\d+)_(start|middle|finish|foot|unknown)>$")
def parse_token_list(value) -> list[str]:
if isinstance(value, list):
return value
if not isinstance(value, str):
return []
try:
parsed = ast.literal_eval(value)
if isinstance(parsed, list):
return parsed
except Exception:
pass
return value.split()
def tokens_to_hold_records(tokens: Iterable[str]) -> list[dict[str, object]]:
rows = []
for token in tokens:
match = HOLD_TOKEN_PATTERN.match(token)
if match is None:
continue
rows.append(
{
"token": token,
"board_token_prefix": match.group(1),
"placement_id": int(match.group(2)),
"role": match.group(3),
}
)
return rows
def validity_from_records(records: list[dict[str, object]]) -> dict[str, object]:
placements = [int(record["placement_id"]) for record in records]
roles = [str(record["role"]) for record in records]
prefixes = [str(record["board_token_prefix"]) for record in records]
one_board_only = len(set(prefixes)) <= 1
out = {
"n_holds_eval": len(records),
"n_unique_placements_eval": len(set(placements)),
"has_duplicate_placements_eval": len(records) != len(set(placements)),
"one_board_only_eval": one_board_only,
"n_start_eval": roles.count("start"),
"n_middle_eval": roles.count("middle"),
"n_foot_eval": roles.count("foot"),
"n_finish_eval": roles.count("finish"),
"has_start_eval": "start" in roles,
"has_middle_eval": "middle" in roles,
"has_finish_eval": "finish" in roles,
}
out["basic_valid_eval"] = (
one_board_only
and out["n_holds_eval"] >= 3
and out["n_holds_eval"] == out["n_unique_placements_eval"]
and out["has_start_eval"]
and out["has_finish_eval"]
)
out["strict_valid_eval"] = (
out["basic_valid_eval"]
and out["has_middle_eval"]
and out["n_holds_eval"] >= 4
)
return out
def frames_to_holds(frames: str | None) -> list[tuple[int, int]]:
if not isinstance(frames, str):
return []
return [(int(p), int(r)) for p, r in re.findall(r"p(\d+)r(\d+)", frames)]
def holds_to_placement_set(holds: Iterable[tuple[int, int]]) -> frozenset[int]:
return frozenset(int(placement_id) for placement_id, _ in holds)
def jaccard(a: frozenset[int], b: frozenset[int]) -> float:
if not a and not b:
return 1.0
if not a or not b:
return 0.0
return len(a & b) / len(a | b)
def nearest_real_route_same_board(
generated_set: frozenset[int],
generated_board_key: str,
real_df: pd.DataFrame,
) -> dict[str, object]:
board_frame = real_df[real_df["board_key"] == generated_board_key]
best = {
"nearest_real_jaccard": -1.0,
"nearest_real_uuid": None,
"nearest_real_name": None,
"nearest_real_grouped_v": None,
"nearest_real_angle": None,
}
for _, row in board_frame.iterrows():
similarity = jaccard(generated_set, row["hold_set"])
if similarity > best["nearest_real_jaccard"]:
best.update(
{
"nearest_real_jaccard": similarity,
"nearest_real_uuid": row["uuid"],
"nearest_real_name": row["climb_name"],
"nearest_real_grouped_v": row["grouped_v"],
"nearest_real_angle": row["angle"],
}
)
best["novelty_distance"] = 1.0 - float(best["nearest_real_jaccard"])
return best
def build_placement_coords(df_token_meta: pd.DataFrame) -> dict[tuple[str, int], dict[str, float]]:
hold_meta = df_token_meta[df_token_meta["kind"] == "hold"].dropna(subset=["placement_id"]).copy()
coords = {}
for _, row in hold_meta.drop_duplicates(["board_key", "placement_id"]).iterrows():
key = (str(row["board_key"]), int(row["placement_id"]))
coords[key] = {
"x": float(row["x"]),
"y": float(row["y"]),
}
return coords
def simple_route_features(
board_key: str,
records: list[dict[str, object]],
placement_coords: dict[tuple[str, int], dict[str, float]],
) -> dict[str, float]:
rows = []
for record in records:
key = (str(board_key), int(record["placement_id"]))
coord = placement_coords.get(key)
if coord is None:
continue
x = float(coord["x"])
y = float(coord["y"])
if np.isnan(x) or np.isnan(y):
continue
role = str(record["role"])
rows.append(
{
"x": x,
"y": y,
"role": role,
"is_hand": role in {"start", "middle", "finish"},
"is_foot": role == "foot",
}
)
if not rows:
return {
"geom_n_holds": 0.0,
"geom_height": np.nan,
"geom_width": np.nan,
"geom_mean_y": np.nan,
"geom_mean_x_abs": np.nan,
"geom_mean_hand_reach": np.nan,
"geom_max_hand_reach": np.nan,
}
d = pd.DataFrame(rows)
out = {
"geom_n_holds": float(len(d)),
"geom_height": float(d["y"].max() - d["y"].min()),
"geom_width": float(d["x"].max() - d["x"].min()),
"geom_mean_y": float(d["y"].mean()),
"geom_mean_x_abs": float(d["x"].abs().mean()),
}
hands = d[d["is_hand"]].sort_values(["y", "x"])
if len(hands) >= 2:
distances = pdist(hands[["x", "y"]].values)
out["geom_mean_hand_reach"] = float(distances.mean())
out["geom_max_hand_reach"] = float(distances.max())
else:
out["geom_mean_hand_reach"] = np.nan
out["geom_max_hand_reach"] = np.nan
return out

View File

@@ -0,0 +1,169 @@
from __future__ import annotations
import re
from typing import Iterable
import torch
import torch.nn.functional as F
HOLD_TOKEN_PATTERN = re.compile(r"^<([A-Z0-9_]+)_p(\d+)_(start|middle|finish|foot|unknown)>$")
def top_k_filter(logits: torch.Tensor, k: int | None) -> torch.Tensor:
if k is None or k <= 0 or k >= logits.size(-1):
return logits
values, _ = torch.topk(logits, k)
cutoff = values[:, [-1]]
return torch.where(logits < cutoff, torch.full_like(logits, -float("inf")), logits)
@torch.no_grad()
def sample_ids(
model,
prompt_ids: list[int],
device: torch.device,
max_new_tokens: int = 40,
temperature: float = 0.9,
top_k: int | None = 50,
eos_id: int | None = None,
forbidden_ids: Iterable[int] | None = None,
) -> list[int]:
model.eval()
sequence = torch.tensor([prompt_ids], dtype=torch.long, device=device)
forbidden_ids = set(forbidden_ids or [])
for _ in range(max_new_tokens):
idx_cond = sequence[:, -model.block_size :]
logits, _ = model(idx_cond)
logits = logits[:, -1, :] / max(temperature, 1e-6)
for token_id in forbidden_ids:
logits[:, int(token_id)] = -float("inf")
logits = top_k_filter(logits, top_k)
probs = F.softmax(logits, dim=-1)
next_id = torch.multinomial(probs, num_samples=1)
sequence = torch.cat([sequence, next_id], dim=1)
if eos_id is not None and int(next_id.item()) == int(eos_id):
break
return sequence[0].detach().cpu().tolist()
def prompt_tokens(board_prefix: str, angle: int, grouped_v: int) -> list[str]:
return [
"<BOS>",
f"<BOARD_{board_prefix}>",
f"<ANGLE_{int(angle)}>",
f"<GRADE_V{int(grouped_v)}>",
]
def hold_records(tokens: Iterable[str]) -> list[dict[str, object]]:
rows = []
for token in tokens:
match = HOLD_TOKEN_PATTERN.match(token)
if match is None:
continue
rows.append(
{
"board_prefix": match.group(1),
"placement_id": int(match.group(2)),
"role": match.group(3),
"token": token,
}
)
return rows
def validity_summary(tokens: Iterable[str]) -> dict[str, object]:
records = hold_records(tokens)
placements = [record["placement_id"] for record in records]
roles = [record["role"] for record in records]
prefixes = [record["board_prefix"] for record in records]
one_board_only = len(set(prefixes)) <= 1
no_duplicates = len(placements) == len(set(placements))
has_start = "start" in roles
has_finish = "finish" in roles
enough_holds = len(records) >= 3
return {
"n_hold_tokens": len(records),
"n_unique_placements": len(set(placements)),
"has_duplicate_placements": not no_duplicates,
"one_board_only": one_board_only,
"has_start": has_start,
"has_middle": "middle" in roles,
"has_finish": has_finish,
"n_start": roles.count("start"),
"n_middle": roles.count("middle"),
"n_foot": roles.count("foot"),
"n_finish": roles.count("finish"),
"basic_valid": bool(one_board_only and no_duplicates and has_start and has_finish and enough_holds),
}
def generated_tokens_to_frames(tokens: Iterable[str], role_name_to_id: dict[str, int]) -> str:
pieces = []
seen = set()
for record in hold_records(tokens):
placement_id = int(record["placement_id"])
role = str(record["role"])
if placement_id in seen or role not in role_name_to_id:
continue
seen.add(placement_id)
pieces.append(f"p{placement_id}r{int(role_name_to_id[role])}")
return "".join(pieces)
def generate_one(
model,
stoi: dict[str, int],
itos: dict[int, str],
device: torch.device,
board_prefix: str,
angle: int,
grouped_v: int,
role_name_to_id: dict[str, int],
temperature: float = 0.9,
top_k: int | None = 50,
max_new_tokens: int = 40,
) -> dict[str, object]:
unk_id = stoi["<UNK>"]
eos_id = stoi["<EOS>"]
forbidden_ids = [
stoi["<PAD>"],
stoi["<UNK>"],
stoi["<BOS>"],
stoi["<CLS>"],
stoi["<MASK>"],
]
prompt = prompt_tokens(board_prefix, angle, grouped_v)
prompt_ids = [stoi.get(token, unk_id) for token in prompt]
token_ids = sample_ids(
model=model,
prompt_ids=prompt_ids,
device=device,
max_new_tokens=max_new_tokens,
temperature=temperature,
top_k=top_k,
eos_id=eos_id,
forbidden_ids=forbidden_ids,
)
tokens = [itos.get(int(idx), "<UNK>") for idx in token_ids]
validity = validity_summary(tokens)
return {
"requested_board_prefix": board_prefix,
"requested_angle": int(angle),
"requested_grouped_v": int(grouped_v),
"temperature": float(temperature),
"top_k": None if top_k is None else int(top_k),
"tokens": tokens,
"sequence": " ".join(tokens),
"frames": generated_tokens_to_frames(tokens, role_name_to_id),
**validity,
}

View File

@@ -0,0 +1,31 @@
from __future__ import annotations
GRADE_TO_V = {
10: 0, 11: 0, 12: 0,
13: 1, 14: 1,
15: 2,
16: 3, 17: 3,
18: 4, 19: 4,
20: 5, 21: 5,
22: 6,
23: 7,
24: 8, 25: 8,
26: 9,
27: 10,
28: 11,
29: 12,
30: 13,
31: 14,
32: 15,
33: 16,
}
def to_grouped_v(display_difficulty: float) -> int:
rounded = int(round(float(display_difficulty)))
rounded = max(min(rounded, max(GRADE_TO_V)), min(GRADE_TO_V))
return GRADE_TO_V[rounded]
def grade_token(display_difficulty: float) -> str:
return f"<GRADE_V{to_grouped_v(display_difficulty)}>"

View File

@@ -0,0 +1,46 @@
from __future__ import annotations
import math
import numpy as np
import pandas as pd
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from .grades import to_grouped_v
def regression_metrics(y_true, y_pred) -> dict[str, float]:
y_true = np.asarray(y_true)
y_pred = np.asarray(y_pred)
true_v = np.asarray([to_grouped_v(x) for x in y_true])
pred_v = np.asarray([to_grouped_v(x) for x in y_pred])
return {
"mae": float(mean_absolute_error(y_true, y_pred)),
"rmse": float(math.sqrt(mean_squared_error(y_true, y_pred))),
"r2": float(r2_score(y_true, y_pred)),
"within_1_difficulty": float(np.mean(np.abs(y_true - y_pred) <= 1) * 100),
"within_2_difficulty": float(np.mean(np.abs(y_true - y_pred) <= 2) * 100),
"exact_grouped_v": float(np.mean(true_v == pred_v) * 100),
"within_1_vgrade": float(np.mean(np.abs(true_v - pred_v) <= 1) * 100),
"within_2_vgrades": float(np.mean(np.abs(true_v - pred_v) <= 2) * 100),
}
def metrics_by_board(pred_df: pd.DataFrame) -> pd.DataFrame:
rows = []
for board_key, frame in pred_df.groupby("board_key"):
metrics = regression_metrics(frame["y_true"].values, frame["y_pred"].values)
rows.append({"board_key": board_key, **metrics})
return pd.DataFrame(rows)
def print_metrics(name: str, metrics: dict[str, float]) -> None:
print(name)
print("-" * len(name))
for key, value in metrics.items():
suffix = "%" if "within" in key or "exact" in key else ""
if suffix:
print(f"{key:24s}: {value:8.2f}{suffix}")
else:
print(f"{key:24s}: {value:8.4f}")

View File

@@ -0,0 +1,139 @@
from __future__ import annotations
import torch
import torch.nn as nn
import torch.nn.functional as F
class JointRouteTransformerRegressor(nn.Module):
"""Transformer encoder for joint TB2/Kilter route difficulty prediction."""
def __init__(
self,
vocab_size: int,
max_len: int,
coord_features: torch.Tensor,
d_model: int = 128,
nhead: int = 4,
num_layers: int = 4,
dim_feedforward: int = 256,
dropout: float = 0.10,
pad_id: int = 0,
):
super().__init__()
self.vocab_size = vocab_size
self.max_len = max_len
self.d_model = d_model
self.pad_id = pad_id
self.token_emb = nn.Embedding(vocab_size, d_model, padding_idx=pad_id)
self.pos_emb = nn.Embedding(max_len, d_model)
self.register_buffer("coord_features", coord_features.clone().float())
self.coord_proj = nn.Linear(coord_features.shape[1], d_model)
encoder_layer = nn.TransformerEncoderLayer(
d_model=d_model,
nhead=nhead,
dim_feedforward=dim_feedforward,
dropout=dropout,
activation="gelu",
batch_first=True,
norm_first=True,
)
self.encoder = nn.TransformerEncoder(encoder_layer, num_layers=num_layers)
self.norm = nn.LayerNorm(d_model)
self.head = nn.Sequential(
nn.Linear(d_model, d_model),
nn.GELU(),
nn.Dropout(dropout),
nn.Linear(d_model, 1),
)
def forward(self, input_ids: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
batch_size, seq_len = input_ids.shape
positions = torch.arange(seq_len, device=input_ids.device).unsqueeze(0).expand(batch_size, seq_len)
x = self.token_emb(input_ids) + self.pos_emb(positions)
x = x + self.coord_proj(self.coord_features[input_ids])
key_padding_mask = ~attention_mask.bool()
h = self.encoder(x, src_key_padding_mask=key_padding_mask)
h = self.norm(h)
cls_state = h[:, 0, :]
return self.head(cls_state).squeeze(-1)
class JointRouteGPT(nn.Module):
"""Tiny GPT-style causal transformer for board-conditioned route generation."""
def __init__(
self,
vocab_size: int,
block_size: int,
n_embd: int = 128,
n_head: int = 4,
n_layer: int = 4,
dropout: float = 0.10,
pad_id: int = 0,
):
super().__init__()
self.vocab_size = vocab_size
self.block_size = block_size
self.pad_id = pad_id
self.token_emb = nn.Embedding(vocab_size, n_embd, padding_idx=pad_id)
self.pos_emb = nn.Embedding(block_size, n_embd)
self.drop = nn.Dropout(dropout)
layer = nn.TransformerEncoderLayer(
d_model=n_embd,
nhead=n_head,
dim_feedforward=4 * n_embd,
dropout=dropout,
activation="gelu",
batch_first=True,
norm_first=True,
)
self.blocks = nn.TransformerEncoder(layer, num_layers=n_layer)
self.ln_f = nn.LayerNorm(n_embd)
self.lm_head = nn.Linear(n_embd, vocab_size, bias=False)
self.lm_head.weight = self.token_emb.weight
def forward(
self,
idx: torch.Tensor,
targets: torch.Tensor | None = None,
) -> tuple[torch.Tensor, torch.Tensor | None]:
_, seq_len = idx.shape
if seq_len > self.block_size:
idx = idx[:, -self.block_size :]
seq_len = idx.shape[1]
positions = torch.arange(seq_len, device=idx.device).unsqueeze(0)
x = self.drop(self.token_emb(idx) + self.pos_emb(positions))
causal_mask = torch.triu(
torch.ones(seq_len, seq_len, device=idx.device, dtype=torch.bool),
diagonal=1,
)
key_padding_mask = idx.eq(self.pad_id)
h = self.blocks(
x,
mask=causal_mask,
src_key_padding_mask=key_padding_mask,
)
h = self.ln_f(h)
logits = self.lm_head(h)
loss = None
if targets is not None:
loss = F.cross_entropy(
logits.reshape(-1, logits.size(-1)),
targets.reshape(-1),
ignore_index=self.pad_id,
)
return logits, loss

View File

@@ -0,0 +1,11 @@
from __future__ import annotations
from pathlib import Path
def find_project_root(start: str | Path | None = None) -> Path:
current = Path(start).resolve() if start is not None else Path.cwd().resolve()
for candidate in [current, *current.parents]:
if (candidate / "pyproject.toml").exists() and (candidate / "configs").exists():
return candidate
return current

View File

@@ -0,0 +1,331 @@
from __future__ import annotations
import re
from typing import Iterable
import numpy as np
import pandas as pd
from .config import BoardConfig
from .grades import GRADE_TO_V, grade_token, to_grouped_v
SPECIAL_TOKENS = [
"<PAD>",
"<UNK>",
"<BOS>",
"<EOS>",
"<CLS>",
"<MASK>",
]
ANGLE_TOKEN_PATTERN = re.compile(r"^<ANGLE_(-?\d+)>$")
GRADE_TOKEN_PATTERN = re.compile(r"^<GRADE_V(\d+)>$")
BOARD_TOKEN_PATTERN = re.compile(r"^<BOARD_([A-Z0-9_]+)>$")
HOLD_TOKEN_PATTERN = re.compile(r"^<([A-Z0-9_]+)_p(\d+)_(start|middle|finish|foot|unknown)>$")
ROLE_SORT_ORDER = {
"start": 0,
"middle": 1,
"foot": 2,
"finish": 3,
"unknown": 9,
}
def parse_frames(frames_str: str | None) -> list[tuple[int, int]]:
if not isinstance(frames_str, str):
return []
matches = re.findall(r"p(\d+)r(\d+)", frames_str)
return [(int(placement_id), int(role_id)) for placement_id, role_id in matches]
def make_placement_lookup(df_placements: pd.DataFrame) -> dict[tuple[str, int], dict]:
rows = {}
for _, row in df_placements.iterrows():
key = (str(row["board_key"]), int(row["placement_id"]))
rows[key] = row.to_dict()
return rows
def role_name(role_id: int, config: BoardConfig) -> str:
return config.role_id_to_name.get(int(role_id), "unknown")
def placement_xy(
board_key: str,
placement_id: int,
placement_lookup: dict[tuple[str, int], dict],
) -> tuple[float, float]:
row = placement_lookup.get((str(board_key), int(placement_id)))
if row is None:
return (float("nan"), float("nan"))
return (float(row["x"]), float(row["y"]))
def canonicalize_holds(
holds: Iterable[tuple[int, int]],
config: BoardConfig,
placement_lookup: dict[tuple[str, int], dict],
) -> list[tuple[int, int]]:
def key(pair: tuple[int, int]):
placement_id, role_id = pair
x, y = placement_xy(config.board_key, placement_id, placement_lookup)
name = role_name(role_id, config)
return (
ROLE_SORT_ORDER.get(name, 9),
y if not np.isnan(y) else 9999.0,
x if not np.isnan(x) else 9999.0,
placement_id,
)
return sorted(list(holds), key=key)
def board_token(config: BoardConfig) -> str:
return f"<BOARD_{config.token_prefix}>"
def angle_token(angle: float) -> str:
return f"<ANGLE_{int(round(float(angle)))}>"
def hold_token(
placement_id: int,
role_id: int,
config: BoardConfig,
) -> str:
semantic_role = role_name(role_id, config)
return f"<{config.token_prefix}_p{int(placement_id)}_{semantic_role}>"
def tokenize_route(
row,
config: BoardConfig,
placement_lookup: dict[tuple[str, int], dict],
include_grade: bool = True,
canonical: bool = True,
) -> list[str]:
holds = parse_frames(row["frames"])
if canonical:
holds = canonicalize_holds(holds, config, placement_lookup)
tokens = [
"<BOS>",
board_token(config),
angle_token(row["angle"]),
]
if include_grade:
tokens.append(grade_token(row["display_difficulty"]))
tokens.extend(hold_token(placement_id, role_id, config) for placement_id, role_id in holds)
tokens.append("<EOS>")
return tokens
def build_route_records(
df_climbs: pd.DataFrame,
configs_by_key: dict[str, BoardConfig],
placement_lookup: dict[tuple[str, int], dict],
) -> pd.DataFrame:
records: list[dict] = []
for _, row in df_climbs.iterrows():
board_key = str(row["board_key"])
config = configs_by_key[board_key]
holds = canonicalize_holds(parse_frames(row["frames"]), config, placement_lookup)
if not holds:
continue
hold_tokens = [hold_token(p, r, config) for p, r in holds]
semantic_roles = [role_name(r, config) for _, r in holds]
tokens_with_grade = tokenize_route(
row,
config=config,
placement_lookup=placement_lookup,
include_grade=True,
canonical=True,
)
tokens_no_grade = tokenize_route(
row,
config=config,
placement_lookup=placement_lookup,
include_grade=False,
canonical=True,
)
records.append(
{
"uuid": row["uuid"],
"board_key": board_key,
"board_display_name": row["board_display_name"],
"board_token_prefix": row["board_token_prefix"],
"board_token": board_token(config),
"climb_name": row["climb_name"],
"setter_username": row.get("setter_username"),
"layout_id": int(row["layout_id"]),
"layout_name": row.get("layout_name"),
"board_name": row.get("board_name"),
"frames": row["frames"],
"angle": float(row["angle"]),
"display_difficulty": float(row["display_difficulty"]),
"grouped_v": int(to_grouped_v(row["display_difficulty"])),
"boulder_grade": row.get("boulder_grade"),
"ascensionist_count": row.get("ascensionist_count"),
"quality_average": row.get("quality_average"),
"fa_at": row.get("fa_at"),
"n_holds": len(holds),
"n_start": semantic_roles.count("start"),
"n_middle": semantic_roles.count("middle"),
"n_foot": semantic_roles.count("foot"),
"n_finish": semantic_roles.count("finish"),
"holds": holds,
"hold_tokens": hold_tokens,
"tokens_with_grade": tokens_with_grade,
"tokens_no_grade": tokens_no_grade,
"sequence_with_grade": " ".join(tokens_with_grade),
"sequence_no_grade": " ".join(tokens_no_grade),
}
)
return pd.DataFrame(records)
def build_vocab(df_routes: pd.DataFrame) -> tuple[list[str], dict[str, int], dict[int, str]]:
all_tokens: list[str] = []
for tokens in df_routes["tokens_with_grade"]:
all_tokens.extend(tokens)
vocab_tokens = list(SPECIAL_TOKENS)
for token in sorted(set(all_tokens)):
if token not in vocab_tokens:
vocab_tokens.append(token)
stoi = {token: idx for idx, token in enumerate(vocab_tokens)}
itos = {idx: token for token, idx in stoi.items()}
return vocab_tokens, stoi, itos
def encode(tokens: Iterable[str], stoi: dict[str, int]) -> list[int]:
unk_id = stoi["<UNK>"]
return [stoi.get(token, unk_id) for token in tokens]
def decode(ids: Iterable[int], itos: dict[int, str]) -> list[str]:
return [itos.get(int(idx), "<UNK>") for idx in ids]
def build_token_metadata(
vocab_tokens: list[str],
stoi: dict[str, int],
df_placements: pd.DataFrame,
placement_lookup: dict[tuple[str, int], dict],
configs_by_prefix: dict[str, BoardConfig],
) -> pd.DataFrame:
bounds = {}
for board_key, frame in df_placements.groupby("board_key"):
xs = frame["x"].astype(float)
ys = frame["y"].astype(float)
bounds[str(board_key)] = {
"x_min": float(xs.min()),
"x_max": float(xs.max()),
"y_min": float(ys.min()),
"y_max": float(ys.max()),
}
def normalize(value: float, lo: float, hi: float) -> float:
if pd.isna(value) or hi == lo:
return 0.0
return 2 * ((float(value) - lo) / (hi - lo)) - 1
rows: list[dict] = []
for token in vocab_tokens:
meta = {
"token": token,
"token_id": stoi[token],
"kind": "special",
"board_key": None,
"board_token_prefix": None,
"placement_id": np.nan,
"role": None,
"x": np.nan,
"y": np.nan,
"x_norm": 0.0,
"y_norm": 0.0,
"is_hold": 0,
"angle": np.nan,
"grouped_v": np.nan,
}
hold_match = HOLD_TOKEN_PATTERN.match(token)
if hold_match:
prefix = hold_match.group(1)
placement_id = int(hold_match.group(2))
role = hold_match.group(3)
config = configs_by_prefix[prefix]
board_key = config.board_key
row = placement_lookup.get((board_key, placement_id), {})
x = float(row.get("x", np.nan))
y = float(row.get("y", np.nan))
board_bounds = bounds.get(board_key, {"x_min": 0, "x_max": 1, "y_min": 0, "y_max": 1})
meta.update(
{
"kind": "hold",
"board_key": board_key,
"board_token_prefix": prefix,
"placement_id": placement_id,
"role": role,
"x": x,
"y": y,
"x_norm": normalize(x, board_bounds["x_min"], board_bounds["x_max"]),
"y_norm": normalize(y, board_bounds["y_min"], board_bounds["y_max"]),
"is_hold": 1,
}
)
angle_match = ANGLE_TOKEN_PATTERN.match(token)
if angle_match:
meta.update({"kind": "angle", "angle": int(angle_match.group(1))})
grade_match = GRADE_TOKEN_PATTERN.match(token)
if grade_match:
meta.update({"kind": "grade", "grouped_v": int(grade_match.group(1))})
board_match = BOARD_TOKEN_PATTERN.match(token)
if board_match:
prefix = board_match.group(1)
config = configs_by_prefix.get(prefix)
meta.update(
{
"kind": "board",
"board_key": None if config is None else config.board_key,
"board_token_prefix": prefix,
}
)
rows.append(meta)
return pd.DataFrame(rows)
def vocab_payload(
stoi: dict[str, int],
itos: dict[int, str],
configs_by_key: dict[str, BoardConfig],
) -> dict:
return {
"stoi": stoi,
"itos": {str(k): v for k, v in itos.items()},
"special_tokens": SPECIAL_TOKENS,
"boards": {
board_key: {
"token_prefix": config.token_prefix,
"board_token": board_token(config),
"role_definitions": config.role_definitions,
}
for board_key, config in configs_by_key.items()
},
"grade_to_v": {str(k): v for k, v in GRADE_TO_V.items()},
}

View File

@@ -0,0 +1,77 @@
from __future__ import annotations
import json
import random
from pathlib import Path
from typing import Any
import numpy as np
import pandas as pd
from sklearn.model_selection import train_test_split
def set_seed(seed: int) -> None:
random.seed(seed)
np.random.seed(seed)
try:
import torch
torch.manual_seed(seed)
if torch.cuda.is_available():
torch.cuda.manual_seed_all(seed)
except ImportError:
pass
def json_safe(obj: Any) -> Any:
if isinstance(obj, dict):
return {str(k): json_safe(v) for k, v in obj.items()}
if isinstance(obj, (list, tuple)):
return [json_safe(v) for v in obj]
if isinstance(obj, np.integer):
return int(obj)
if isinstance(obj, np.floating):
if np.isnan(obj):
return None
return float(obj)
if isinstance(obj, np.ndarray):
return json_safe(obj.tolist())
try:
if pd.isna(obj):
return None
except Exception:
pass
return obj
def write_json(path: str | Path, payload: Any) -> None:
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(json.dumps(json_safe(payload), indent=2), encoding="utf-8")
def safe_train_test_split(
df: pd.DataFrame,
test_size: float,
random_state: int,
stratify_col: str | None = None,
):
stratify = None
if stratify_col is not None and stratify_col in df.columns:
counts = df[stratify_col].value_counts()
if len(counts) > 1 and counts.min() >= 2:
stratify = df[stratify_col]
try:
return train_test_split(
df,
test_size=test_size,
random_state=random_state,
stratify=stratify,
)
except ValueError:
return train_test_split(
df,
test_size=test_size,
random_state=random_state,
stratify=None,
)