Files
Tension-Board-2-Analysis/scripts/predict.py
2026-03-26 18:01:52 -04:00

976 lines
33 KiB
Python

import re
from pathlib import Path
import joblib
import numpy as np
import pandas as pd
from scipy.spatial import ConvexHull
from scipy.spatial.distance import pdist, squareform
try:
import torch
import torch.nn as nn
TORCH_AVAILABLE = True
except ImportError:
TORCH_AVAILABLE = False
# ============================================================
# Paths
# ============================================================
# Repository root: two directory levels above this file.
ROOT = Path(__file__).resolve().parents[1]

# Artifacts shared by every model.
SCALER_PATH = ROOT.joinpath("models", "feature_scaler.pkl")
FEATURE_NAMES_PATH = ROOT.joinpath("models", "feature_names.txt")
HOLD_DIFFICULTY_PATH = ROOT.joinpath("data", "03_hold_difficulty", "hold_difficulty_scores.csv")
PLACEMENTS_PATH = ROOT.joinpath("data", "placements.csv")  # adjust if needed

# ============================================================
# Model registry
# ============================================================
# Each entry records where the trained artifact lives ("path"), how it has to
# be loaded and invoked ("kind": "sklearn" or "torch_checkpoint"), and whether
# the feature scaler must be applied before prediction ("needs_scaling").
# Insertion order matters: it is the order in which model names are listed.
MODEL_REGISTRY = {
    name: {
        "path": ROOT.joinpath("models", filename),
        "kind": kind,
        "needs_scaling": needs_scaling,
    }
    for name, filename, kind, needs_scaling in (
        ("linear", "linear_regression.pkl", "sklearn", True),
        ("ridge", "ridge_regression.pkl", "sklearn", True),
        ("lasso", "lasso_regression.pkl", "sklearn", True),
        ("random_forest", "random_forest_tuned.pkl", "sklearn", False),
        ("nn_best", "neural_network_best.pth", "torch_checkpoint", True),
    )
}

# Model used when the caller does not name one.
DEFAULT_MODEL = "random_forest"
# ============================================================
# Board constants
# Adjust if your board coordinate system differs
# ============================================================
# Board extent in board coordinates (presumably inches - TODO confirm).
x_min = y_min = 0.0
x_max = y_max = 144.0
board_width, board_height = x_max - x_min, y_max - y_min

# ============================================================
# Role mappings
# ============================================================
# Role ids that mark a hold as usable by the hands / by the feet only.
HAND_ROLE_IDS = set((5, 6, 7))
FOOT_ROLE_IDS = set((8,))
def get_role_type(role_id: int) -> str:
    """Translate a numeric hold role id into its role name.

    Ids 5, 6, 7 and 8 map to "start", "middle", "finish" and "foot"
    respectively; every other id falls back to "middle".
    """
    role_names = dict(zip((5, 6, 7, 8), ("start", "middle", "finish", "foot")))
    try:
        return role_names[role_id]
    except KeyError:
        return "middle"
# ============================================================
# Grade map
# ============================================================
# Numeric display difficulty -> "French/V-scale" label. Codes are consecutive
# integers starting at 10, so the labels are listed in ascending order.
grade_map = dict(
    enumerate(
        (
            '4a/V0',
            '4b/V0',
            '4c/V0',
            '5a/V1',
            '5b/V1',
            '5c/V2',
            '6a/V3',
            '6a+/V3',
            '6b/V4',
            '6b+/V4',
            '6c/V5',
            '6c+/V5',
            '7a/V6',
            '7a+/V7',
            '7b/V8',
            '7b+/V8',
            '7c/V9',
            '7c+/V10',
            '8a/V11',
            '8a+/V12',
            '8b/V13',
            '8b+/V14',
            '8c/V15',
            '8c+/V16',
        ),
        start=10,
    )
)

# Smallest and largest difficulty codes that have a label.
MIN_GRADE, MAX_GRADE = min(grade_map), max(grade_map)
# ============================================================
# Neural network architecture from Notebook 06
# ============================================================
if TORCH_AVAILABLE:

    class ClimbGradePredictor(nn.Module):
        """Fully connected regressor producing one output value per input row.

        The network is a stack of ``Linear -> BatchNorm1d -> ReLU -> Dropout``
        stages (one per entry of ``hidden_layers``) followed by a final
        ``Linear`` layer with a single output unit.
        """

        def __init__(self, input_dim, hidden_layers=None, dropout_rate=0.2):
            """Build the layer stack.

            Args:
                input_dim: Number of input features.
                hidden_layers: Width of each hidden stage; defaults to
                    ``[256, 128, 64]`` when ``None``.
                dropout_rate: Dropout probability used after every hidden stage.
            """
            super().__init__()
            widths = [256, 128, 64] if hidden_layers is None else hidden_layers

            stack = []
            in_width = input_dim
            for out_width in widths:
                stack.extend(
                    [
                        nn.Linear(in_width, out_width),
                        nn.BatchNorm1d(out_width),
                        nn.ReLU(),
                        nn.Dropout(dropout_rate),
                    ]
                )
                in_width = out_width
            stack.append(nn.Linear(in_width, 1))

            # The attribute name and the layer order define the state_dict
            # keys, so both must stay as they are for saved weights to load.
            self.network = nn.Sequential(*stack)

        def forward(self, x):
            """Run ``x`` through the layer stack and return its output."""
            return self.network(x)
