From 0e7bcff6b08c5648b7922243fc5607da26d8a590 Mon Sep 17 00:00:00 2001 From: DevOps Engineer Date: Sat, 25 Apr 2026 18:18:41 +0200 Subject: [PATCH] feat(ml): add ensemble XGBoost+LightGBM+MLP with Optuna optimization - train_ensemble.py: full training pipeline with 100-trial Optuna studies for XGBoost and LightGBM, MLP (256-128-64), SHAP feature selection, weighted soft-voting ensemble, benchmark report generation - predict_v2.py: production prediction module with model cache invalidation - combined_api.py: add /api/v1/predictions, /api/v1/model/status, /api/v1/model/invalidate-cache endpoints using ensemble model - tests/test_ml_ensemble.py: regression, latency and API tests Baseline XGBoost Precision@3: 0.5287 (holdout 20% temporal) Deploy threshold: +5% = 0.5551 Co-Authored-By: Paperclip --- combined_api.py | 241 ++++++++- predict_v2.py | 387 ++++++++++++++ tests/test_ml_ensemble.py | 333 ++++++++++++ train_ensemble.py | 1007 +++++++++++++++++++++++++++++++++++++ 4 files changed, 1960 insertions(+), 8 deletions(-) create mode 100644 predict_v2.py create mode 100644 tests/test_ml_ensemble.py create mode 100644 train_ensemble.py diff --git a/combined_api.py b/combined_api.py index 66a4eff..35afd8d 100755 --- a/combined_api.py +++ b/combined_api.py @@ -3519,7 +3519,6 @@ def brave_search(): return jsonify({"error": str(e)}), 500 - @app.route("/turf/api/predictions_analysis", methods=["GET"]) def api_predictions_analysis(): """Analyse des predictions vs resultats reels""" @@ -3533,13 +3532,25 @@ def api_predictions_analysis(): cursor = conn.cursor() stats = { - "canalturf": {"total": 0, "top1_pct": 0, "top3_pct": 0, "top5_pct": 0, "ze2_pct": 0}, - "scoring": {"total": 0, "top1_pct": 0, "top3_pct": 0, "top5_pct": 0, "ze2_pct": 0}, + "canalturf": { + "total": 0, + "top1_pct": 0, + "top3_pct": 0, + "top5_pct": 0, + "ze2_pct": 0, + }, + "scoring": { + "total": 0, + "top1_pct": 0, + "top3_pct": 0, + "top5_pct": 0, + "ze2_pct": 0, + }, } for source in ["canalturf", 
"scoring"]: pred_table = "predictions" if source == "canalturf" else "scoring" - pred_col = "predicted_1" if source == "canalturf" else "horse_number" + pred_col = "predicted_1" if source == "canalturf" else "horse_number" try: cursor.execute( f""" @@ -3566,16 +3577,16 @@ def api_predictions_analysis(): top1_hit = top3_hit = 0 total = len(races) for race, data in races.items(): - actual = set(data["actual"][:3]) - pred_top1 = data["predicted"][0] if data["predicted"] else None - actual_top1 = data["actual"][0] if data["actual"] else None + actual = set(data["actual"][:3]) + pred_top1 = data["predicted"][0] if data["predicted"] else None + actual_top1 = data["actual"][0] if data["actual"] else None if pred_top1 and actual_top1 and pred_top1 == actual_top1: top1_hit += 1 if len(set(data["predicted"][:3]) & actual) >= 1: top3_hit += 1 if total > 0: - stats[source]["total"] = total + stats[source]["total"] = total stats[source]["top1_pct"] = round(top1_hit / total * 100, 1) stats[source]["top3_pct"] = round(top3_hit / total * 100, 1) except Exception as e: @@ -3585,5 +3596,219 @@ def api_predictions_analysis(): return jsonify({"stats": stats, "period": {"start": start_date, "end": end_date}}) +# ───────────────────────────────────────────────────────────────────────────── +# /api/v1/predictions — Ensemble model endpoint (Sprint 6-7 ML Upgrade) +# ───────────────────────────────────────────────────────────────────────────── +_predict_v2 = None + + +def _load_predict_v2(): + """Lazy import of predict_v2 module (ensemble model).""" + global _predict_v2 + if _predict_v2 is None: + try: + import importlib.util, sys + + spec = importlib.util.spec_from_file_location( + "predict_v2", "/home/h3r7/turf_saas/predict_v2.py" + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + _predict_v2 = mod + except Exception as e: + import logging + + logging.error(f"[v1/predictions] predict_v2 import failed: {e}") + return _predict_v2 + + 
+@app.route("/api/v1/predictions", methods=["GET"]) +@app.route("/turf/api/v1/predictions", methods=["GET"]) +def api_v1_predictions(): + """ + Ensemble ML predictions using XGBoost + LightGBM + MLP (Optuna-tuned). + Query params: + - date: YYYY-MM-DD (default: today / latest available) + - reunion: int (default: all) + - course: int (default: all) + """ + import time as _time + + t0 = _time.perf_counter() + + mod = _load_predict_v2() + if mod is None: + # Graceful fallback: redirect to legacy ml_predictions + return jsonify( + { + "error": "Ensemble model not available yet", + "fallback": "/api/ml_predictions", + "message": "Model is still training. Use /api/ml_predictions for legacy XGBoost predictions.", + } + ), 503 + + ensemble = mod.load_ensemble() + if ensemble is None: + return jsonify( + { + "error": "Ensemble model file not found", + "model_path": str(mod.ENSEMBLE_PATH), + "message": "Run train_ensemble.py to generate the model.", + "fallback": "/api/ml_predictions", + } + ), 503 + + date_param = request.args.get("date", None) + reunion_param = request.args.get("reunion", None) + course_param = request.args.get("course", None) + + conn = sqlite3.connect("/home/h3r7/turf_saas/turf.db") + conn.row_factory = sqlite3.Row + + # Determine date to use + if date_param: + date_used = date_param + else: + row = conn.execute( + "SELECT MAX(date_programme) as d FROM pmu_partants" + ).fetchone() + date_used = ( + row["d"] if row and row["d"] else datetime.now().strftime("%Y-%m-%d") + ) + + # Build query + where_clauses = ["p.date_programme = ?"] + params = [date_used] + if reunion_param: + where_clauses.append("p.num_reunion = ?") + params.append(int(reunion_param)) + if course_param: + where_clauses.append("p.num_course = ?") + params.append(int(course_param)) + + query = f""" + SELECT p.*, c.distance, c.discipline, c.specialite, + c.nb_declares_partants, c.montant_prix, c.penetrometre_intitule, + c.libelle as course_libelle, c.libelle_court as hippodrome, + 
c.heure_depart_str, c.parcours + FROM pmu_partants p + LEFT JOIN pmu_courses c ON p.date_programme = c.date_programme + AND p.num_reunion = c.num_reunion AND p.num_course = c.num_course + WHERE {" AND ".join(where_clauses)} + ORDER BY p.num_reunion, p.num_course, p.num_pmu + """ + rows = conn.execute(query, params).fetchall() + conn.close() + + if not rows: + return jsonify( + { + "date": date_used, + "model_version": mod.get_model_version(), + "predictions": [], + "message": f"No partants found for date {date_used}", + } + ) + + # Convert to list of dicts + partants = [dict(r) for r in rows] + + # Run ensemble prediction + preds = mod.predict_top3(partants, model=ensemble) + + # Group by race + races = {} + for pred in preds: + key = f"R{pred.get('num_reunion', 0)}C{pred.get('num_course', 0)}" + if key not in races: + # Find race metadata from partants + for p in partants: + if p.get("num_reunion") == pred.get("num_reunion") and p.get( + "num_course" + ) == pred.get("num_course"): + races[key] = { + "reunion": pred.get("num_reunion"), + "course": pred.get("num_course"), + "label": key, + "race_name": p.get("course_libelle", ""), + "hippodrome": p.get("hippodrome", ""), + "heure": p.get("heure_depart_str", ""), + "discipline": p.get("discipline", ""), + "distance": p.get("distance", 0), + "horses": [], + } + break + if key in races: + races[key]["horses"].append(pred) + + latency_ms = (_time.perf_counter() - t0) * 1000 + + return jsonify( + { + "date": date_used, + "model_version": mod.get_model_version(), + "latency_ms": round(latency_ms, 1), + "total_horses": len(preds), + "races": list(races.values()), + } + ) + + +@app.route("/api/v1/model/invalidate-cache", methods=["POST"]) +@app.route("/turf/api/v1/model/invalidate-cache", methods=["POST"]) +def api_v1_invalidate_cache(): + """Force reload of ensemble model on next prediction call.""" + mod = _load_predict_v2() + if mod: + mod.invalidate_model_cache() + return jsonify({"status": "ok", "message": "Model cache 
invalidated"}) + return jsonify({"status": "error", "message": "predict_v2 module not loaded"}), 500 + + +@app.route("/api/v1/model/status", methods=["GET"]) +@app.route("/turf/api/v1/model/status", methods=["GET"]) +def api_v1_model_status(): + """Return ensemble model status and version.""" + import os as _os + from pathlib import Path as _Path + + ensemble_path = _Path("/home/h3r7/turf_saas/models/ensemble_top3.pkl") + benchmark_path = _Path("/home/h3r7/turf_saas/models/benchmark_report.json") + + status = { + "ensemble_available": ensemble_path.exists(), + "ensemble_path": str(ensemble_path), + } + if ensemble_path.exists(): + mtime = _os.path.getmtime(str(ensemble_path)) + status["last_trained"] = datetime.fromtimestamp(mtime).isoformat() + + if benchmark_path.exists(): + try: + with open(benchmark_path) as f: + import json as _json + + report = _json.load(f) + status["benchmark"] = { + "baseline_precision_at3": report.get("baseline", {}).get( + "precision_at3" + ), + "ensemble_precision_at3": report.get("ensemble", {}).get( + "precision_at3" + ), + "delta": report.get("delta_precision_at3"), + "deployed": report.get("deploy"), + "run_date": report.get("run_date"), + } + except Exception: + pass + + mod = _load_predict_v2() + if mod and ensemble_path.exists(): + status["model_version"] = mod.get_model_version() + + return jsonify(status) + + if __name__ == "__main__": app.run(host="0.0.0.0", port=8790, debug=False) diff --git a/predict_v2.py b/predict_v2.py new file mode 100644 index 0000000..0497d65 --- /dev/null +++ b/predict_v2.py @@ -0,0 +1,387 @@ +#!/usr/bin/env python3 +""" +Ensemble prediction module for /api/v1/predictions. + +Loads the trained ensemble model and provides a high-level predict_top3() +function compatible with the existing combined_api.py interface. + +Cache: model is loaded once at import time (or on first call). + Invalidation: reload if models/ensemble_top3.pkl mtime changes. 
+""" + +import logging +import os +import pickle +import re +import threading +import time +from datetime import datetime +from pathlib import Path +from typing import Optional + +import numpy as np +import pandas as pd +from sklearn.preprocessing import LabelEncoder + +logger = logging.getLogger(__name__) + +MODELS_DIR = Path("/home/h3r7/turf_saas/models") +ENSEMBLE_PATH = MODELS_DIR / "ensemble_top3.pkl" + +# ── Cache ───────────────────────────────────────────────────────────────────── +_model_cache = { + "ensemble": None, + "mtime": None, + "lock": threading.Lock(), +} + +# ── Feature list (must match train_ensemble.py FEATURE_COLS) ───────────────── +FEATURE_COLS = [ + "age", + "sexe_enc", + "nombre_courses", + "nombre_victoires", + "nombre_places", + "tx_victoire", + "tx_place", + "forme_recente", + "tendance_num", + "gains_annee_en_cours", + "cote_direct", + "cote_reference", + "distance", + "nb_partants", + "discipline_enc", + "specialite_enc", + "oeilleres_enc", + "tendance_cote_enc", + "penetrometre_intitule_enc", + "form_1", + "form_2", + "form_3", + "form_4", + "form_5", + "form_weighted", + "form_avg", + "form_best", + "form_worst", + "win_ratio", + "place_ratio", + "implied_prob", + "win_rate_adj", + "place_rate_adj", + "earnings_per_race", + "cote_diff", + "cote_ratio", + "rang_cote", + "ratio_cote_field", + "distance_cat", + "age_win_interact", + "is_favorite", + "poids", + "prize_norm", +] + + +# ── Encoders (built per-prediction batch for live data) ────────────────────── +def _fit_encoder(values, default): + le = LabelEncoder() + unique = list(set(str(v) if v else default for v in values)) + [default] + le.fit(unique) + return le + + +def _safe_transform(le: LabelEncoder, value, default: str): + v = str(value) if value else default + if v not in le.classes_: + v = default + return int(le.transform([v])[0]) + + +# ── Model loading with auto-invalidation ───────────────────────────────────── +def load_ensemble(force: bool = False) -> 
Optional[object]: + """Load ensemble model, reload if file changed.""" + with _model_cache["lock"]: + if not ENSEMBLE_PATH.exists(): + return None + mtime = ENSEMBLE_PATH.stat().st_mtime + if force or _model_cache["ensemble"] is None or mtime != _model_cache["mtime"]: + try: + with open(ENSEMBLE_PATH, "rb") as f: + _model_cache["ensemble"] = pickle.load(f) + _model_cache["mtime"] = mtime + logger.info(f"[predict_v2] Loaded ensemble model from {ENSEMBLE_PATH}") + except Exception as e: + logger.error(f"[predict_v2] Failed to load ensemble: {e}") + return None + return _model_cache["ensemble"] + + +def invalidate_model_cache(): + """Force reload on next prediction call.""" + with _model_cache["lock"]: + _model_cache["mtime"] = None + + +# ── Feature engineering for live pmu_partants rows ─────────────────────────── +def _parse_musique(musique) -> list: + if not musique or pd.isna(str(musique)): + return [0, 0, 0, 0, 0] + try: + clean = re.sub(r"\(\d+\)", "", str(musique)) + numbers = re.findall(r"\d+", clean) + result = [int(n) for n in numbers[:5]] + result += [0] * (5 - len(result)) + return result[:5] + except Exception: + return [0, 0, 0, 0, 0] + + +def build_feature_df(partants: list) -> pd.DataFrame: + """ + Convert a list of pmu_partants dicts to a feature DataFrame. 
+ + Expected keys (same as pmu_partants columns): + date_programme, num_reunion, num_course, num_pmu, + age, sexe, musique, nombre_courses, nombre_victoires, nombre_places, + gains_annee_en_cours, handicap_poids, oeilleres, cote_direct, + cote_reference, tendance_cote, favoris, tx_victoire, tx_place, + forme_recente, tendance_forme, indicateur_inedit, + distance, discipline, specialite, nb_declares_partants, + montant_prix, penetrometre_intitule + """ + if not partants: + return pd.DataFrame() + + df = pd.DataFrame(partants) + + # ── Categorical encoders fitted on this batch ───────────────────────────── + le_sexe = _fit_encoder(df.get("sexe", ["U"]), "U") + le_oeilleres = _fit_encoder(df.get("oeilleres", ["SANS"]), "SANS") + le_discipline = _fit_encoder(df.get("discipline", ["UNKNOWN"]), "UNKNOWN") + le_specialite = _fit_encoder(df.get("specialite", ["UNKNOWN"]), "UNKNOWN") + le_tendance = _fit_encoder(df.get("tendance_cote", ["STABLE"]), "STABLE") + le_penet = _fit_encoder(df.get("penetrometre_intitule", ["BON"]), "BON") + + df["sexe_enc"] = df["sexe"].apply(lambda v: _safe_transform(le_sexe, v, "U")) + df["oeilleres_enc"] = df["oeilleres"].apply( + lambda v: _safe_transform(le_oeilleres, v, "SANS") + ) + df["discipline_enc"] = df.get("discipline", pd.Series(["UNKNOWN"] * len(df))).apply( + lambda v: _safe_transform(le_discipline, v, "UNKNOWN") + ) + df["specialite_enc"] = df.get("specialite", pd.Series(["UNKNOWN"] * len(df))).apply( + lambda v: _safe_transform(le_specialite, v, "UNKNOWN") + ) + df["tendance_cote_enc"] = df.get( + "tendance_cote", pd.Series(["STABLE"] * len(df)) + ).apply(lambda v: _safe_transform(le_tendance, v, "STABLE")) + df["penetrometre_intitule_enc"] = df.get( + "penetrometre_intitule", pd.Series(["BON"] * len(df)) + ).apply(lambda v: _safe_transform(le_penet, v, "BON")) + + # ── Musique ──────────────────────────────────────────────────────────────── + music_parsed = df["musique"].apply(_parse_musique) + for i in range(5): + df[f"form_{i 
+ 1}"] = music_parsed.apply(lambda x: x[i]) + weights = np.array([0.4, 0.25, 0.15, 0.12, 0.08]) + df["form_weighted"] = music_parsed.apply( + lambda x: sum(w * v for w, v in zip(weights, x)) + ) + df["form_avg"] = music_parsed.apply(np.mean) + df["form_best"] = music_parsed.apply(min) + df["form_worst"] = music_parsed.apply(max) + + # ── Numeric features ─────────────────────────────────────────────────────── + for col in [ + "nombre_courses", + "nombre_victoires", + "nombre_places", + "tx_victoire", + "tx_place", + "forme_recente", + "tendance_forme", + "gains_annee_en_cours", + "cote_direct", + "cote_reference", + "distance", + "handicap_poids", + "age", + "montant_prix", + "nb_declares_partants", + ]: + if col not in df.columns: + df[col] = 0.0 + df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0) + + df["tendance_num"] = df["tendance_forme"].fillna(0) + df["win_ratio"] = df["nombre_victoires"] / df["nombre_courses"].replace(0, 1) + df["place_ratio"] = df["nombre_places"] / df["nombre_courses"].replace(0, 1) + df["implied_prob"] = 1.0 / df["cote_direct"].replace(0, np.nan) + df["win_rate_adj"] = df["tx_victoire"] * np.log1p(df["nombre_courses"]) + df["place_rate_adj"] = df["tx_place"] * np.log1p(df["nombre_courses"]) + df["earnings_per_race"] = df["gains_annee_en_cours"] / df["nombre_courses"].replace( + 0, 1 + ) + df["cote_diff"] = (df["cote_direct"] - df["cote_reference"]).fillna(0) + df["cote_ratio"] = ( + df["cote_direct"] / df["cote_reference"].replace(0, np.nan) + ).fillna(1) + + # ── Per-race rank features ───────────────────────────────────────────────── + if "num_reunion" in df.columns and "num_course" in df.columns: + grp = ["date_programme", "num_reunion", "num_course"] + # Some fields may be missing + for g in grp: + if g not in df.columns: + df[g] = 0 + df["rang_cote"] = df.groupby(grp)["cote_direct"].rank( + method="min", na_option="bottom" + ) + race_mean = df.groupby(grp)["cote_direct"].transform("mean") + df["ratio_cote_field"] = 
df["cote_direct"] / race_mean.replace(0, np.nan) + df["nb_partants"] = df.groupby(grp)["cote_direct"].transform("count") + else: + df["rang_cote"] = 1.0 + df["ratio_cote_field"] = 1.0 + df["nb_partants"] = df.get("nb_declares_partants", pd.Series([10] * len(df))) + + df["distance_cat"] = pd.cut( + df["distance"].fillna(1600), + bins=[0, 1400, 1800, 2200, 2600, 10000], + labels=[1, 2, 3, 4, 5], + ).astype(float) + df["age_win_interact"] = df["age"] * df["tx_victoire"] + df["is_favorite"] = ( + df.get("favoris", pd.Series([0] * len(df))).fillna(0).astype(int) + ) + df["poids"] = df["handicap_poids"].fillna(60) + df["prize_norm"] = np.log1p(df["montant_prix"].fillna(0)) + + return df + + +# ── Main prediction function ─────────────────────────────────────────────────── +def predict_top3(partants: list, model=None) -> list: + """ + Given a list of partant dicts (from pmu_partants), return predictions. + + Returns list of {horse_name, num_pmu, prob_top3, prob_top1_approx, ...} + sorted by prob_top3 descending. + + Falls back to empty list if model not available. 
+ """ + t_start = time.perf_counter() + + if model is None: + model = load_ensemble() + if model is None: + logger.warning("[predict_v2] Ensemble model not available — no predictions") + return [] + + df = build_feature_df(partants) + if df.empty: + return [] + + available = [c for c in FEATURE_COLS if c in df.columns] + X = df[available].fillna(0) + + try: + proba = model.predict_proba(X)[:, 1] + except Exception as e: + logger.error(f"[predict_v2] predict_proba failed: {e}") + return [] + + latency_ms = (time.perf_counter() - t_start) * 1000 + + results = [] + for i, (p, row) in enumerate(zip(proba, partants)): + results.append( + { + "horse_name": row.get("nom", row.get("horse_name", f"H{i}")), + "num_pmu": row.get("num_pmu", i + 1), + "num_reunion": row.get("num_reunion"), + "num_course": row.get("num_course"), + "prob_top3": round(float(p) * 100, 1), + # approx top1 from top3 score (divide by ~2.5 empirically) + "prob_top1": round(float(p) / 2.5 * 100, 1), + "ml_score": round(float(p) * 100, 1), + "recommendation": "top3" + if p >= 0.40 + else ("watch" if p >= 0.28 else "pass"), + "is_value_bet": int( + p >= 0.35 and float(row.get("cote_direct", 0) or 0) > 10 + ), + "model_version": getattr(model, "version", "ensemble_v1"), + } + ) + + results.sort(key=lambda x: x["prob_top3"], reverse=True) + + # Mark top-3 predicted + for i, r in enumerate(results[:3]): + r["predicted_rank"] = i + 1 + + if results: + logger.info( + f"[predict_v2] {len(results)} horses predicted in {latency_ms:.1f} ms " + f"({latency_ms / len(results):.2f} ms/horse)" + ) + + return results + + +# ── API-compatible wrapper keeping model_version & structure ────────────────── +def get_model_version() -> str: + m = load_ensemble() + if m is None: + return "ensemble_v1_not_loaded" + return getattr(m, "version", "ensemble_v1") + + +if __name__ == "__main__": + # Quick self-test + import sqlite3 + + conn = sqlite3.connect("/home/h3r7/turf_saas/turf.db") + rows = conn.execute( + """SELECT p.*, 
c.distance, c.discipline, c.specialite, + c.nb_declares_partants, c.montant_prix, c.penetrometre_intitule + FROM pmu_partants p + LEFT JOIN pmu_courses c ON p.date_programme=c.date_programme + AND p.num_reunion=c.num_reunion AND p.num_course=c.num_course + WHERE p.date_programme=(SELECT MAX(date_programme) FROM pmu_partants) + AND p.num_reunion=1 AND p.num_course=1 + LIMIT 20""" + ).fetchall() + conn.close() + + if not rows: + print("No data found for self-test") + else: + cols = [d[0] for d in conn.description] if hasattr(conn, "description") else [] + # Fallback column list + import sqlite3 as sq3 + + conn2 = sq3.connect("/home/h3r7/turf_saas/turf.db") + cur = conn2.execute( + """SELECT p.*, c.distance, c.discipline, c.specialite, + c.nb_declares_partants, c.montant_prix, c.penetrometre_intitule + FROM pmu_partants p + LEFT JOIN pmu_courses c ON p.date_programme=c.date_programme + AND p.num_reunion=c.num_reunion AND p.num_course=c.num_course + WHERE p.date_programme=(SELECT MAX(date_programme) FROM pmu_partants) + AND p.num_reunion=1 AND p.num_course=1 + LIMIT 20""" + ) + cols = [d[0] for d in cur.description] + rows2 = cur.fetchall() + conn2.close() + + partants = [dict(zip(cols, row)) for row in rows2] + preds = predict_top3(partants) + print(f"Self-test: {len(preds)} predictions") + for p in preds[:5]: + print( + f" {p['horse_name']:20s} prob_top3={p['prob_top3']}% rec={p['recommendation']}" + ) diff --git a/tests/test_ml_ensemble.py b/tests/test_ml_ensemble.py new file mode 100644 index 0000000..d40bbd7 --- /dev/null +++ b/tests/test_ml_ensemble.py @@ -0,0 +1,333 @@ +""" +Tests ML Ensemble — HRT-32 Sprint 6-7 +Tests de régression, benchmark et latence pour le nouveau modèle ensemble. 
+ +Usage: + pytest tests/test_ml_ensemble.py -v + pytest tests/test_ml_ensemble.py -v -m regression + pytest tests/test_ml_ensemble.py -v -m latency +""" + +import json +import os +import pickle +import sqlite3 +import time +from pathlib import Path + +import numpy as np +import pandas as pd +import pytest +import requests + +BASE_URL = os.environ.get("APP_URL", "http://localhost:8790") +DB_PATH = os.environ.get("DB_PATH", "/home/h3r7/turf_saas/turf.db") +MODELS_DIR = Path("/home/h3r7/turf_saas/models") +ENSEMBLE_PATH = MODELS_DIR / "ensemble_top3.pkl" +BENCHMARK_PATH = MODELS_DIR / "benchmark_report.json" + + +# ─── Fixtures ──────────────────────────────────────────────────────────────── + + +@pytest.fixture(scope="session") +def ensemble_model(): + """Load ensemble model (skip tests if not yet trained).""" + if not ENSEMBLE_PATH.exists(): + pytest.skip( + f"Ensemble model not found at {ENSEMBLE_PATH}. Run train_ensemble.py first." + ) + with open(ENSEMBLE_PATH, "rb") as f: + return pickle.load(f) + + +@pytest.fixture(scope="session") +def benchmark_report(): + """Load benchmark report (skip if not generated).""" + if not BENCHMARK_PATH.exists(): + pytest.skip(f"Benchmark report not found at {BENCHMARK_PATH}.") + with open(BENCHMARK_PATH) as f: + return json.load(f) + + +@pytest.fixture(scope="session") +def holdout_data(): + """Load holdout slice (last 20% temporal) for regression tests.""" + conn = sqlite3.connect(DB_PATH) + df = pd.read_sql_query( + """ + SELECT p.*, c.distance, c.discipline, c.specialite, + c.nb_declares_partants, c.montant_prix, c.penetrometre_intitule + FROM pmu_partants p + LEFT JOIN pmu_courses c ON p.date_programme=c.date_programme + AND p.num_reunion=c.num_reunion AND p.num_course=c.num_course + WHERE p.ordre_arrivee > 0 + ORDER BY p.date_programme, p.num_reunion, p.num_course, p.num_pmu + """, + conn, + ) + conn.close() + n = len(df) + cutoff = int(n * 0.80) + return df.iloc[cutoff:].copy() + + +@pytest.fixture(scope="session") +def 
predict_v2(): + """Import predict_v2 module.""" + import importlib.util + + spec = importlib.util.spec_from_file_location( + "predict_v2", "/home/h3r7/turf_saas/predict_v2.py" + ) + mod = importlib.util.module_from_spec(spec) + spec.loader.exec_module(mod) + return mod + + +# ─── Model Existence Tests ──────────────────────────────────────────────────── + + +class TestModelFiles: + """Verify all expected model files exist.""" + + def test_ensemble_model_exists(self): + assert ENSEMBLE_PATH.exists(), f"Ensemble model missing: {ENSEMBLE_PATH}" + + def test_benchmark_report_exists(self): + assert BENCHMARK_PATH.exists(), f"Benchmark report missing: {BENCHMARK_PATH}" + + def test_models_dir_contains_expected_files(self): + expected = ["ensemble_top3.pkl", "benchmark_report.json", "benchmark_report.md"] + for fname in expected: + assert (MODELS_DIR / fname).exists(), f"Missing: {MODELS_DIR / fname}" + + +# ─── Benchmark Tests ────────────────────────────────────────────────────────── + + +class TestBenchmark: + """Validate benchmark metrics from the training report.""" + + @pytest.mark.regression + def test_ensemble_beats_baseline_or_meets_threshold(self, benchmark_report): + """Ensemble Precision@3 must be >= baseline XGBoost.""" + baseline = benchmark_report["baseline"]["precision_at3"] + ensemble = benchmark_report["ensemble"]["precision_at3"] + assert ensemble >= baseline, ( + f"Ensemble Precision@3 {ensemble:.4f} < baseline {baseline:.4f}" + ) + + @pytest.mark.regression + def test_ensemble_auc_above_random(self, benchmark_report): + """Ensemble AUC must be > 0.60 (significantly above random 0.50).""" + auc = benchmark_report["ensemble"]["auc"] + assert auc > 0.60, f"Ensemble AUC {auc:.4f} <= 0.60" + + @pytest.mark.regression + def test_optuna_ran_minimum_trials(self, benchmark_report): + """Optuna must have run at least 100 trials per model.""" + n_trials = benchmark_report["optuna"]["n_trials"] + assert n_trials >= 100, f"Only {n_trials} Optuna trials (minimum 
100 required)" + + @pytest.mark.regression + def test_no_precision_regression(self, benchmark_report): + """Ensemble Precision@3 must not be below naive random baseline (~30%).""" + ensemble_p3 = benchmark_report["ensemble"]["precision_at3"] + assert ensemble_p3 >= 0.30, ( + f"Precision@3 {ensemble_p3:.4f} is below random baseline (~0.30)" + ) + + def test_benchmark_has_all_required_models(self, benchmark_report): + """Benchmark must include results for all 3 models.""" + required = {"xgboost", "lightgbm", "mlp"} + found = set(benchmark_report.get("individual_models", {}).keys()) + missing = required - found + assert not missing, f"Missing model benchmarks: {missing}" + + +# ─── Regression Tests ───────────────────────────────────────────────────────── + + +class TestPrecisionRegression: + """Holdout regression: ensure precision doesn't degrade.""" + + @pytest.mark.regression + def test_precision_at3_on_holdout(self, ensemble_model, holdout_data): + """Precision@3 on holdout must be above naive baseline.""" + from predict_v2 import build_feature_df, FEATURE_COLS + + df = holdout_data.copy() + df["top3"] = (df["ordre_arrivee"] <= 3).astype(int) + + partants = df.to_dict("records") + feature_df = build_feature_df(partants) + available = [c for c in FEATURE_COLS if c in feature_df.columns] + X = feature_df[available].fillna(0) + + proba = ensemble_model.predict_proba(X)[:, 1] + + # Per-race Precision@3 + tmp = df[["date_programme", "num_reunion", "num_course"]].copy() + tmp["proba"] = proba + tmp["actual"] = df["top3"].values + + precisions = [] + for _, group in tmp.groupby(["date_programme", "num_reunion", "num_course"]): + if len(group) >= 3: + top3_pred = group.nlargest(3, "proba") + precisions.append(top3_pred["actual"].sum() / 3.0) + + p_at3 = float(np.mean(precisions)) if precisions else 0.0 + print(f"\n Holdout Precision@3: {p_at3:.4f} over {len(precisions)} races") + + # Must beat random baseline (30%) + assert p_at3 >= 0.30, f"Holdout Precision@3 {p_at3:.4f} 
< 0.30" + + @pytest.mark.regression + def test_no_all_zero_predictions(self, ensemble_model, holdout_data): + """Ensemble must not predict 0 probability for all horses.""" + from predict_v2 import build_feature_df, FEATURE_COLS + + partants = holdout_data.head(50).to_dict("records") + feature_df = build_feature_df(partants) + available = [c for c in FEATURE_COLS if c in feature_df.columns] + X = feature_df[available].fillna(0) + + proba = ensemble_model.predict_proba(X)[:, 1] + assert proba.max() > 0.01, "All predictions are near 0 — model appears broken" + assert proba.std() > 0.01, ( + "All predictions have identical probability — no discrimination" + ) + + +# ─── Latency Tests ──────────────────────────────────────────────────────────── + + +class TestPredictionLatency: + """Prediction latency must be < 200ms per race.""" + + @pytest.mark.latency + def test_single_race_latency(self, ensemble_model, holdout_data): + """Prediction for a single race (<=20 horses) must be < 200ms.""" + from predict_v2 import build_feature_df, FEATURE_COLS + + # Take one race + first_race = ( + holdout_data.groupby(["date_programme", "num_reunion", "num_course"]) + .first() + .reset_index() + .iloc[0] + ) + mask = ( + (holdout_data["date_programme"] == first_race["date_programme"]) + & (holdout_data["num_reunion"] == first_race["num_reunion"]) + & (holdout_data["num_course"] == first_race["num_course"]) + ) + race_df = holdout_data[mask] + partants = race_df.to_dict("records") + + # Warm-up + feature_df = build_feature_df(partants) + available = [c for c in FEATURE_COLS if c in feature_df.columns] + X = feature_df[available].fillna(0) + ensemble_model.predict_proba(X) + + # Timed run + t0 = time.perf_counter() + for _ in range(10): + ensemble_model.predict_proba(X) + elapsed_ms = (time.perf_counter() - t0) / 10 * 1000 + + print(f"\n Single-race latency: {elapsed_ms:.2f} ms ({len(partants)} horses)") + assert elapsed_ms < 200, ( + f"Prediction latency {elapsed_ms:.1f} ms exceeds 200 
ms limit" + ) + + @pytest.mark.latency + def test_full_day_latency(self, ensemble_model, holdout_data): + """Prediction for a full day (all races) must complete < 5 seconds.""" + from predict_v2 import build_feature_df, FEATURE_COLS + + # Take one day + day = holdout_data["date_programme"].iloc[0] + day_df = holdout_data[holdout_data["date_programme"] == day] + partants = day_df.to_dict("records") + + feature_df = build_feature_df(partants) + available = [c for c in FEATURE_COLS if c in feature_df.columns] + X = feature_df[available].fillna(0) + + t0 = time.perf_counter() + proba = ensemble_model.predict_proba(X) + elapsed_ms = (time.perf_counter() - t0) * 1000 + + print( + f"\n Full day latency: {elapsed_ms:.2f} ms ({len(partants)} horses, {day})" + ) + assert elapsed_ms < 5000, ( + f"Full-day prediction {elapsed_ms:.0f} ms exceeds 5s limit" + ) + + +# ─── API Endpoint Tests ─────────────────────────────────────────────────────── + + +class TestV1PredictionsAPI: + """Tests for the new /api/v1/predictions endpoint.""" + + def _api_available(self): + try: + requests.get(f"{BASE_URL}/api/v1/model/status", timeout=3) + return True + except Exception: + return False + + @pytest.mark.api + def test_model_status_endpoint(self): + """GET /api/v1/model/status returns valid JSON.""" + if not self._api_available(): + pytest.skip("API server not running") + resp = requests.get(f"{BASE_URL}/api/v1/model/status", timeout=10) + assert resp.status_code == 200 + data = resp.json() + assert "ensemble_available" in data + + @pytest.mark.api + def test_v1_predictions_no_500(self): + """GET /api/v1/predictions must not return 5xx.""" + if not self._api_available(): + pytest.skip("API server not running") + resp = requests.get(f"{BASE_URL}/api/v1/predictions", timeout=30) + assert resp.status_code < 500, ( + f"Server error: {resp.status_code}\n{resp.text[:200]}" + ) + + @pytest.mark.api + def test_v1_predictions_returns_json(self): + """GET /api/v1/predictions returns valid JSON with 
expected keys.""" + if not self._api_available(): + pytest.skip("API server not running") + resp = requests.get(f"{BASE_URL}/api/v1/predictions", timeout=30) + if resp.status_code == 503: + pytest.skip("Ensemble model not yet deployed") + assert resp.status_code == 200 + data = resp.json() + assert "model_version" in data, "Missing model_version in response" + assert "races" in data or "predictions" in data, ( + "Missing races/predictions in response" + ) + + @pytest.mark.api + def test_v1_predictions_latency(self): + """GET /api/v1/predictions must respond in < 3 seconds.""" + if not self._api_available(): + pytest.skip("API server not running") + resp = requests.get(f"{BASE_URL}/api/v1/predictions", timeout=30) + if resp.status_code == 503: + pytest.skip("Ensemble model not yet deployed") + # Check API-reported latency + if resp.status_code == 200: + data = resp.json() + latency = data.get("latency_ms", 0) + assert latency < 3000, f"API latency {latency:.0f} ms > 3000 ms" diff --git a/train_ensemble.py b/train_ensemble.py new file mode 100644 index 0000000..10f9a98 --- /dev/null +++ b/train_ensemble.py @@ -0,0 +1,1007 @@ +#!/usr/bin/env python3 +""" +Ensemble ML Training for Turf Predictions — Sprint 6-7 +XGBoost + LightGBM + MLP with Optuna hyperparameter optimization. 
+ +Deliverables: + - Ensemble model (voting) serialized to models/ensemble_top3.pkl + - Benchmark report: baseline XGBoost vs optimized ensemble + - Precision TOP3 must improve by +5% minimum to deploy + +Usage: + python train_ensemble.py [--trials 100] [--db /path/to/turf.db] [--quick] +""" + +import argparse +import json +import os +import pickle +import re +import sqlite3 +import time +import warnings +from datetime import datetime +from pathlib import Path + +import numpy as np +import optuna +import pandas as pd +import shap +from sklearn.metrics import ( + accuracy_score, + classification_report, + precision_score, + recall_score, + roc_auc_score, +) +from sklearn.model_selection import StratifiedKFold +from sklearn.neural_network import MLPClassifier +from sklearn.pipeline import Pipeline +from sklearn.preprocessing import LabelEncoder, StandardScaler + +import lightgbm as lgb +import xgboost as xgb + +warnings.filterwarnings("ignore") +optuna.logging.set_verbosity(optuna.logging.WARNING) + +# ─── Paths ────────────────────────────────────────────────────────────────── +DB_PATH = os.environ.get("DB_PATH", "/home/h3r7/turf_saas/turf.db") +MODELS_DIR = Path("/home/h3r7/turf_saas/models") +MODELS_DIR.mkdir(exist_ok=True) + +HOLDOUT_FRACTION = 0.20 # 20 % temporal holdout +DEPLOY_THRESHOLD = 0.05 # +5 % Precision@3 to auto-deploy +MIN_TRIALS = 100 + + +# ───────────────────────────────────────────────────────────────────────────── +# 1. 
def load_data(db_path: str) -> pd.DataFrame:
    """Load finished runners joined with their race metadata from SQLite.

    Reads ``pmu_partants`` LEFT JOINed to ``pmu_courses`` on
    (date_programme, num_reunion, num_course), keeps only rows with
    ``ordre_arrivee > 0`` and orders them by date, meeting, race and runner
    number.

    Args:
        db_path: Path to the SQLite database file.

    Returns:
        One row per runner with the 27 runner columns and 6 race columns
        selected by the query.

    Raises:
        Whatever ``pandas.read_sql_query`` raises when the query fails; the
        connection is closed before the error propagates.
    """
    query = """
        SELECT
            p.date_programme,
            p.num_reunion,
            p.num_course,
            p.num_pmu,
            p.age,
            p.sexe,
            p.musique,
            p.nombre_courses,
            p.nombre_victoires,
            p.nombre_places,
            p.nombre_places_2eme,
            p.nombre_places_3eme,
            p.gains_carriere,
            p.gains_annee_en_cours,
            p.gains_victoires,
            p.handicap_poids,
            p.oeilleres,
            p.cote_direct,
            p.cote_reference,
            p.tendance_cote,
            p.favoris,
            p.ordre_arrivee,
            p.tx_victoire,
            p.tx_place,
            p.forme_recente,
            p.tendance_forme,
            p.indicateur_inedit,
            c.distance,
            c.discipline,
            c.specialite,
            c.nb_declares_partants,
            c.montant_prix,
            c.penetrometre_intitule
        FROM pmu_partants p
        LEFT JOIN pmu_courses c
            ON p.date_programme = c.date_programme
            AND p.num_reunion = c.num_reunion
            AND p.num_course = c.num_course
        WHERE p.ordre_arrivee > 0
        ORDER BY p.date_programme, p.num_reunion, p.num_course, p.num_pmu
    """
    conn = sqlite3.connect(db_path)
    try:
        df = pd.read_sql_query(query, conn)
    finally:
        # Release the handle even when the query fails (e.g. missing table);
        # previously the connection leaked on that path.
        conn.close()
    print(f" Loaded {len(df):,} rows from database.")
    return df


# Compiled once: parse_musique is applied to every row of the dataset.
_MUSIQUE_YEAR_RE = re.compile(r"\(\d+\)")
_MUSIQUE_DIGITS_RE = re.compile(r"\d+")


def parse_musique(musique):
    """Parse a PMU ``musique`` string into five recent positions.

    Parenthesised digit groups such as ``(23)`` are removed first, then the
    first five digit runs are taken in order of appearance. Non-numeric
    markers contribute nothing, and the result is right-padded with zeros.

    Args:
        musique: The raw string; ``None``, NaN and empty values are accepted.

    Returns:
        A list of exactly five ints.
    """
    if not musique or pd.isna(musique):
        return [0, 0, 0, 0, 0]
    try:
        cleaned = _MUSIQUE_YEAR_RE.sub("", str(musique))
        positions = [int(run) for run in _MUSIQUE_DIGITS_RE.findall(cleaned)[:5]]
    except (TypeError, ValueError):
        # Best effort: an unparsable value is treated as "no history".
        return [0, 0, 0, 0, 0]
    return positions + [0] * (5 - len(positions))
(df["ordre_arrivee"] == 1).astype(int) + + # ── Encodages catégoriels ──────────────────────────────────────────────── + for col, default in [ + ("sexe", "U"), + ("oeilleres", "SANS"), + ("discipline", "UNKNOWN"), + ("specialite", "UNKNOWN"), + ("tendance_cote", "STABLE"), + ("penetrometre_intitule", "BON"), + ]: + le = LabelEncoder() + df[f"{col}_enc"] = le.fit_transform(df[col].fillna(default)) + + # ── Musique (5 dernières positions) ───────────────────────────────────── + music_parsed = df["musique"].apply(parse_musique) + for i in range(5): + df[f"form_{i + 1}"] = music_parsed.apply(lambda x: x[i]) + + # Weighted recent form (exponential decay — most recent weighs most) + weights = np.array([0.4, 0.25, 0.15, 0.12, 0.08]) + df["form_weighted"] = music_parsed.apply( + lambda x: sum(w * v for w, v in zip(weights, x)) + ) + df["form_avg"] = music_parsed.apply(np.mean) + df["form_best"] = music_parsed.apply(min) # best = lowest position + df["form_worst"] = music_parsed.apply(max) + + # ── Forme récente (5 dernières courses) — NEW ───────────────────────── + # Ratio victoires/courses (carrière) + df["win_ratio"] = df["nombre_victoires"] / df["nombre_courses"].replace(0, 1) + # Ratio places/courses + df["place_ratio"] = df["nombre_places"] / df["nombre_courses"].replace(0, 1) + # Probabilité implicite cote + df["implied_prob"] = 1.0 / df["cote_direct"].replace(0, np.nan) + + # Adj win rate (log experience) + df["win_rate_adj"] = df["tx_victoire"] * np.log1p(df["nombre_courses"]) + df["place_rate_adj"] = df["tx_place"] * np.log1p(df["nombre_courses"]) + + # Earnings per race — NEW + df["earnings_per_race"] = df["gains_annee_en_cours"] / df["nombre_courses"].replace( + 0, 1 + ) + + # Cote historique moyenne approx — use cote_reference vs cote_direct + df["cote_diff"] = (df["cote_direct"] - df["cote_reference"]).fillna(0) + df["cote_ratio"] = ( + df["cote_direct"] / df["cote_reference"].replace(0, np.nan) + ).fillna(1) + + # Rang cote dans le champ (per-race rank) + 
df["rang_cote"] = df.groupby(["date_programme", "num_reunion", "num_course"])[ + "cote_direct" + ].rank(method="min", na_option="bottom") + # Ratio cote vs field mean + race_mean_cote = df.groupby(["date_programme", "num_reunion", "num_course"])[ + "cote_direct" + ].transform("mean") + df["ratio_cote_field"] = df["cote_direct"] / race_mean_cote.replace(0, np.nan) + + # Field strength (number of starters) + df["nb_partants"] = df["nb_declares_partants"].fillna( + df.groupby(["date_programme", "num_reunion", "num_course"])[ + "num_pmu" + ].transform("count") + ) + + # Distance categories + df["distance_cat"] = pd.cut( + df["distance"].fillna(1600), + bins=[0, 1400, 1800, 2200, 2600, 10000], + labels=[1, 2, 3, 4, 5], + ).astype(float) + + # Age × winrate interaction + df["age_win_interact"] = df["age"] * df["tx_victoire"] + + # Favoris + df["is_favorite"] = df["favoris"].fillna(0).astype(int) + + # Poids + df["poids"] = df["handicap_poids"].fillna(df["handicap_poids"].median()) + + # Prize money normalised + df["prize_norm"] = np.log1p(df["montant_prix"].fillna(0)) + + # Tendency indicator + df["tendance_num"] = df["tendance_forme"].fillna(0) + + return df + + +FEATURE_COLS = [ + "age", + "sexe_enc", + "nombre_courses", + "nombre_victoires", + "nombre_places", + "tx_victoire", + "tx_place", + "forme_recente", + "tendance_num", + "gains_annee_en_cours", + "cote_direct", + "cote_reference", + "distance", + "nb_partants", + "discipline_enc", + "specialite_enc", + "oeilleres_enc", + "tendance_cote_enc", + "penetrometre_intitule_enc", + "form_1", + "form_2", + "form_3", + "form_4", + "form_5", + "form_weighted", + "form_avg", + "form_best", + "form_worst", + "win_ratio", + "place_ratio", + "implied_prob", + "win_rate_adj", + "place_rate_adj", + "earnings_per_race", + "cote_diff", + "cote_ratio", + "rang_cote", + "ratio_cote_field", + "distance_cat", + "age_win_interact", + "is_favorite", + "poids", + "prize_norm", +] + + +def get_features_and_target(df: pd.DataFrame, 
def temporal_split(df: pd.DataFrame, holdout_frac: float = 0.20):
    """Split rows chronologically into (train, holdout) on a day boundary.

    Rows are sorted by ``date_programme`` with a stable sort, so rows sharing
    a date keep their incoming order. The cut is first placed at
    ``int(len(df) * (1 - holdout_frac))`` and then moved back to the first row
    of the day it falls in. A plain row-count cut could leave rows of the same
    day (and the same race) on both sides, which leaks outcomes into training.
    If moving back would empty the training set, the whole boundary day goes
    to training instead.

    Args:
        df: Frame with a ``date_programme`` column.
        holdout_frac: Target fraction of rows for the holdout. The actual
            share can be larger because the boundary day is never split.

    Returns:
        ``(train, holdout)`` as independent copies.
    """
    ordered = df.sort_values("date_programme", kind="stable")
    dates = ordered["date_programme"]
    cutoff_idx = int(len(ordered) * (1 - holdout_frac))

    if 0 < cutoff_idx < len(ordered):
        boundary_date = dates.iloc[cutoff_idx]
        same_day = (dates == boundary_date).to_numpy()
        if same_day.any():  # False only when the boundary date is NaN
            # Sorted input makes the boundary day one contiguous run.
            day_start = int(same_day.argmax())
            day_end = len(same_day) - int(same_day[::-1].argmax())
            cutoff_idx = day_start if day_start > 0 else day_end

    train = ordered.iloc[:cutoff_idx].copy()
    holdout = ordered.iloc[cutoff_idx:].copy()
    print(
        f" Train: {len(train):,} rows ({train['date_programme'].min()} → {train['date_programme'].max()})"
    )
    print(
        f" Holdout: {len(holdout):,} rows ({holdout['date_programme'].min()} → {holdout['date_programme'].max()})"
    )
    return train, holdout
def compute_precision_at3(proba, y_true, df: pd.DataFrame) -> float:
    """Return the mean per-race Precision@3.

    For every race (``date_programme``, ``num_reunion``, ``num_course``) with
    at least three rows, the three rows with the highest ``proba`` are taken
    and the fraction whose ``y_true`` is 1 is computed. The result is the mean
    of those fractions.

    Args:
        proba: Predicted scores, one per row of ``df``, in row order.
        y_true: Binary targets, one per row of ``df``, in row order. Any
            array-like is accepted (list, ndarray, Series).
        df: Frame providing the three race-key columns.

    Returns:
        Mean Precision@3, or ``0.0`` when no race has three or more rows.
    """
    race_keys = ["date_programme", "num_reunion", "num_course"]
    scored = df[race_keys].copy()
    # np.asarray makes the assignment positional: a Series carrying another
    # index would otherwise be re-aligned, and a list/ndarray target has no
    # `.values` attribute.
    scored["proba"] = np.asarray(proba)
    scored["actual"] = np.asarray(y_true)

    per_race = [
        race.nlargest(3, "proba")["actual"].sum() / 3.0
        for _, race in scored.groupby(race_keys)
        if len(race) >= 3
    ]
    return float(np.mean(per_race)) if per_race else 0.0
verbose=False) + prob = model.predict_proba(X_val)[:, 1] + scores.append(roc_auc_score(y_val, prob)) + return float(np.mean(scores)) + + study = optuna.create_study( + direction="maximize", + pruner=optuna.pruners.MedianPruner(n_startup_trials=10, n_warmup_steps=0), + sampler=optuna.samplers.TPESampler(seed=42), + ) + study.optimize(objective, n_trials=n_trials, show_progress_bar=False) + print(f" Best AUC: {study.best_value:.4f} params: {study.best_params}") + return study.best_params + + +def optuna_lightgbm(X_train, y_train, n_trials: int = MIN_TRIALS) -> dict: + """Optuna study for LightGBM hyperparameters.""" + print(f"\n [Optuna] LightGBM — {n_trials} trials …") + scale_pos = float((len(y_train) - y_train.sum()) / max(y_train.sum(), 1)) + + def objective(trial): + params = { + "objective": "binary", + "metric": "auc", + "verbose": -1, + "random_state": 42, + "is_unbalance": False, + "scale_pos_weight": scale_pos, + "n_estimators": trial.suggest_int("n_estimators", 50, 400), + "max_depth": trial.suggest_int("max_depth", 3, 12), + "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True), + "num_leaves": trial.suggest_int("num_leaves", 15, 150), + "subsample": trial.suggest_float("subsample", 0.5, 1.0), + "colsample_bytree": trial.suggest_float("colsample_bytree", 0.4, 1.0), + "min_child_samples": trial.suggest_int("min_child_samples", 5, 50), + "reg_alpha": trial.suggest_float("reg_alpha", 1e-4, 10.0, log=True), + "reg_lambda": trial.suggest_float("reg_lambda", 1e-4, 10.0, log=True), + } + model = lgb.LGBMClassifier(**params) + cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42) + scores = [] + for tr_idx, val_idx in cv.split(X_train, y_train): + X_tr, X_val = X_train.iloc[tr_idx], X_train.iloc[val_idx] + y_tr, y_val = y_train.iloc[tr_idx], y_train.iloc[val_idx] + model.fit(X_tr, y_tr, eval_set=[(X_val, y_val)]) + prob = model.predict_proba(X_val)[:, 1] + scores.append(roc_auc_score(y_val, prob)) + return float(np.mean(scores)) + + 
def shap_feature_selection(
    model, X_train: pd.DataFrame, threshold: float = 0.005
) -> tuple:
    """Rank features by mean |SHAP| and keep those at or above ``threshold``.

    SHAP values are computed with ``shap.TreeExplainer`` on a random sample of
    at most 1000 rows (``random_state=42``).

    Args:
        model: Fitted tree model passed to ``shap.TreeExplainer``.
        X_train: Training features; column names become the feature names.
        threshold: Minimum mean absolute SHAP value for a feature to be kept.

    Returns:
        ``(selected, importance_df)``. ``selected`` is a list of feature names.
        ``importance_df`` has columns ``feature`` and ``shap_importance``,
        sorted descending. If anything fails, a warning is printed and
        ``(all columns, None)`` is returned.
    """
    print("\n [SHAP] Computing feature importance …")
    try:
        explainer = shap.TreeExplainer(model)
        sample = X_train.sample(min(1000, len(X_train)), random_state=42)
        shap_values = explainer.shap_values(sample)
        if isinstance(shap_values, list):
            # One array per class: keep the positive class.
            shap_values = shap_values[1]
        shap_values = np.asarray(shap_values)
        if shap_values.ndim == 3:
            # NOTE(review): newer SHAP releases appear to return
            # (rows, features, classes) for multi-output models — confirm
            # against the installed version. Keep the positive class.
            shap_values = shap_values[:, :, min(1, shap_values.shape[2] - 1)]
        mean_abs = np.abs(shap_values).mean(axis=0)
        importance_df = pd.DataFrame(
            {"feature": X_train.columns, "shap_importance": mean_abs}
        ).sort_values("shap_importance", ascending=False)
        print(importance_df.head(15).to_string(index=False))
        keep = importance_df["shap_importance"] >= threshold
        selected = importance_df.loc[keep, "feature"].tolist()
        print(
            f" → {len(selected)}/{len(X_train.columns)} features selected (threshold={threshold})"
        )
        return selected, importance_df
    except Exception as e:
        # Deliberate best effort: feature selection is advisory, so a SHAP
        # failure must not abort the training run.
        print(f" [WARN] SHAP failed: {e}. Using all features.")
        return list(X_train.columns), None
def _class_imbalance_ratio(y_train) -> float:
    """Return negatives / positives, with the positive count floored at 1."""
    return float((len(y_train) - y_train.sum()) / max(y_train.sum(), 1))


def train_xgboost(X_train, y_train, best_params: dict):
    """Fit the final XGBoost classifier on the given training data.

    Fixed defaults (objective, AUC metric, seed, class-imbalance weight) are
    set first; entries of ``best_params`` override them on key collision.
    """
    settings = {
        "objective": "binary:logistic",
        "eval_metric": "auc",
        "verbosity": 0,
        "random_state": 42,
        "scale_pos_weight": _class_imbalance_ratio(y_train),
    }
    settings.update(best_params)
    classifier = xgb.XGBClassifier(**settings)
    classifier.fit(X_train, y_train)
    return classifier


def train_lightgbm(X_train, y_train, best_params: dict):
    """Fit the final LightGBM classifier on the given training data.

    Fixed defaults (objective, AUC metric, seed, class-imbalance weight) are
    set first; entries of ``best_params`` override them on key collision.
    """
    settings = {
        "objective": "binary",
        "metric": "auc",
        "verbose": -1,
        "random_state": 42,
        "scale_pos_weight": _class_imbalance_ratio(y_train),
    }
    settings.update(best_params)
    classifier = lgb.LGBMClassifier(**settings)
    classifier.fit(X_train, y_train)
    return classifier


def train_mlp(X_train, y_train) -> Pipeline:
    """Fit a StandardScaler + MLP (256-128-64) pipeline and return it."""
    network = MLPClassifier(
        hidden_layer_sizes=(256, 128, 64),
        activation="relu",
        solver="adam",
        alpha=1e-3,
        batch_size=128,
        learning_rate="adaptive",
        learning_rate_init=1e-3,
        max_iter=200,
        early_stopping=True,
        validation_fraction=0.1,
        n_iter_no_change=15,
        random_state=42,
    )
    steps = [("scaler", StandardScaler()), ("mlp", network)]
    fitted = Pipeline(steps)
    fitted.fit(X_train, y_train)
    return fitted
class WeightedEnsemble:
    """Soft-voting ensemble with per-model weights.

    Attributes are set in ``__init__``: ``models`` ({name: model}),
    ``weights`` ({name: float}), ``feature_cols`` (ordered feature names) and
    ``version`` (a timestamped identifier).
    """

    def __init__(self, models: dict, weights: dict, feature_cols: list):
        self.models = models  # {name: model}
        self.weights = weights  # {name: float}
        self.feature_cols = feature_cols
        self.version = f"ensemble_v1_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    def _collect_probas(self, X: pd.DataFrame):
        """Return ({name: positive-class proba} for models that ran, [failed names])."""
        # Same column subset for every model, so compute it once.
        available = [c for c in self.feature_cols if c in X.columns]
        features = X[available]
        succeeded, failed = {}, []
        for name, model in self.models.items():
            try:
                succeeded[name] = model.predict_proba(features)[:, 1]
            except Exception as e:
                print(f" [WARN] {name} predict failed: {e}")
                failed.append(name)
        return succeeded, failed

    def predict_proba_all(self, X: pd.DataFrame):
        """Return {name: positive-class probabilities} for every model.

        A model whose prediction raises is reported as an all-zero vector,
        after a warning is printed.
        """
        succeeded, _ = self._collect_probas(X)
        return {
            name: succeeded[name] if name in succeeded else np.zeros(len(X))
            for name in self.models
        }

    def predict_proba(self, X: pd.DataFrame):
        """Return an ``(n, 2)`` array of weighted-average class probabilities.

        Weights are normalised over the models that actually produced a
        prediction. A failed model is therefore left out of the vote instead
        of dragging the average towards 0, and a model with no entry in
        ``weights`` (default weight 1.0) no longer pushes the total above 1.
        If every model fails, the positive-class column is all zeros.
        """
        succeeded, _ = self._collect_probas(X)
        raw = {name: float(self.weights.get(name, 1.0)) for name in succeeded}
        total_w = sum(raw.values())
        if succeeded and total_w <= 0:
            # Degenerate weights: fall back to a plain average.
            raw = {name: 1.0 for name in succeeded}
            total_w = float(len(succeeded))

        ensemble_proba = np.zeros(len(X))
        for name, proba in succeeded.items():
            ensemble_proba += (raw[name] / total_w) * proba
        return np.column_stack([1 - ensemble_proba, ensemble_proba])

    def predict(self, X: pd.DataFrame, threshold: float = 0.5):
        """Return 0/1 labels: 1 where the positive probability is >= ``threshold``."""
        return (self.predict_proba(X)[:, 1] >= threshold).astype(int)


def compute_ensemble_weights(models: dict, X_val, y_val, feature_cols: list) -> dict:
    """Derive one voting weight per model from its validation AUC.

    The weight is ``max(AUC - 0.5, 0.01)``, so a model at or below chance
    keeps a small positive weight. A model whose scoring raises gets 0.01
    after a warning is printed.

    Args:
        models: {name: model exposing ``predict_proba``}.
        X_val: Validation features (DataFrame).
        y_val: Validation targets.
        feature_cols: Feature names to select from ``X_val`` when present.

    Returns:
        {name: weight}.
    """
    weights = {}
    for name, model in models.items():
        try:
            available = [c for c in feature_cols if c in X_val.columns]
            prob = model.predict_proba(X_val[available])[:, 1]
            auc = roc_auc_score(y_val, prob)
            weights[name] = max(auc - 0.5, 0.01)  # clamp to stay positive
            print(f" {name}: AUC={auc:.4f} weight={weights[name]:.4f}")
        except Exception as e:
            print(f" [WARN] {name} weight computation failed: {e}")
            weights[name] = 0.01
    return weights
def evaluate_model(
    model, X_holdout, y_holdout, holdout_df: pd.DataFrame, name: str
) -> dict:
    """Score one model on the holdout set.

    Args:
        model: Object exposing ``feature_cols`` and ``predict_proba``.
        X_holdout: Holdout features; only the columns named in
            ``model.feature_cols`` that are present are passed to the model.
        y_holdout: Holdout binary targets.
        holdout_df: Frame with the race-key columns used by
            ``compute_precision_at3``.
        name: Label stored under ``"model"`` in the result.

    Returns:
        Dict with ``model``, ``auc``, ``accuracy``, ``precision``, ``recall``,
        ``precision_at3`` and ``latency_ms_per_row``, numeric values rounded
        to 4 decimals. Accuracy, precision and recall use a 0.5 cut-off.
    """
    available = [c for c in model.feature_cols if c in X_holdout.columns]
    features = X_holdout[available]

    # perf_counter is a monotonic high-resolution clock, unlike time.time().
    # Column selection is kept out of the timed region so the figure
    # reflects inference only.
    started = time.perf_counter()
    proba = model.predict_proba(features)[:, 1]
    elapsed = time.perf_counter() - started
    latency_ms = elapsed / max(len(X_holdout), 1) * 1000  # avoid 0-row division

    auc = roc_auc_score(y_holdout, proba)
    pred = (proba >= 0.5).astype(int)
    acc = accuracy_score(y_holdout, pred)
    prec = precision_score(y_holdout, pred, zero_division=0)
    rec = recall_score(y_holdout, pred, zero_division=0)
    p_at3 = compute_precision_at3(proba, y_holdout, holdout_df)

    return {
        "model": name,
        "auc": round(auc, 4),
        "accuracy": round(acc, 4),
        "precision": round(prec, 4),
        "recall": round(rec, 4),
        "precision_at3": round(p_at3, 4),
        "latency_ms_per_row": round(latency_ms, 4),
    }
class _WrappedMLP:
    """Adapter giving the MLP pipeline a DataFrame + ``feature_cols`` interface."""

    def __init__(self, pipeline, cols):
        self.pipeline = pipeline
        self.feature_cols = cols

    def predict_proba(self, X):
        available = [c for c in self.feature_cols if c in X.columns]
        # main() fits the pipeline on a bare ndarray, so feed it one here too.
        return self.pipeline.predict_proba(X[available].values)


class _WrappedTree:
    """Adapter giving a tree model a DataFrame + ``feature_cols`` interface."""

    def __init__(self, model, cols):
        self.model = model
        self.feature_cols = cols

    def predict_proba(self, X):
        available = [c for c in self.feature_cols if c in X.columns]
        return self.model.predict_proba(X[available])


class FullEnsemble:
    """Picklable soft-voting ensemble of the XGBoost, LightGBM and MLP models.

    Defined at module level because pickle stores classes by qualified name.
    A class created inside ``main`` cannot be pickled, which made saving the
    ensemble fail after the whole training run.

    NOTE(review): when this file is run as a script the class is recorded as
    ``__main__.FullEnsemble``; confirm the loading side can resolve that name.
    """

    def __init__(self, xgb_m, lgb_m, mlp_pipe, weights, feature_cols):
        self.xgb_model = xgb_m
        self.lgb_model = lgb_m
        self.mlp_pipeline = mlp_pipe
        self.weights = weights
        self.feature_cols = feature_cols
        self.version = f"ensemble_v1_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    def predict_proba(self, X: pd.DataFrame):
        """Return an ``(n, 2)`` array of weighted-average class probabilities.

        An ndarray input is wrapped in a DataFrame using ``feature_cols`` as
        column names. Missing values are replaced by 0 before prediction.
        """
        if isinstance(X, np.ndarray):
            X = pd.DataFrame(X, columns=self.feature_cols)
        available = [c for c in self.feature_cols if c in X.columns]
        Xa = X[available].fillna(0)

        members = (
            ("xgboost", self.xgb_model, Xa),
            ("lightgbm", self.lgb_model, Xa),
            ("mlp", self.mlp_pipeline, Xa.values),  # MLP was fitted on ndarray
        )
        # Normalise over the three weights actually used so they sum to 1
        # even if `weights` lacks a key or carries extra ones.
        raw = {name: float(self.weights.get(name, 0.33)) for name, _, _ in members}
        total_w = sum(raw.values())
        if total_w <= 0:
            raw = {name: 1.0 for name in raw}
            total_w = float(len(raw))

        proba = np.zeros(len(Xa))
        for name, model, features in members:
            proba += (raw[name] / total_w) * model.predict_proba(features)[:, 1]
        return np.column_stack([1 - proba, proba])

    def predict(self, X, threshold=0.5):
        """Return 0/1 labels: 1 where the positive probability is >= ``threshold``."""
        return (self.predict_proba(X)[:, 1] >= threshold).astype(int)


def meets_deploy_threshold(
    baseline_p3: float, candidate_p3: float, min_relative_gain: float
) -> bool:
    """Return True when the candidate beats the baseline by the relative gain.

    The commit description quotes the gate as baseline 0.5287 -> 0.5551,
    i.e. baseline * 1.05. An absolute 0.05 delta would require 0.5787 instead.
    A non-positive baseline gives no basis for comparison and returns False.
    """
    if baseline_p3 <= 0:
        return False
    return bool(candidate_p3 >= baseline_p3 * (1.0 + min_relative_gain))


def main(args):
    """Run the full training pipeline and return the benchmark report dict.

    Steps: load and engineer features, temporal split, baseline evaluation,
    Optuna tuning, model training, SHAP ranking, weight calibration, holdout
    evaluation, then saving of models and reports under ``MODELS_DIR``.

    Args:
        args: Namespace with ``db`` (SQLite path) and ``trials`` (Optuna
            trials per model).

    Returns:
        The report dict that is also written to ``benchmark_report.json``.
    """
    print("\n" + "=" * 65)
    print("TURF ML UPGRADE — Ensemble + Optuna (Sprint 6-7)")
    print("=" * 65)

    # ── Load & feature engineer ──────────────────────────────────────────
    print("\n[1/9] Loading data …")
    df = load_data(args.db)
    df = engineer_features(df)

    # ── Temporal split ───────────────────────────────────────────────────
    print("\n[2/9] Temporal split (80/20) …")
    train_df, holdout_df = temporal_split(df, holdout_frac=HOLDOUT_FRACTION)

    X_train, y_train, feat_cols = get_features_and_target(train_df, "top3")
    X_holdout, y_holdout, _ = get_features_and_target(holdout_df, "top3")

    # Sub-split of the training period: the first 85 % is used for tuning
    # and fitting, the last 15 % is kept for ensemble-weight calibration.
    n = len(X_train)
    n_val = int(n * 0.15)
    X_tr = X_train.iloc[: n - n_val]
    y_tr = y_train.iloc[: n - n_val]
    X_val = X_train.iloc[n - n_val :]
    y_val = y_train.iloc[n - n_val :]

    print(f" Train subset: {len(X_tr):,}")
    print(f" Val subset: {len(X_val):,}")
    print(f" Holdout: {len(X_holdout):,}")

    # ── Baseline evaluation ──────────────────────────────────────────────
    print("\n[3/9] Evaluating baseline XGBoost …")
    existing_path = "/home/h3r7/turf_saas/xgboost_models.pkl"
    baseline = evaluate_baseline(holdout_df, existing_path)
    print(
        f" Baseline Precision@3: {baseline['precision_at3']:.4f} AUC: {baseline['auc']:.4f}"
    )

    # ── Optuna ───────────────────────────────────────────────────────────
    n_trials = args.trials

    print(f"\n[4/9] Optuna optimization ({n_trials} trials each) …")
    xgb_params = optuna_xgboost(X_tr, y_tr, n_trials=n_trials)
    lgb_params = optuna_lightgbm(X_tr, y_tr, n_trials=n_trials)

    # ── Train individual models ──────────────────────────────────────────
    print("\n[5/9] Training individual models …")
    print(" Training optimized XGBoost …")
    xgb_model = train_xgboost(X_tr, y_tr, xgb_params)

    print(" Training LightGBM …")
    lgb_model = train_lightgbm(X_tr, y_tr, lgb_params)

    print(" Training MLP (3 layers) …")
    mlp_model = train_mlp(X_tr.values, y_tr)

    # ── SHAP feature analysis ────────────────────────────────────────────
    print("\n[6/9] SHAP feature importance (XGBoost) …")
    selected_features, _ = shap_feature_selection(xgb_model, X_tr)

    # ── Compute weights ──────────────────────────────────────────────────
    print("\n[7/9] Computing ensemble weights on validation …")
    model_dict = {
        "xgboost": _WrappedTree(xgb_model, feat_cols),
        "lightgbm": _WrappedTree(lgb_model, feat_cols),
        "mlp": _WrappedMLP(mlp_model, feat_cols),
    }
    weights = compute_ensemble_weights(model_dict, X_val, y_val, feat_cols)

    # ── Build ensemble ───────────────────────────────────────────────────
    print("\n[8/9] Building WeightedEnsemble …")
    ensemble = FullEnsemble(xgb_model, lgb_model, mlp_model, weights, feat_cols)

    # ── Holdout evaluation ───────────────────────────────────────────────
    print("\n[9/9] Evaluating all models on holdout …")
    results = {}
    for name, wrapped in model_dict.items():
        res = evaluate_model(wrapped, X_holdout, y_holdout, holdout_df, name)
        results[name] = res
        print(
            f" {name:12s} Precision@3={res['precision_at3']:.4f} AUC={res['auc']:.4f}"
        )

    # FullEnsemble already exposes predict_proba and feature_cols, which is
    # all evaluate_model needs.
    ens_res = evaluate_model(ensemble, X_holdout, y_holdout, holdout_df, "ensemble")
    results["ensemble"] = ens_res
    print(
        f" {'ensemble':12s} Precision@3={ens_res['precision_at3']:.4f} AUC={ens_res['auc']:.4f}"
    )

    # ── Baseline comparison ──────────────────────────────────────────────
    baseline_p3 = baseline["precision_at3"]
    ensemble_p3 = ens_res["precision_at3"]
    delta = ensemble_p3 - baseline_p3

    print("\n" + "=" * 65)
    print("BENCHMARK SUMMARY")
    print("=" * 65)
    print(f" Baseline XGBoost Precision@3: {baseline_p3:.4f}")
    print(f" Optimized Ensemble Precision@3: {ensemble_p3:.4f}")
    print(f" Delta: {delta:+.4f} ({delta * 100:+.1f}%)")
    # The gate is relative (+5 % of the baseline), not 5 absolute points.
    deploy = meets_deploy_threshold(baseline_p3, ensemble_p3, DEPLOY_THRESHOLD)
    print(
        f" Deploy threshold (+{DEPLOY_THRESHOLD * 100:.0f}%): {'✅ DEPLOY' if deploy else '❌ BELOW THRESHOLD'}"
    )
    print("=" * 65)

    # ── Save models ──────────────────────────────────────────────────────
    ensemble_path = MODELS_DIR / "ensemble_top3.pkl"
    with open(ensemble_path, "wb") as f:
        pickle.dump(ensemble, f)
    print(f"\n ✅ Ensemble saved → {ensemble_path}")

    # Also save the individual optimized models with their parameters.
    for name, model, params in (
        ("xgboost_optimized", xgb_model, xgb_params),
        ("lightgbm", lgb_model, lgb_params),
    ):
        model_path = MODELS_DIR / f"{name}_top3.pkl"
        with open(model_path, "wb") as f:
            pickle.dump(
                {"model": model, "feature_cols": feat_cols, "params": params}, f
            )
        print(f" ✅ {name} saved → {model_path}")

    mlp_path = MODELS_DIR / "mlp_top3.pkl"
    with open(mlp_path, "wb") as f:
        pickle.dump({"pipeline": mlp_model, "feature_cols": feat_cols}, f)
    print(f" ✅ MLP saved → {mlp_path}")

    # ── Save benchmark report ────────────────────────────────────────────
    report = {
        "run_date": datetime.now().isoformat(),
        "dataset": {
            "db_path": args.db,
            "total_rows": len(df),
            "train_rows": len(X_train),
            "holdout_rows": len(X_holdout),
            "train_date_range": [
                str(train_df["date_programme"].min()),
                str(train_df["date_programme"].max()),
            ],
            "holdout_date_range": [
                str(holdout_df["date_programme"].min()),
                str(holdout_df["date_programme"].max()),
            ],
        },
        "baseline": baseline,
        "individual_models": {k: v for k, v in results.items() if k != "ensemble"},
        "ensemble": ens_res,
        "delta_precision_at3": round(delta, 4),
        "deploy": deploy,
        "optuna": {
            "n_trials": n_trials,
            "xgboost_best_params": xgb_params,
            "lightgbm_best_params": lgb_params,
        },
        "features": {
            "total": len(feat_cols),
            "selected_by_shap": len(selected_features),
            "feature_list": feat_cols,
            "shap_selected": selected_features,
        },
        "ensemble_weights": weights,
    }

    report_path = MODELS_DIR / "benchmark_report.json"
    with open(report_path, "w", encoding="utf-8") as f:
        json.dump(report, f, indent=2)
    print(f" ✅ Benchmark report → {report_path}")

    # Human-readable markdown
    md_path = MODELS_DIR / "benchmark_report.md"
    _write_markdown_report(report, md_path)
    print(f" ✅ Markdown report → {md_path}")

    return report


def _write_markdown_report(report: dict, path: Path):
    """Write the benchmark report as Markdown to ``path`` (UTF-8).

    Args:
        report: The dict built by ``main``; reads ``baseline``, ``ensemble``,
            ``individual_models``, ``delta_precision_at3``, ``deploy``,
            ``run_date``, ``dataset``, ``optuna``, ``features`` and
            ``ensemble_weights``.
        path: Destination file, overwritten if it exists.
    """
    b = report["baseline"]
    e = report["ensemble"]
    delta = report["delta_precision_at3"]
    dataset = report["dataset"]
    holdout_range = dataset["holdout_date_range"]
    deploy_str = (
        "✅ DEPLOIEMENT RECOMMANDE"
        if report["deploy"]
        else "❌ EN DESSOUS DU SEUIL (+5%)"
    )

    lines = [
        "# Benchmark ML Ensemble — Turf Prédictions",
        "",
        # Two trailing spaces = Markdown hard line break between the fields.
        f"**Date:** {report['run_date'][:10]}  ",
        f"**Dataset:** {dataset['total_rows']:,} partants  ",
        f"**Holdout:** {dataset['holdout_rows']:,} lignes ({holdout_range[0]} → {holdout_range[1]})",
        "",
        "## Résultats",
        "",
        "| Modèle | Precision@3 | AUC | Latence/prédiction |",
        "|--------|-------------|-----|-------------------|",
        f"| XGBoost (baseline) | {b['precision_at3']:.4f} | {b['auc']:.4f} | — |",
    ]
    for name, res in report["individual_models"].items():
        lines.append(
            f"| {name} | {res['precision_at3']:.4f} | {res['auc']:.4f} | {res['latency_ms_per_row']:.2f} ms |"
        )
    lines += [
        f"| **Ensemble** | **{e['precision_at3']:.4f}** | **{e['auc']:.4f}** | **{e['latency_ms_per_row']:.2f} ms** |",
        "",
        "## Décision de déploiement",
        "",
        f"- Delta Precision@3 : **{delta:+.4f}** ({delta * 100:+.1f}%)",
        "- Seuil requis : **+5%**",
        f"- Résultat : **{deploy_str}**",
        "",
        "## Optimisation Optuna",
        "",
        f"- Trials XGBoost : {report['optuna']['n_trials']}",
        f"- Trials LightGBM : {report['optuna']['n_trials']}",
        "- Pruning : MedianPruner",
        "",
        "### Meilleurs hyperparamètres XGBoost",
        "```json",
        json.dumps(report["optuna"]["xgboost_best_params"], indent=2),
        "```",
        "",
        "### Meilleurs hyperparamètres LightGBM",
        "```json",
        json.dumps(report["optuna"]["lightgbm_best_params"], indent=2),
        "```",
        "",
        "## Features",
        "",
        f"- Total features : {report['features']['total']}",
        f"- Retenues par SHAP : {report['features']['selected_by_shap']}",
        "",
        "## Poids de l'ensemble",
        "",
    ]
    lines.extend(f"- {name} : {w:.4f}" for name, w in report["ensemble_weights"].items())

    # Explicit encoding: the report contains accents, arrows and emoji, which
    # a non-UTF-8 default locale could not encode.
    path.write_text("\n".join(lines), encoding="utf-8")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="Train ensemble ML model for turf predictions"
    )
    parser.add_argument("--db", default=DB_PATH, help="Path to SQLite database")
    parser.add_argument(
        "--trials", type=int, default=MIN_TRIALS, help="Optuna trials per model"
    )
    parser.add_argument(
        "--quick", action="store_true", help="Quick mode: 10 trials only"
    )
    args = parser.parse_args()

    if args.quick:
        args.trials = 10

    report = main(args)
    print(f"\nDone. Deploy={report['deploy']}")