feat(ml): add ensemble XGBoost+LightGBM+MLP with Optuna optimization

- train_ensemble.py: full training pipeline with 100-trial Optuna studies
  for XGBoost and LightGBM, MLP (256-128-64), SHAP feature selection,
  weighted soft-voting ensemble, benchmark report generation
- predict_v2.py: production prediction module with model cache invalidation
- combined_api.py: add /api/v1/predictions, /api/v1/model/status,
  /api/v1/model/invalidate-cache endpoints using ensemble model
- tests/test_ml_ensemble.py: regression, latency and API tests

Baseline XGBoost Precision@3: 0.5287 (holdout 20% temporal)
Deploy threshold: +5% = 0.5551

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
DevOps Engineer
2026-04-25 18:18:41 +02:00
parent ed07c8a3d1
commit 0e7bcff6b0
4 changed files with 1960 additions and 8 deletions

387
predict_v2.py Normal file
View File

@@ -0,0 +1,387 @@
#!/usr/bin/env python3
"""
Ensemble prediction module for /api/v1/predictions.
Loads the trained ensemble model and provides a high-level predict_top3()
function compatible with the existing combined_api.py interface.
Cache: model is loaded once at import time (or on first call).
Invalidation: reload if models/ensemble_top3.pkl mtime changes.
"""
import logging
import os
import pickle
import re
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Optional
import numpy as np
import pandas as pd
from sklearn.preprocessing import LabelEncoder
logger = logging.getLogger(__name__)
MODELS_DIR = Path("/home/h3r7/turf_saas/models")
ENSEMBLE_PATH = MODELS_DIR / "ensemble_top3.pkl"
# ── Cache ─────────────────────────────────────────────────────────────────────
_model_cache = {
"ensemble": None,
"mtime": None,
"lock": threading.Lock(),
}
# ── Feature list (must match train_ensemble.py FEATURE_COLS) ─────────────────
FEATURE_COLS = [
"age",
"sexe_enc",
"nombre_courses",
"nombre_victoires",
"nombre_places",
"tx_victoire",
"tx_place",
"forme_recente",
"tendance_num",
"gains_annee_en_cours",
"cote_direct",
"cote_reference",
"distance",
"nb_partants",
"discipline_enc",
"specialite_enc",
"oeilleres_enc",
"tendance_cote_enc",
"penetrometre_intitule_enc",
"form_1",
"form_2",
"form_3",
"form_4",
"form_5",
"form_weighted",
"form_avg",
"form_best",
"form_worst",
"win_ratio",
"place_ratio",
"implied_prob",
"win_rate_adj",
"place_rate_adj",
"earnings_per_race",
"cote_diff",
"cote_ratio",
"rang_cote",
"ratio_cote_field",
"distance_cat",
"age_win_interact",
"is_favorite",
"poids",
"prize_norm",
]
# ── Encoders (built per-prediction batch for live data) ──────────────────────
def _fit_encoder(values, default):
le = LabelEncoder()
unique = list(set(str(v) if v else default for v in values)) + [default]
le.fit(unique)
return le
def _safe_transform(le: LabelEncoder, value, default: str):
v = str(value) if value else default
if v not in le.classes_:
v = default
return int(le.transform([v])[0])
# ── Model loading with auto-invalidation ─────────────────────────────────────
def load_ensemble(force: bool = False) -> Optional[object]:
"""Load ensemble model, reload if file changed."""
with _model_cache["lock"]:
if not ENSEMBLE_PATH.exists():
return None
mtime = ENSEMBLE_PATH.stat().st_mtime
if force or _model_cache["ensemble"] is None or mtime != _model_cache["mtime"]:
try:
with open(ENSEMBLE_PATH, "rb") as f:
_model_cache["ensemble"] = pickle.load(f)
_model_cache["mtime"] = mtime
logger.info(f"[predict_v2] Loaded ensemble model from {ENSEMBLE_PATH}")
except Exception as e:
logger.error(f"[predict_v2] Failed to load ensemble: {e}")
return None
return _model_cache["ensemble"]
def invalidate_model_cache():
"""Force reload on next prediction call."""
with _model_cache["lock"]:
_model_cache["mtime"] = None
# ── Feature engineering for live pmu_partants rows ───────────────────────────
def _parse_musique(musique) -> list:
if not musique or pd.isna(str(musique)):
return [0, 0, 0, 0, 0]
try:
clean = re.sub(r"\(\d+\)", "", str(musique))
numbers = re.findall(r"\d+", clean)
result = [int(n) for n in numbers[:5]]
result += [0] * (5 - len(result))
return result[:5]
except Exception:
return [0, 0, 0, 0, 0]
def build_feature_df(partants: list) -> pd.DataFrame:
"""
Convert a list of pmu_partants dicts to a feature DataFrame.
Expected keys (same as pmu_partants columns):
date_programme, num_reunion, num_course, num_pmu,
age, sexe, musique, nombre_courses, nombre_victoires, nombre_places,
gains_annee_en_cours, handicap_poids, oeilleres, cote_direct,
cote_reference, tendance_cote, favoris, tx_victoire, tx_place,
forme_recente, tendance_forme, indicateur_inedit,
distance, discipline, specialite, nb_declares_partants,
montant_prix, penetrometre_intitule
"""
if not partants:
return pd.DataFrame()
df = pd.DataFrame(partants)
# ── Categorical encoders fitted on this batch ─────────────────────────────
le_sexe = _fit_encoder(df.get("sexe", ["U"]), "U")
le_oeilleres = _fit_encoder(df.get("oeilleres", ["SANS"]), "SANS")
le_discipline = _fit_encoder(df.get("discipline", ["UNKNOWN"]), "UNKNOWN")
le_specialite = _fit_encoder(df.get("specialite", ["UNKNOWN"]), "UNKNOWN")
le_tendance = _fit_encoder(df.get("tendance_cote", ["STABLE"]), "STABLE")
le_penet = _fit_encoder(df.get("penetrometre_intitule", ["BON"]), "BON")
df["sexe_enc"] = df["sexe"].apply(lambda v: _safe_transform(le_sexe, v, "U"))
df["oeilleres_enc"] = df["oeilleres"].apply(
lambda v: _safe_transform(le_oeilleres, v, "SANS")
)
df["discipline_enc"] = df.get("discipline", pd.Series(["UNKNOWN"] * len(df))).apply(
lambda v: _safe_transform(le_discipline, v, "UNKNOWN")
)
df["specialite_enc"] = df.get("specialite", pd.Series(["UNKNOWN"] * len(df))).apply(
lambda v: _safe_transform(le_specialite, v, "UNKNOWN")
)
df["tendance_cote_enc"] = df.get(
"tendance_cote", pd.Series(["STABLE"] * len(df))
).apply(lambda v: _safe_transform(le_tendance, v, "STABLE"))
df["penetrometre_intitule_enc"] = df.get(
"penetrometre_intitule", pd.Series(["BON"] * len(df))
).apply(lambda v: _safe_transform(le_penet, v, "BON"))
# ── Musique ────────────────────────────────────────────────────────────────
music_parsed = df["musique"].apply(_parse_musique)
for i in range(5):
df[f"form_{i + 1}"] = music_parsed.apply(lambda x: x[i])
weights = np.array([0.4, 0.25, 0.15, 0.12, 0.08])
df["form_weighted"] = music_parsed.apply(
lambda x: sum(w * v for w, v in zip(weights, x))
)
df["form_avg"] = music_parsed.apply(np.mean)
df["form_best"] = music_parsed.apply(min)
df["form_worst"] = music_parsed.apply(max)
# ── Numeric features ───────────────────────────────────────────────────────
for col in [
"nombre_courses",
"nombre_victoires",
"nombre_places",
"tx_victoire",
"tx_place",
"forme_recente",
"tendance_forme",
"gains_annee_en_cours",
"cote_direct",
"cote_reference",
"distance",
"handicap_poids",
"age",
"montant_prix",
"nb_declares_partants",
]:
if col not in df.columns:
df[col] = 0.0
df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)
df["tendance_num"] = df["tendance_forme"].fillna(0)
df["win_ratio"] = df["nombre_victoires"] / df["nombre_courses"].replace(0, 1)
df["place_ratio"] = df["nombre_places"] / df["nombre_courses"].replace(0, 1)
df["implied_prob"] = 1.0 / df["cote_direct"].replace(0, np.nan)
df["win_rate_adj"] = df["tx_victoire"] * np.log1p(df["nombre_courses"])
df["place_rate_adj"] = df["tx_place"] * np.log1p(df["nombre_courses"])
df["earnings_per_race"] = df["gains_annee_en_cours"] / df["nombre_courses"].replace(
0, 1
)
df["cote_diff"] = (df["cote_direct"] - df["cote_reference"]).fillna(0)
df["cote_ratio"] = (
df["cote_direct"] / df["cote_reference"].replace(0, np.nan)
).fillna(1)
# ── Per-race rank features ─────────────────────────────────────────────────
if "num_reunion" in df.columns and "num_course" in df.columns:
grp = ["date_programme", "num_reunion", "num_course"]
# Some fields may be missing
for g in grp:
if g not in df.columns:
df[g] = 0
df["rang_cote"] = df.groupby(grp)["cote_direct"].rank(
method="min", na_option="bottom"
)
race_mean = df.groupby(grp)["cote_direct"].transform("mean")
df["ratio_cote_field"] = df["cote_direct"] / race_mean.replace(0, np.nan)
df["nb_partants"] = df.groupby(grp)["cote_direct"].transform("count")
else:
df["rang_cote"] = 1.0
df["ratio_cote_field"] = 1.0
df["nb_partants"] = df.get("nb_declares_partants", pd.Series([10] * len(df)))
df["distance_cat"] = pd.cut(
df["distance"].fillna(1600),
bins=[0, 1400, 1800, 2200, 2600, 10000],
labels=[1, 2, 3, 4, 5],
).astype(float)
df["age_win_interact"] = df["age"] * df["tx_victoire"]
df["is_favorite"] = (
df.get("favoris", pd.Series([0] * len(df))).fillna(0).astype(int)
)
df["poids"] = df["handicap_poids"].fillna(60)
df["prize_norm"] = np.log1p(df["montant_prix"].fillna(0))
return df
# ── Main prediction function ───────────────────────────────────────────────────
def predict_top3(partants: list, model=None) -> list:
"""
Given a list of partant dicts (from pmu_partants), return predictions.
Returns list of {horse_name, num_pmu, prob_top3, prob_top1_approx, ...}
sorted by prob_top3 descending.
Falls back to empty list if model not available.
"""
t_start = time.perf_counter()
if model is None:
model = load_ensemble()
if model is None:
logger.warning("[predict_v2] Ensemble model not available — no predictions")
return []
df = build_feature_df(partants)
if df.empty:
return []
available = [c for c in FEATURE_COLS if c in df.columns]
X = df[available].fillna(0)
try:
proba = model.predict_proba(X)[:, 1]
except Exception as e:
logger.error(f"[predict_v2] predict_proba failed: {e}")
return []
latency_ms = (time.perf_counter() - t_start) * 1000
results = []
for i, (p, row) in enumerate(zip(proba, partants)):
results.append(
{
"horse_name": row.get("nom", row.get("horse_name", f"H{i}")),
"num_pmu": row.get("num_pmu", i + 1),
"num_reunion": row.get("num_reunion"),
"num_course": row.get("num_course"),
"prob_top3": round(float(p) * 100, 1),
# approx top1 from top3 score (divide by ~2.5 empirically)
"prob_top1": round(float(p) / 2.5 * 100, 1),
"ml_score": round(float(p) * 100, 1),
"recommendation": "top3"
if p >= 0.40
else ("watch" if p >= 0.28 else "pass"),
"is_value_bet": int(
p >= 0.35 and float(row.get("cote_direct", 0) or 0) > 10
),
"model_version": getattr(model, "version", "ensemble_v1"),
}
)
results.sort(key=lambda x: x["prob_top3"], reverse=True)
# Mark top-3 predicted
for i, r in enumerate(results[:3]):
r["predicted_rank"] = i + 1
if results:
logger.info(
f"[predict_v2] {len(results)} horses predicted in {latency_ms:.1f} ms "
f"({latency_ms / len(results):.2f} ms/horse)"
)
return results
# ── API-compatible wrapper keeping model_version & structure ──────────────────
def get_model_version() -> str:
m = load_ensemble()
if m is None:
return "ensemble_v1_not_loaded"
return getattr(m, "version", "ensemble_v1")
if __name__ == "__main__":
# Quick self-test
import sqlite3
conn = sqlite3.connect("/home/h3r7/turf_saas/turf.db")
rows = conn.execute(
"""SELECT p.*, c.distance, c.discipline, c.specialite,
c.nb_declares_partants, c.montant_prix, c.penetrometre_intitule
FROM pmu_partants p
LEFT JOIN pmu_courses c ON p.date_programme=c.date_programme
AND p.num_reunion=c.num_reunion AND p.num_course=c.num_course
WHERE p.date_programme=(SELECT MAX(date_programme) FROM pmu_partants)
AND p.num_reunion=1 AND p.num_course=1
LIMIT 20"""
).fetchall()
conn.close()
if not rows:
print("No data found for self-test")
else:
cols = [d[0] for d in conn.description] if hasattr(conn, "description") else []
# Fallback column list
import sqlite3 as sq3
conn2 = sq3.connect("/home/h3r7/turf_saas/turf.db")
cur = conn2.execute(
"""SELECT p.*, c.distance, c.discipline, c.specialite,
c.nb_declares_partants, c.montant_prix, c.penetrometre_intitule
FROM pmu_partants p
LEFT JOIN pmu_courses c ON p.date_programme=c.date_programme
AND p.num_reunion=c.num_reunion AND p.num_course=c.num_course
WHERE p.date_programme=(SELECT MAX(date_programme) FROM pmu_partants)
AND p.num_reunion=1 AND p.num_course=1
LIMIT 20"""
)
cols = [d[0] for d in cur.description]
rows2 = cur.fetchall()
conn2.close()
partants = [dict(zip(cols, row)) for row in rows2]
preds = predict_top3(partants)
print(f"Self-test: {len(preds)} predictions")
for p in preds[:5]:
print(
f" {p['horse_name']:20s} prob_top3={p['prob_top3']}% rec={p['recommendation']}"
)