# ============================================================
# Load shared artifacts
# ============================================================
# NOTE: joblib.load unpickles the file, so only point it at trusted artifacts.
scaler = joblib.load(SCALER_PATH)

# One feature name per line; blank lines are ignored. The resulting order is
# the column order used when building model inputs.
with open(FEATURE_NAMES_PATH, "r") as names_file:
    FEATURE_NAMES = [name for name in map(str.strip, names_file) if name]

# Per-placement difficulty table, indexed by placement id.
df_hold_difficulty = pd.read_csv(HOLD_DIFFICULTY_PATH, index_col="placement_id")

# placement id -> (x, y) board coordinates.
df_placements = pd.read_csv(PLACEMENTS_PATH)
placement_coords = dict(
    (int(record["placement_id"]), (record["x"], record["y"]))
    for _, record in df_placements.iterrows()
)
# ============================================================
# Model loading
# ============================================================
# Loaded models keyed by canonical model name, so each artifact is read once.
_MODEL_CACHE = {}


def normalize_model_name(model_name: str) -> str:
    """Return the canonical registry key: the alias "nn" becomes "nn_best"."""
    return "nn_best" if model_name == "nn" else model_name
def load_model(model_name=DEFAULT_MODEL):
    """Load the named model (or return the cached instance) and cache it.

    Args:
        model_name: A key of ``MODEL_REGISTRY`` or the alias "nn".

    Returns:
        The loaded model object. Torch models are returned in eval mode.

    Raises:
        ValueError: If the name is not registered or its kind is unsupported.
        ImportError: If a torch model is requested but PyTorch is missing.
        RuntimeError: If the torch checkpoint is neither a module nor a dict.
    """
    model_name = normalize_model_name(model_name)
    if model_name not in MODEL_REGISTRY:
        raise ValueError(
            f"Unknown model '{model_name}'. Choose from: {list(MODEL_REGISTRY.keys()) + ['nn']}"
        )

    try:
        return _MODEL_CACHE[model_name]
    except KeyError:
        pass  # first request for this model: load it below

    info = MODEL_REGISTRY[model_name]
    path = info["path"]

    if info["kind"] == "sklearn":
        model = joblib.load(path)
    elif info["kind"] == "torch_checkpoint":
        if not TORCH_AVAILABLE:
            raise ImportError("PyTorch is not installed, so the neural network model cannot be used.")

        # NOTE(review): newer PyTorch releases default torch.load to
        # weights_only=True, which refuses fully pickled modules - confirm the
        # saved checkpoint format against the installed version.
        checkpoint = torch.load(path, map_location="cpu")

        if hasattr(checkpoint, "eval"):
            # The file holds a complete module object.
            model = checkpoint
        elif isinstance(checkpoint, dict):
            # The file holds either a metadata dict wrapping the weights or a
            # bare state_dict; missing metadata falls back to the defaults.
            model = ClimbGradePredictor(
                input_dim=checkpoint.get("input_dim", len(FEATURE_NAMES)),
                hidden_layers=checkpoint.get("hidden_layers", [256, 128, 64]),
                dropout_rate=checkpoint.get("dropout_rate", 0.2),
            )
            weights = (
                checkpoint["model_state_dict"]
                if "model_state_dict" in checkpoint
                else checkpoint
            )
            model.load_state_dict(weights)
        else:
            raise RuntimeError(
                f"Unsupported checkpoint type for {model_name}: {type(checkpoint)}"
            )
        model.eval()
    else:
        raise ValueError(f"Unsupported model kind: {info['kind']}")

    _MODEL_CACHE[model_name] = model
    return model
# ============================================================
# Helpers
# ============================================================
def parse_frames(frames: str):
    """Extract (placement_id, role_id) pairs from a frames string.

    Every ``p<digits>r<digits>`` token becomes one pair, in order of
    appearance; text that does not match that pattern is ignored.

    Example:
        "p304r8p378r6p552r6" -> [(304, 8), (378, 6), (552, 6)]

    Returns an empty list when ``frames`` is not a string or is blank.
    """
    if isinstance(frames, str) and frames.strip():
        return [
            (int(token.group(1)), int(token.group(2)))
            for token in re.finditer(r"p(\d+)r(\d+)", frames)
        ]
    return []
def lookup_hold_difficulty(placement_id, angle, role_type, is_hand, is_foot):
    """Look up one hold's difficulty score for the given angle and role.

    Columns are tried in this order and the first non-missing value wins:
      1. role-specific per-angle column, ``<role_type>_diff_<angle>deg``
      2. aggregate per-angle column, ``hand_diff_<angle>deg`` for hand holds
         or ``foot_diff_<angle>deg`` for foot holds
      3. ``overall_difficulty``

    Returns NaN when the placement is not in the difficulty table or when no
    candidate column holds a value.
    """
    if placement_id not in df_hold_difficulty.index:
        return np.nan

    row = df_hold_difficulty.loc[placement_id]
    degrees = int(angle)
    hand_diff_key = f"hand_diff_{degrees}deg"
    foot_diff_key = f"foot_diff_{degrees}deg"

    candidates = [f"{role_type}_diff_{degrees}deg"]
    if is_hand and hand_diff_key in row.index:
        candidates.append(hand_diff_key)
    elif is_foot and foot_diff_key in row.index:
        candidates.append(foot_diff_key)
    candidates.append("overall_difficulty")

    difficulty = np.nan
    for column in candidates:
        if column not in row.index:
            continue
        difficulty = row[column]
        if not pd.isna(difficulty):
            break
    return difficulty
# ============================================================
# Feature extraction
# ============================================================
def _pairwise_distances(xs, ys):
    """Return the Euclidean distance of every unordered point pair, ordered (0,1), (0,2), ..., (n-2,n-1)."""
    xs = np.asarray(xs, dtype=float)
    ys = np.asarray(ys, dtype=float)
    first, second = np.triu_indices(len(xs), k=1)
    return np.sqrt((xs[first] - xs[second]) ** 2 + (ys[first] - ys[second]) ** 2)


def _cross_distances(xs_a, ys_a, xs_b, ys_b):
    """Return the flattened Euclidean distances between every point of A and every point of B."""
    dx = np.asarray(xs_a, dtype=float)[:, None] - np.asarray(xs_b, dtype=float)[None, :]
    dy = np.asarray(ys_a, dtype=float)[:, None] - np.asarray(ys_b, dtype=float)[None, :]
    return np.sqrt(dx ** 2 + dy ** 2).ravel()


def _mean_min_max_height(subset):
    """Return (mean, min, max) of the subset's y values, or three NaNs when the subset is empty."""
    if len(subset) == 0:
        return np.nan, np.nan, np.nan
    heights = subset["y"]
    return heights.mean(), heights.min(), heights.max()


def _mean_max_std(values):
    """Return (mean, max, std) of values, or three NaNs when values is empty."""
    if len(values) == 0:
        return np.nan, np.nan, np.nan
    return np.mean(values), np.max(values), np.std(values)


def _collect_hold_rows(holds, angle):
    """Turn parsed (placement_id, role_id) pairs into per-hold records, skipping unknown placements."""
    rows = []
    for placement_id, role_id in holds:
        coords = placement_coords.get(placement_id, (None, None))
        if coords[0] is None:
            continue
        role_type = get_role_type(role_id)
        is_hand = role_id in HAND_ROLE_IDS
        is_foot = role_id in FOOT_ROLE_IDS
        rows.append({
            "placement_id": placement_id,
            "x": coords[0],
            "y": coords[1],
            "role_id": role_id,
            "role_type": role_type,
            "is_hand": is_hand,
            "is_foot": is_foot,
            "difficulty": lookup_hold_difficulty(
                placement_id=placement_id,
                angle=angle,
                role_type=role_type,
                is_hand=is_hand,
                is_foot=is_foot,
            ),
        })
    return rows


def _position_features(xs, ys, start_holds, finish_holds):
    """Compute centroid, spread, extent and start/finish height features."""
    out = {
        "mean_x": np.mean(xs),
        "mean_y": np.mean(ys),
        "std_x": np.std(xs) if len(xs) > 1 else 0,
        "std_y": np.std(ys) if len(ys) > 1 else 0,
        "range_x": np.max(xs) - np.min(xs),
        "range_y": np.max(ys) - np.min(ys),
        "min_y": np.min(ys),
        "max_y": np.max(ys),
    }
    (
        out["start_height"],
        out["start_height_min"],
        out["start_height_max"],
    ) = _mean_min_max_height(start_holds)
    (
        out["finish_height"],
        out["finish_height_min"],
        out["finish_height_max"],
    ) = _mean_min_max_height(finish_holds)

    out["height_gained"] = out["max_y"] - out["min_y"]
    if pd.notna(out["finish_height"]) and pd.notna(out["start_height"]):
        out["height_gained_start_finish"] = out["finish_height"] - out["start_height"]
    else:
        out["height_gained_start_finish"] = np.nan
    return out


def _bounding_box_features(total_holds, range_x, range_y):
    """Compute bounding-box size, shape and hold-density features."""
    area = range_x * range_y
    return {
        "bbox_area": area,
        "bbox_aspect_ratio": range_x / range_y if range_y > 0 else 0,
        "bbox_normalized_area": area / (board_width * board_height),
        "hold_density": total_holds / area if area > 0 else 0,
        "holds_per_vertical_foot": total_holds / max(range_y, 1),
    }


def _distribution_features(df_holds, hand_holds, ys):
    """Compute left/right and upper/lower hold-count balance features."""
    total = len(df_holds)
    center_x = (x_min + x_max) / 2
    out = {}

    out["left_holds"] = (df_holds["x"] < center_x).sum()
    out["right_holds"] = (df_holds["x"] >= center_x).sum()
    out["left_ratio"] = out["left_holds"] / total if total > 0 else 0.5
    out["symmetry_score"] = 1 - abs(out["left_ratio"] - 0.5) * 2

    if len(hand_holds) > 0:
        hand_left = (hand_holds["x"] < center_x).sum()
        out["hand_left_ratio"] = hand_left / len(hand_holds)
        out["hand_symmetry"] = 1 - abs(out["hand_left_ratio"] - 0.5) * 2
    else:
        out["hand_left_ratio"] = np.nan
        out["hand_symmetry"] = np.nan

    # Holds exactly at the median height count as "lower".
    y_median = np.median(ys)
    out["upper_holds"] = (df_holds["y"] > y_median).sum()
    out["lower_holds"] = (df_holds["y"] <= y_median).sum()
    out["upper_ratio"] = out["upper_holds"] / total
    return out


def _reach_features(hand_holds, foot_holds):
    """Compute hand-to-hand, foot-to-foot and hand-to-foot distance features (0 when too few holds)."""
    out = {}
    hand_xs = hand_holds["x"].values
    hand_ys = hand_holds["y"].values
    foot_xs = foot_holds["x"].values
    foot_ys = foot_holds["y"].values

    if len(hand_holds) >= 2:
        hand_reaches = _pairwise_distances(hand_xs, hand_ys)
        out["max_hand_reach"] = np.max(hand_reaches)
        out["min_hand_reach"] = np.min(hand_reaches)
        out["mean_hand_reach"] = np.mean(hand_reaches)
        out["std_hand_reach"] = np.std(hand_reaches)
        out["hand_spread_x"] = hand_xs.max() - hand_xs.min()
        out["hand_spread_y"] = hand_ys.max() - hand_ys.min()
    else:
        out.update(dict.fromkeys(
            (
                "max_hand_reach",
                "min_hand_reach",
                "mean_hand_reach",
                "std_hand_reach",
                "hand_spread_x",
                "hand_spread_y",
            ),
            0,
        ))

    if len(foot_holds) >= 2:
        foot_spreads = _pairwise_distances(foot_xs, foot_ys)
        out["max_foot_spread"] = np.max(foot_spreads)
        out["mean_foot_spread"] = np.mean(foot_spreads)
        out["foot_spread_x"] = foot_xs.max() - foot_xs.min()
        out["foot_spread_y"] = foot_ys.max() - foot_ys.min()
    else:
        out.update(dict.fromkeys(
            ("max_foot_spread", "mean_foot_spread", "foot_spread_x", "foot_spread_y"),
            0,
        ))

    if len(hand_holds) > 0 and len(foot_holds) > 0:
        hand_to_foot = _cross_distances(hand_xs, hand_ys, foot_xs, foot_ys)
        out["max_hand_to_foot"] = np.max(hand_to_foot)
        out["min_hand_to_foot"] = np.min(hand_to_foot)
        out["mean_hand_to_foot"] = np.mean(hand_to_foot)
        out["std_hand_to_foot"] = np.std(hand_to_foot)
    else:
        out.update(dict.fromkeys(
            ("max_hand_to_foot", "min_hand_to_foot", "mean_hand_to_foot", "std_hand_to_foot"),
            0,
        ))
    return out


def _difficulty_summary_features(df_holds, hand_holds, foot_holds, start_holds, finish_holds):
    """Summarise known hold difficulties overall and per hand/foot/start/finish subset (NaN when none known)."""
    out = {}
    difficulties = df_holds["difficulty"].dropna().values
    if len(difficulties) > 0:
        out["mean_hold_difficulty"] = np.mean(difficulties)
        out["max_hold_difficulty"] = np.max(difficulties)
        out["min_hold_difficulty"] = np.min(difficulties)
        out["std_hold_difficulty"] = np.std(difficulties)
        out["median_hold_difficulty"] = np.median(difficulties)
        out["difficulty_range"] = out["max_hold_difficulty"] - out["min_hold_difficulty"]
    else:
        out.update(dict.fromkeys(
            (
                "mean_hold_difficulty",
                "max_hold_difficulty",
                "min_hold_difficulty",
                "std_hold_difficulty",
                "median_hold_difficulty",
                "difficulty_range",
            ),
            np.nan,
        ))

    (
        out["mean_hand_difficulty"],
        out["max_hand_difficulty"],
        out["std_hand_difficulty"],
    ) = _mean_max_std(hand_holds["difficulty"].dropna().values)
    (
        out["mean_foot_difficulty"],
        out["max_foot_difficulty"],
        out["std_foot_difficulty"],
    ) = _mean_max_std(foot_holds["difficulty"].dropna().values)

    start_diffs = start_holds["difficulty"].dropna().values
    finish_diffs = finish_holds["difficulty"].dropna().values
    out["start_difficulty"] = np.mean(start_diffs) if len(start_diffs) > 0 else np.nan
    out["finish_difficulty"] = np.mean(finish_diffs) if len(finish_diffs) > 0 else np.nan
    return out


def _height_weighted_difficulty(df_holds, ys, fallback):
    """Return the height-weighted mean of known difficulties, or ``fallback`` when none are known."""
    difficulties = df_holds["difficulty"].dropna().values
    if len(difficulties) == 0:
        return fallback
    # NOTE(review): the weights come from the FIRST len(difficulties) heights,
    # so they are misaligned with the difficulties whenever some hold has no
    # difficulty score. Kept as-is on purpose: the trained models may have
    # seen this exact computation - confirm against the training notebooks
    # before changing it.
    weights = (ys[:len(difficulties)] - ys.min()) / max(ys.max() - ys.min(), 1) + 0.5
    return np.average(difficulties, weights=weights)


def _shape_features(xs, ys, bbox_area, height_gained):
    """Compute convex-hull, pairwise-distance, neighbourhood and bottom-to-top path features."""
    out = {}
    n_holds = len(xs)
    points = np.column_stack([xs, ys])

    if n_holds >= 3:
        try:
            hull = ConvexHull(points)
            # For 2-D input SciPy reports the enclosed area as `volume` and
            # the perimeter as `area`.
            out["convex_hull_area"] = hull.volume
            out["convex_hull_perimeter"] = hull.area
            out["hull_area_to_bbox_ratio"] = out["convex_hull_area"] / max(bbox_area, 1)
        except Exception:
            # Deliberate best effort: a hull that cannot be built (e.g. Qhull
            # rejecting degenerate input) yields NaN instead of aborting.
            out["convex_hull_area"] = np.nan
            out["convex_hull_perimeter"] = np.nan
            out["hull_area_to_bbox_ratio"] = np.nan
    else:
        out["convex_hull_area"] = 0
        out["convex_hull_perimeter"] = 0
        out["hull_area_to_bbox_ratio"] = 0

    # Despite the "nn" in the names these summarise ALL pairwise distances.
    if n_holds >= 2:
        pair_distances = pdist(points)
        out["min_nn_distance"] = np.min(pair_distances)
        out["mean_nn_distance"] = np.mean(pair_distances)
        out["max_nn_distance"] = np.max(pair_distances)
        out["std_nn_distance"] = np.std(pair_distances)
    else:
        pair_distances = None
        out["min_nn_distance"] = 0
        out["mean_nn_distance"] = 0
        out["max_nn_distance"] = 0
        out["std_nn_distance"] = 0

    if n_holds >= 3:
        dist_matrix = squareform(pair_distances)
        threshold = 12.0
        # Subtract 1 so a hold does not count itself (diagonal distance is 0).
        neighbors_count = (dist_matrix < threshold).sum(axis=1) - 1
        out["mean_neighbors_12in"] = np.mean(neighbors_count)
        out["max_neighbors_12in"] = np.max(neighbors_count)
        out["clustering_ratio"] = np.mean(neighbors_count) / (n_holds - 1)
    else:
        out["mean_neighbors_12in"] = 0
        out["max_neighbors_12in"] = 0
        out["clustering_ratio"] = 0

    if n_holds >= 2:
        # Length of the polyline that visits the holds from lowest to highest.
        order = np.argsort(ys)
        steps = np.sqrt(np.diff(xs[order]) ** 2 + np.diff(ys[order]) ** 2)
        path_length = 0
        for step in steps:
            # Summed one step at a time to keep the original rounding order.
            path_length += step
        out["path_length_vertical"] = path_length
        out["path_efficiency"] = height_gained / max(path_length, 1)
    else:
        out["path_length_vertical"] = 0
        out["path_efficiency"] = 0
    return out


def _difficulty_profile_features(
    df_holds, hand_holds, ys, start_difficulty, finish_difficulty, mean_hold_difficulty
):
    """Compute how difficulty changes with height: gradient, per-third means, jumps and weighted reach."""
    out = {}
    if pd.notna(finish_difficulty) and pd.notna(start_difficulty):
        out["difficulty_gradient"] = finish_difficulty - start_difficulty
    else:
        out["difficulty_gradient"] = np.nan

    difficulty = df_holds["difficulty"]
    known = difficulty.notna()
    y_low = ys.min()
    y_span = ys.max() - y_low
    region_keys = (
        "lower_region_difficulty",
        "middle_region_difficulty",
        "upper_region_difficulty",
    )
    if not known.any():
        out.update(dict.fromkeys(region_keys, np.nan))
        out["difficulty_progression"] = np.nan
    elif y_span > 0:
        # Split the climb's vertical extent into thirds.
        lower_cut = y_low + y_span / 3
        upper_cut = y_low + 2 * y_span / 3
        region_masks = (
            ys <= lower_cut,
            (ys > lower_cut) & (ys <= upper_cut),
            ys > upper_cut,
        )
        for key, in_region in zip(region_keys, region_masks):
            region_values = difficulty[known & in_region]
            out[key] = region_values.mean() if len(region_values) > 0 else np.nan
        lower = out["lower_region_difficulty"]
        upper = out["upper_region_difficulty"]
        if pd.notna(lower) and pd.notna(upper):
            out["difficulty_progression"] = upper - lower
        else:
            out["difficulty_progression"] = np.nan
    else:
        # All holds share one height: every region gets the overall mean.
        out.update(dict.fromkeys(region_keys, mean_hold_difficulty))
        out["difficulty_progression"] = 0

    hand_diffs = hand_holds["difficulty"].dropna().values
    if len(hand_diffs) >= 2:
        # At least two scored hand holds exist, so every array below is
        # non-empty.
        hand_sorted = hand_holds.sort_values("y")

        jumps = np.abs(np.diff(hand_sorted["difficulty"].dropna().values))
        out["max_difficulty_jump"] = np.max(jumps)
        out["mean_difficulty_jump"] = np.mean(jumps)

        # Distance between consecutive hand holds (by height), scaled by the
        # mean difficulty of the two holds; unknown difficulties are replaced
        # by the mean known hand difficulty.
        filled = hand_sorted["difficulty"].fillna(np.mean(hand_diffs)).values
        hand_steps = np.sqrt(
            np.diff(hand_sorted["x"].values) ** 2 + np.diff(hand_sorted["y"].values) ** 2
        )
        weighted_reach = hand_steps * ((filled[:-1] + filled[1:]) / 2)
        out["difficulty_weighted_reach"] = np.mean(weighted_reach)
        out["max_weighted_reach"] = np.max(weighted_reach)
    else:
        out["max_difficulty_jump"] = 0
        out["mean_difficulty_jump"] = 0
        out["difficulty_weighted_reach"] = 0
        out["max_weighted_reach"] = 0
    return out


def _normalized_features(features, ys):
    """Compute board-normalised positions, start/finish offsets and height-quartile features."""
    start_height = features["start_height"]
    finish_height = features["finish_height"]
    has_start = pd.notna(start_height)
    has_finish = pd.notna(finish_height)
    typical_start_y = y_min + board_height * 0.15
    typical_finish_y = y_min + board_height * 0.85

    out = {
        "mean_x_normalized": (features["mean_x"] - x_min) / board_width,
        "mean_y_normalized": (features["mean_y"] - y_min) / board_height,
        "std_x_normalized": features["std_x"] / board_width,
        "std_y_normalized": features["std_y"] / board_height,
        "start_height_normalized": (start_height - y_min) / board_height if has_start else np.nan,
        "finish_height_normalized": (finish_height - y_min) / board_height if has_finish else np.nan,
        "start_offset_from_typical": abs(start_height - typical_start_y) if has_start else np.nan,
        "finish_offset_from_typical": abs(finish_height - typical_finish_y) if has_finish else np.nan,
        "mean_y_relative_to_start": features["mean_y"] - start_height if has_start else np.nan,
        "max_y_relative_to_start": features["max_y"] - start_height if has_start else np.nan,
        "spread_x_normalized": features["range_x"] / board_width,
        "spread_y_normalized": features["range_y"] / board_height,
        "bbox_coverage_x": features["range_x"] / board_width,
        "bbox_coverage_y": features["range_y"] / board_height,
    }

    q25, q50, q75 = np.percentile(ys, [25, 50, 75])
    out["y_q25"] = q25
    out["y_q50"] = q50
    out["y_q75"] = q75
    out["y_iqr"] = q75 - q25
    out["holds_bottom_quartile"] = (ys < q25).sum()
    out["holds_top_quartile"] = (ys >= q75).sum()
    return out


def extract_features_from_raw(angle, frames, is_nomatch=0, description=""):
    """Compute the full feature dictionary for one climb.

    Feature names, their insertion order and their values match the previous
    single-function implementation; vectorised distance computations may
    differ from it only by floating-point rounding in the last bits.

    Args:
        angle: Board angle; stored as the "angle" feature and used to pick
            the per-angle hold difficulty columns.
        frames: Frames string such as "p304r8p378r6" (see ``parse_frames``).
        is_nomatch: 1 when the climb is flagged as no-match.
        description: Free text; a "no match"/"no matching" phrase (any case)
            also sets the "is_nomatch" feature.

    Returns:
        dict mapping feature name to value. Features that cannot be computed
        for this climb are NaN or 0, depending on the feature.

    Raises:
        ValueError: If no hold can be parsed from ``frames`` or none of the
            parsed placements has known coordinates.
    """
    holds = parse_frames(frames)
    if not holds:
        raise ValueError("Could not parse any holds from frames.")
    hold_rows = _collect_hold_rows(holds, angle)
    if not hold_rows:
        raise ValueError("No valid holds found after parsing frames.")

    df_holds = pd.DataFrame(hold_rows)
    hand_holds = df_holds[df_holds["is_hand"]]
    foot_holds = df_holds[df_holds["is_foot"]]
    start_holds = df_holds[df_holds["role_type"] == "start"]
    finish_holds = df_holds[df_holds["role_type"] == "finish"]
    middle_holds = df_holds[df_holds["role_type"] == "middle"]
    xs = df_holds["x"].values
    ys = df_holds["y"].values

    desc = str(description) if description is not None else ""
    features = {
        "angle": angle,
        "total_holds": len(df_holds),
        "hand_holds": len(hand_holds),
        "foot_holds": len(foot_holds),
        "start_holds": len(start_holds),
        "finish_holds": len(finish_holds),
        "middle_holds": len(middle_holds),
        "is_nomatch": int(
            (is_nomatch == 1) or
            bool(re.search(r"\bno\s*match(ing)?\b", desc, flags=re.IGNORECASE))
        ),
    }

    features.update(_position_features(xs, ys, start_holds, finish_holds))
    features.update(
        _bounding_box_features(features["total_holds"], features["range_x"], features["range_y"])
    )
    features.update(_distribution_features(df_holds, hand_holds, ys))
    features.update(_reach_features(hand_holds, foot_holds))
    features.update(
        _difficulty_summary_features(df_holds, hand_holds, foot_holds, start_holds, finish_holds)
    )

    features["hand_foot_ratio"] = features["hand_holds"] / max(features["foot_holds"], 1)
    features["movement_density"] = features["total_holds"] / max(features["height_gained"], 1)
    features["hold_com_x"] = np.average(xs)
    features["hold_com_y"] = np.average(ys)
    features["weighted_difficulty"] = _height_weighted_difficulty(
        df_holds, ys, features["mean_hold_difficulty"]
    )

    features.update(_shape_features(xs, ys, features["bbox_area"], features["height_gained"]))
    features.update(
        _difficulty_profile_features(
            df_holds,
            hand_holds,
            ys,
            start_difficulty=features["start_difficulty"],
            finish_difficulty=features["finish_difficulty"],
            mean_hold_difficulty=features["mean_hold_difficulty"],
        )
    )
    features.update(_normalized_features(features, ys))
    return features
# ============================================================
# Model input preparation
# ============================================================
def prepare_feature_vector(features: dict) -> pd.DataFrame:
    """Build the single-row model input from a feature dictionary.

    Columns follow ``FEATURE_NAMES`` exactly. A feature that is absent from
    ``features`` or whose value is missing (NaN/None) becomes 0.0; entries of
    ``features`` that are not in ``FEATURE_NAMES`` are dropped.
    """
    looked_up = (features.get(name, 0.0) for name in FEATURE_NAMES)
    row = {
        name: (0.0 if pd.isna(value) else value)
        for name, value in zip(FEATURE_NAMES, looked_up)
    }
    return pd.DataFrame([row], columns=FEATURE_NAMES)
# ============================================================
# Prediction helpers
# ============================================================
def format_prediction(pred: float):
    """Convert a raw numeric prediction into a display record.

    The prediction is rounded to the nearest integer and clamped to the
    ``[MIN_GRADE, MAX_GRADE]`` range so that it always has a grade label.

    Returns:
        dict with the unrounded value ("predicted_numeric"), the clamped
        integer difficulty ("predicted_display_difficulty") and its label
        ("predicted_boulder_grade").
    """
    nearest = int(round(pred))
    if nearest > MAX_GRADE:
        nearest = MAX_GRADE
    if nearest < MIN_GRADE:
        nearest = MIN_GRADE
    return {
        "predicted_numeric": float(pred),
        "predicted_display_difficulty": nearest,
        "predicted_boulder_grade": grade_map[nearest],
    }
def predict_with_model(model, X: pd.DataFrame, model_name: str):
    """Run ``model`` on the single-row frame ``X`` and return the prediction as a float.

    The registry entry for ``model_name`` decides whether ``X`` is passed
    through the shared scaler first and whether the model is called as a
    scikit-learn estimator or as a torch module.

    Raises:
        ImportError: If a torch model is requested but PyTorch is missing.
        ValueError: If the registered model kind is not supported.
    """
    info = MODEL_REGISTRY[normalize_model_name(model_name)]
    is_torch = info["kind"] == "torch_checkpoint"

    # Reject unusable configurations before touching the data.
    if not is_torch and info["kind"] != "sklearn":
        raise ValueError(f"Unsupported model kind: {info['kind']}")
    if is_torch and not TORCH_AVAILABLE:
        raise ImportError("PyTorch is not installed.")

    X_input = scaler.transform(X) if info["needs_scaling"] else X

    if not is_torch:
        return float(model.predict(X_input)[0])

    X_tensor = torch.tensor(np.asarray(X_input), dtype=torch.float32)
    with torch.no_grad():
        out = model(X_tensor)
    if isinstance(out, tuple):
        # Only the first element of a tuple output is used.
        out = out[0]
    return float(np.asarray(out).reshape(-1)[0])
# ============================================================
# Public API
# ============================================================
def predict(
    angle,
    frames,
    is_nomatch=0,
    description="",
    model_name=DEFAULT_MODEL,
    return_numeric=False,
    debug=False,
):
    """Predict the difficulty of a single climb.

    Args:
        angle: Board angle of the climb.
        frames: Frames string describing the holds (see ``parse_frames``).
        is_nomatch: 1 when the climb is flagged as no-match.
        description: Free-text description of the climb.
        model_name: Registry key of the model to use (or the alias "nn").
        return_numeric: When true, return only the raw prediction as a float.
        debug: When true, print every non-zero model input value.

    Returns:
        The raw float prediction when ``return_numeric`` is set; otherwise
        the dict from ``format_prediction`` plus a "model" entry holding the
        canonical model name.
    """
    model_name = normalize_model_name(model_name)
    model = load_model(model_name)

    X = prepare_feature_vector(
        extract_features_from_raw(
            angle=angle,
            frames=frames,
            is_nomatch=is_nomatch,
            description=description,
        )
    )

    if debug:
        print("\nNonzero / non-null feature values:")
        for col, val in X.iloc[0].items():
            if pd.isna(val) or val == 0:
                continue
            print(f"{col}: {val}")

    pred = predict_with_model(model, X, model_name=model_name)
    if return_numeric:
        return float(pred)

    result = format_prediction(pred)
    result["model"] = model_name
    return result
def predict_csv(
    input_csv,
    output_csv=None,
    model_name=DEFAULT_MODEL,
    angle_col="angle",
    frames_col="frames",
    is_nomatch_col="is_nomatch",
    description_col="description",
):
    """Predict a grade for every row of a CSV file.

    Required columns (names configurable): angle, frames.
    Optional columns: is_nomatch (defaults to 0 when absent or empty) and
    description (defaults to "" when absent or empty).

    Args:
        input_csv: Path of the CSV to read.
        output_csv: When given, the combined table is also written there
            without the index.
        model_name: Registry key of the model to use (or the alias "nn").

    Returns:
        The input rows with the prediction columns appended on the right.

    Raises:
        ValueError: If a required column is missing. Errors raised while
            predicting a row propagate and abort the whole batch.
    """
    model_name = normalize_model_name(model_name)
    df = pd.read_csv(input_csv)

    if angle_col not in df.columns:
        raise ValueError(f"Missing required column: '{angle_col}'")
    if frames_col not in df.columns:
        raise ValueError(f"Missing required column: '{frames_col}'")

    # Whether the optional columns exist is the same for every row.
    has_nomatch = is_nomatch_col in df.columns
    has_description = description_col in df.columns

    results = []
    for _, row in df.iterrows():
        is_nomatch = 0
        if has_nomatch and pd.notna(row[is_nomatch_col]):
            is_nomatch = row[is_nomatch_col]
        description = ""
        if has_description and pd.notna(row[description_col]):
            description = row[description_col]

        results.append(
            predict(
                angle=row[angle_col],
                frames=row[frames_col],
                is_nomatch=is_nomatch,
                description=description,
                model_name=model_name,
                return_numeric=False,
                debug=False,
            )
        )

    # Both frames get a fresh 0..n-1 index so rows line up positionally.
    combined = pd.concat(
        [df.reset_index(drop=True), pd.DataFrame(results).reset_index(drop=True)],
        axis=1,
    )
    if output_csv is not None:
        combined.to_csv(output_csv, index=False)
    return combined
def evaluate_predictions(df, true_col="display_difficulty", pred_col="predicted_numeric"):
    """Summarise prediction error for a table holding true and predicted values.

    Args:
        df: DataFrame containing both columns.
        true_col: Name of the column with the true difficulty.
        pred_col: Name of the column with the predicted difficulty.

    Returns:
        dict of floats: "mae" (mean absolute error), "rmse" (root mean
        squared error) and "within_1" / "within_2" (fraction of rows whose
        absolute error is at most 1 / at most 2).

    Raises:
        ValueError: If either column is missing from ``df``.
    """
    if true_col not in df.columns:
        raise ValueError(f"Missing true target column: '{true_col}'")
    if pred_col not in df.columns:
        raise ValueError(f"Missing prediction column: '{pred_col}'")

    abs_error = np.abs(df[true_col].astype(float) - df[pred_col].astype(float))
    summary = {
        "mae": np.mean(abs_error),
        "rmse": np.sqrt(np.mean(abs_error ** 2)),
        "within_1": np.mean(abs_error <= 1),
        "within_2": np.mean(abs_error <= 2),
    }
    return {metric: float(value) for metric, value in summary.items()}
# ============================================================
# CLI
# ============================================================
def _main():
    """Parse the command line and run batch (CSV) or single-climb prediction.

    Batch mode is selected by ``--input_csv``; otherwise ``--angle`` and
    ``--frames`` are required and a single prediction is printed.
    """
    import argparse

    parser = argparse.ArgumentParser()
    # Single prediction mode
    parser.add_argument("--angle", type=int)
    parser.add_argument("--frames", type=str)
    parser.add_argument("--is_nomatch", type=int, default=0)
    parser.add_argument("--description", type=str, default="")
    # Batch mode
    parser.add_argument("--input_csv", type=str)
    parser.add_argument("--output_csv", type=str)
    parser.add_argument(
        "--model",
        type=str,
        default=DEFAULT_MODEL,
        choices=list(MODEL_REGISTRY.keys()) + ["nn"],
        help="Which trained model to use",
    )
    parser.add_argument("--numeric", action="store_true")
    parser.add_argument("--debug", action="store_true")
    parser.add_argument("--evaluate", action="store_true")
    args = parser.parse_args()

    if not args.input_csv:
        if args.angle is None or args.frames is None:
            raise ValueError("For single prediction, you must provide --angle and --frames")
        print(
            predict(
                angle=args.angle,
                frames=args.frames,
                is_nomatch=args.is_nomatch,
                description=args.description,
                model_name=args.model,
                return_numeric=args.numeric,
                debug=args.debug,
            )
        )
        return

    df_out = predict_csv(
        input_csv=args.input_csv,
        output_csv=args.output_csv,
        model_name=args.model,
    )
    print(df_out.head())
    if not args.evaluate:
        return

    # Evaluation is optional output: a failure (e.g. the input has no true
    # labels) is reported but does not fail the run.
    try:
        metrics = evaluate_predictions(df_out)
        print("\nEvaluation:")
        for k, v in metrics.items():
            print(f"{k}: {v:.4f}")
    except Exception as e:
        print(f"\nCould not evaluate predictions: {e}")


if __name__ == "__main__":
    _main()