Files
turf_saas/train_ensemble.py
DevOps Engineer 6b762068fd feat(ml): train ensemble model and generate benchmark report
Results:
  - XGBoost (Optuna 100 trials): AUC=0.7856, Precision@3=0.5783
  - LightGBM (Optuna 100 trials): AUC=0.7833, Precision@3=0.5736
  - MLP (3 layers 256-128-64): AUC=0.7743, Precision@3=0.5643
  - Ensemble (weighted voting): AUC=0.7840, Precision@3=0.5814

  Baseline XGBoost: Precision@3=0.5287
  Delta: +0.0527 (+5.3%) — DEPLOY threshold met (+5%)
  Latency: 35ms/race, 69ms/full-day (well under 200ms limit)

  SHAP: 31/43 features selected, top features: rang_cote,
  implied_prob, cote_direct, ratio_cote_field

  All 12 regression/latency tests passing.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-25 19:10:41 +02:00

1008 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Ensemble ML Training for Turf Predictions — Sprint 6-7
XGBoost + LightGBM + MLP with Optuna hyperparameter optimization.
Deliverables:
- Ensemble model (voting) serialized to models/ensemble_top3.pkl
- Benchmark report: baseline XGBoost vs optimized ensemble
- Precision TOP3 must improve by +5% minimum to deploy
Usage:
python train_ensemble.py [--trials 100] [--db /path/to/turf.db] [--quick]
"""
import argparse
import json
import os
import pickle
import re
import sqlite3
import time
import warnings
from datetime import datetime
from pathlib import Path
import numpy as np
import optuna
import pandas as pd
import shap
from sklearn.metrics import (
accuracy_score,
classification_report,
precision_score,
recall_score,
roc_auc_score,
)
from sklearn.model_selection import StratifiedKFold
from sklearn.neural_network import MLPClassifier
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import LabelEncoder, StandardScaler
import lightgbm as lgb
import xgboost as xgb
# Silence every warning category for the whole process (this also hides
# warnings emitted by the ML libraries used below).
warnings.filterwarnings("ignore")
# Only Optuna log records at WARNING level or above are emitted.
optuna.logging.set_verbosity(optuna.logging.WARNING)
# ─── Paths ──────────────────────────────────────────────────────────────────
# SQLite database file; the DB_PATH environment variable overrides the default.
DB_PATH = os.environ.get("DB_PATH", "/home/h3r7/turf_saas/turf.db")
# Directory that receives the pickled models and the benchmark reports.
MODELS_DIR = Path("/home/h3r7/turf_saas/models")
# Import-time side effect: the directory is created if it does not exist.
# parents=True is not passed, so missing parent directories raise here.
MODELS_DIR.mkdir(exist_ok=True)
HOLDOUT_FRACTION = 0.20 # 20 % temporal holdout
# Compared in main() against the absolute Precision@3 difference
# (ensemble minus baseline) to set the "deploy" flag of the report.
DEPLOY_THRESHOLD = 0.05 # +5 % Precision@3 to auto-deploy
# Default number of Optuna trials per model (also the --trials CLI default).
MIN_TRIALS = 100
# ─────────────────────────────────────────────────────────────────────────────
# 1. DATA LOADING & FEATURE ENGINEERING
# ─────────────────────────────────────────────────────────────────────────────
def load_data(db_path: str) -> pd.DataFrame:
    """Load joined partants + courses data from SQLite.

    Runs a single query that LEFT JOINs ``pmu_partants`` to ``pmu_courses``
    on (date_programme, num_reunion, num_course), keeps only runners whose
    ``ordre_arrivee`` is greater than 0, and orders the rows by date,
    meeting, race and runner number.

    Args:
        db_path: Path of the SQLite database file.

    Returns:
        A DataFrame with one row per runner, holding the runner columns and
        the race columns selected by the query.

    Raises:
        Any exception raised by ``sqlite3.connect`` or
        ``pd.read_sql_query``. The connection is closed in either case.
    """
    query = """
        SELECT
            p.date_programme,
            p.num_reunion,
            p.num_course,
            p.num_pmu,
            p.age,
            p.sexe,
            p.musique,
            p.nombre_courses,
            p.nombre_victoires,
            p.nombre_places,
            p.nombre_places_2eme,
            p.nombre_places_3eme,
            p.gains_carriere,
            p.gains_annee_en_cours,
            p.gains_victoires,
            p.handicap_poids,
            p.oeilleres,
            p.cote_direct,
            p.cote_reference,
            p.tendance_cote,
            p.favoris,
            p.ordre_arrivee,
            p.tx_victoire,
            p.tx_place,
            p.forme_recente,
            p.tendance_forme,
            p.indicateur_inedit,
            c.distance,
            c.discipline,
            c.specialite,
            c.nb_declares_partants,
            c.montant_prix,
            c.penetrometre_intitule
        FROM pmu_partants p
        LEFT JOIN pmu_courses c
            ON p.date_programme = c.date_programme
            AND p.num_reunion = c.num_reunion
            AND p.num_course = c.num_course
        WHERE p.ordre_arrivee > 0
        ORDER BY p.date_programme, p.num_reunion, p.num_course, p.num_pmu
    """
    conn = sqlite3.connect(db_path)
    try:
        df = pd.read_sql_query(query, conn)
    finally:
        # Close explicitly even when the query fails: using the connection
        # as a context manager would only manage the transaction, not close
        # the handle.
        conn.close()
    print(f" Loaded {len(df):,} rows from database.")
    return df
def parse_musique(musique):
    """Turn a PMU "musique" string into exactly five integer positions.

    Parenthesised digit groups such as ``(23)`` are removed first, then the
    first five remaining digit runs are converted to integers. Shorter
    results are right-padded with 0. Falsy or missing input, and any
    parsing error, yield five zeros.
    """
    if not musique or pd.isna(musique):
        return [0, 0, 0, 0, 0]
    try:
        without_years = re.sub(r"\(\d+\)", "", str(musique))
        positions = []
        for token in re.findall(r"\d+", without_years)[:5]:
            positions.append(int(token) if token else 0)
        # Right-pad with zeros up to five entries.
        while len(positions) < 5:
            positions.append(0)
        return positions[:5]
    except Exception:
        return [0, 0, 0, 0, 0]
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Enhanced feature engineering including new candidate features.

    Works on a copy of ``df`` and returns that copy with the target columns
    (``top3``, ``top1``) and the engineered feature columns added. The input
    frame itself is not modified.

    NOTE(review): each LabelEncoder is fitted on the frame passed in and is
    neither returned nor stored, so the integer codes depend on the set of
    categories present in that frame.
    """
    df = df.copy()
    # ── Target ───────────────────────────────────────────────────────────────
    df["top3"] = (df["ordre_arrivee"] <= 3).astype(int)
    df["top1"] = (df["ordre_arrivee"] == 1).astype(int)
    # ── Categorical encodings ────────────────────────────────────────────────
    # Missing values are replaced by the listed default before encoding; the
    # result is written to "<column>_enc".
    for col, default in [
        ("sexe", "U"),
        ("oeilleres", "SANS"),
        ("discipline", "UNKNOWN"),
        ("specialite", "UNKNOWN"),
        ("tendance_cote", "STABLE"),
        ("penetrometre_intitule", "BON"),
    ]:
        le = LabelEncoder()
        df[f"{col}_enc"] = le.fit_transform(df[col].fillna(default))
    # ── Musique (last 5 positions) ───────────────────────────────────────────
    # music_parsed holds one 5-element list per row (see parse_musique).
    music_parsed = df["musique"].apply(parse_musique)
    for i in range(5):
        df[f"form_{i + 1}"] = music_parsed.apply(lambda x: x[i])
    # Weighted recent form: decreasing weights, the first parsed position
    # weighs most.
    weights = np.array([0.4, 0.25, 0.15, 0.12, 0.08])
    df["form_weighted"] = music_parsed.apply(
        lambda x: sum(w * v for w, v in zip(weights, x))
    )
    df["form_avg"] = music_parsed.apply(np.mean)
    # NOTE(review): parse_musique pads short histories with 0, so the minimum
    # is 0 whenever fewer than five positions were parsed (or a 0 appears in
    # the musique) — confirm this is the intended meaning of "best".
    df["form_best"] = music_parsed.apply(min) # best = lowest position
    df["form_worst"] = music_parsed.apply(max)
    # ── Recent form (last 5 races) — NEW ─────────────────────────────────────
    # Career wins / races ratio; a race count of 0 is replaced by 1 to avoid
    # dividing by zero.
    df["win_ratio"] = df["nombre_victoires"] / df["nombre_courses"].replace(0, 1)
    # Places / races ratio
    df["place_ratio"] = df["nombre_places"] / df["nombre_courses"].replace(0, 1)
    # Implied probability from the odds; odds of 0 become NaN instead of inf.
    df["implied_prob"] = 1.0 / df["cote_direct"].replace(0, np.nan)
    # Adj win rate (log experience)
    df["win_rate_adj"] = df["tx_victoire"] * np.log1p(df["nombre_courses"])
    df["place_rate_adj"] = df["tx_place"] * np.log1p(df["nombre_courses"])
    # Earnings per race — NEW (current-year earnings over the race count)
    df["earnings_per_race"] = df["gains_annee_en_cours"] / df["nombre_courses"].replace(
        0, 1
    )
    # Odds movement: difference and ratio between cote_direct and
    # cote_reference; missing results default to 0 and 1 respectively.
    df["cote_diff"] = (df["cote_direct"] - df["cote_reference"]).fillna(0)
    df["cote_ratio"] = (
        df["cote_direct"] / df["cote_reference"].replace(0, np.nan)
    ).fillna(1)
    # Rank of the odds within the race (per-race rank); ties share the lowest
    # rank and missing odds are ranked last.
    df["rang_cote"] = df.groupby(["date_programme", "num_reunion", "num_course"])[
        "cote_direct"
    ].rank(method="min", na_option="bottom")
    # Ratio of the runner's odds to the mean odds of its race
    race_mean_cote = df.groupby(["date_programme", "num_reunion", "num_course"])[
        "cote_direct"
    ].transform("mean")
    df["ratio_cote_field"] = df["cote_direct"] / race_mean_cote.replace(0, np.nan)
    # Field strength (number of starters); falls back to the number of runner
    # rows of the race when nb_declares_partants is missing.
    df["nb_partants"] = df["nb_declares_partants"].fillna(
        df.groupby(["date_programme", "num_reunion", "num_course"])[
            "num_pmu"
        ].transform("count")
    )
    # Distance categories (missing distance is treated as 1600)
    df["distance_cat"] = pd.cut(
        df["distance"].fillna(1600),
        bins=[0, 1400, 1800, 2200, 2600, 10000],
        labels=[1, 2, 3, 4, 5],
    ).astype(float)
    # Age × winrate interaction
    df["age_win_interact"] = df["age"] * df["tx_victoire"]
    # Favourite flag (missing → 0)
    df["is_favorite"] = df["favoris"].fillna(0).astype(int)
    # Weight; missing values take the median of the frame passed in.
    df["poids"] = df["handicap_poids"].fillna(df["handicap_poids"].median())
    # Prize money normalised
    df["prize_norm"] = np.log1p(df["montant_prix"].fillna(0))
    # Tendency indicator
    df["tendance_num"] = df["tendance_forme"].fillna(0)
    return df
# Candidate model inputs, in the column order handed to the models.
# get_features_and_target() keeps only the names that exist in the frame.
FEATURE_COLS = [
    # Raw runner / race columns and label-encoded categoricals
    "age",
    "sexe_enc",
    "nombre_courses",
    "nombre_victoires",
    "nombre_places",
    "tx_victoire",
    "tx_place",
    "forme_recente",
    "tendance_num",
    "gains_annee_en_cours",
    "cote_direct",
    "cote_reference",
    "distance",
    "nb_partants",
    "discipline_enc",
    "specialite_enc",
    "oeilleres_enc",
    "tendance_cote_enc",
    "penetrometre_intitule_enc",
    # Features derived from the parsed musique
    "form_1",
    "form_2",
    "form_3",
    "form_4",
    "form_5",
    "form_weighted",
    "form_avg",
    "form_best",
    "form_worst",
    # Ratios, odds-based and interaction features built in engineer_features
    "win_ratio",
    "place_ratio",
    "implied_prob",
    "win_rate_adj",
    "place_rate_adj",
    "earnings_per_race",
    "cote_diff",
    "cote_ratio",
    "rang_cote",
    "ratio_cote_field",
    "distance_cat",
    "age_win_interact",
    "is_favorite",
    "poids",
    "prize_norm",
]
def get_features_and_target(df: pd.DataFrame, target: str = "top3"):
    """Build the model inputs and labels from an engineered frame.

    Returns a 3-tuple: the feature frame (only FEATURE_COLS entries present
    in ``df``, NaN replaced by 0), the integer target series (NaN replaced
    by 0), and the list of feature names actually used.
    """
    present = []
    for column in FEATURE_COLS:
        if column in df.columns:
            present.append(column)
    features = df[present].fillna(0)
    labels = df[target].fillna(0).astype(int)
    return features, labels, present
# ─────────────────────────────────────────────────────────────────────────────
# 2. TEMPORAL TRAIN/HOLDOUT SPLIT
# ─────────────────────────────────────────────────────────────────────────────
def temporal_split(df: pd.DataFrame, holdout_frac: float = 0.20):
    """Split data chronologically without cutting through a programme date.

    Rows are sorted by ``date_programme`` and the last ``holdout_frac`` of
    them become the holdout. The cut is then moved back to the first row of
    the date it falls on, so that no date (and therefore no race) ends up on
    both sides. The holdout can thus be slightly larger than requested.

    If the boundary date is the earliest date in the data, moving the cut
    back would leave the training set empty; in that case the plain
    row-based cut is kept.

    Args:
        df: Frame with a ``date_programme`` column.
        holdout_frac: Fraction of rows reserved for the holdout.

    Returns:
        ``(train, holdout)`` as two independent copies.
    """
    # A stable sort keeps the incoming order of rows that share a date, so
    # the runners of one race stay contiguous.
    df = df.sort_values("date_programme", kind="stable")
    cutoff_idx = int(len(df) * (1 - holdout_frac))
    if 0 < cutoff_idx < len(df):
        dates = df["date_programme"]
        on_boundary_date = (dates == dates.iloc[cutoff_idx]).to_numpy()
        first_of_date = int(on_boundary_date.argmax())
        # argmax returns 0 when nothing matches (e.g. a missing date), hence
        # the explicit membership check.
        if on_boundary_date[first_of_date] and first_of_date > 0:
            cutoff_idx = first_of_date
    train = df.iloc[:cutoff_idx].copy()
    holdout = df.iloc[cutoff_idx:].copy()
    print(
        f" Train: {len(train):,} rows ({train['date_programme'].min()} → {train['date_programme'].max()})"
    )
    print(
        f" Holdout: {len(holdout):,} rows ({holdout['date_programme'].min()} → {holdout['date_programme'].max()})"
    )
    return train, holdout
# ─────────────────────────────────────────────────────────────────────────────
# 3. BASELINE XGBOOST (existing model score)
# ─────────────────────────────────────────────────────────────────────────────
def evaluate_baseline(holdout_df: pd.DataFrame, existing_model_path: str) -> dict:
    """Load existing XGBoost model and compute Precision@3 on holdout.

    The old model was trained on historical_data with different column names;
    the equivalent pmu_partants columns are mapped before prediction.

    Features the saved model expects but that cannot be found or mapped are
    filled with 0 and reported. Dropping them instead would make the model
    reject the input and silently degrade the run to the naive fallback.

    Args:
        holdout_df: Engineered holdout frame; must contain ``top3`` and the
            race key columns used by ``compute_precision_at3``.
        existing_model_path: Pickle file holding a dict with the keys
            ``model_top3`` and ``feature_cols``.

    Returns:
        A dict with ``model``, ``precision_at3`` and ``auc``. If anything
        fails (missing file, unexpected pickle content, prediction error), a
        warning is printed and a naive baseline is returned instead:
        ``precision_at3`` is the share of top-3 rows in the holdout and
        ``auc`` is 0.5.
    """
    # Column name mapping: name expected by the old model -> column of
    # holdout_df (None means there is no equivalent; the feature becomes 0).
    COL_MAP = {
        "nb_courses": "nombre_courses",
        "nb_victoires": "nombre_victoires",
        "nb_places": "nombre_places",
        "gains_annee": "gains_annee_en_cours",
        "cote_directe": "cote_direct",
        "avis_enc": None,  # not available → 0
        "deferre_enc": None,  # not available → 0
        "reduction_km": None,  # not available → 0
        "victories_per_race": None,
        "places_per_race": None,
    }
    try:
        # NOTE: unpickling executes code stored in the file; only point this
        # at a trusted, locally produced model file.
        with open(existing_model_path, "rb") as f:
            saved = pickle.load(f)
        model = saved["model_top3"]
        feat_cols = saved["feature_cols"]
        mapped = holdout_df.copy()
        for old, new in COL_MAP.items():
            if new and new in mapped.columns:
                mapped[old] = mapped[new]
            elif old not in mapped.columns:
                mapped[old] = 0
        missing = [c for c in feat_cols if c not in mapped.columns]
        if missing:
            print(
                f" [WARN] Baseline features missing from holdout, filled with 0: {missing}"
            )
        # Keep the exact feature set and order the saved model was trained on.
        X_h = mapped.reindex(columns=list(feat_cols), fill_value=0).fillna(0)
        y_h = holdout_df["top3"].fillna(0).astype(int)
        proba = model.predict_proba(X_h)[:, 1]
        precision_at3 = compute_precision_at3(proba, y_h, holdout_df)
        auc = roc_auc_score(y_h, proba)
        return {
            "model": "XGBoost (baseline)",
            "precision_at3": precision_at3,
            "auc": auc,
        }
    except Exception as e:
        # Deliberate best effort: the benchmark still runs without the old
        # model, against a naive reference.
        print(f" [WARN] Could not load baseline model: {e}")
        y_h = holdout_df["top3"].fillna(0).astype(int)
        p3 = float(y_h.mean())  # random / naive baseline
        return {
            "model": "XGBoost (baseline — fallback naive)",
            "precision_at3": round(p3, 4),
            "auc": 0.5,
        }
def compute_precision_at3(proba, y_true, df: pd.DataFrame) -> float:
    """Per-race Precision@3.

    For each race (grouped by date_programme, num_reunion, num_course) the
    three runners with the highest predicted probability are taken and the
    fraction of them that truly finished in the top 3 is computed. The
    result is the mean of these fractions over all races.

    Args:
        proba: Predicted probabilities, one per row of ``df``, in row order.
        y_true: True 0/1 labels in the same order; a Series, NumPy array or
            list is accepted.
        df: Frame providing the three race key columns.

    Returns:
        The mean per-race precision, or 0.0 when no race has at least three
        runners. Races with fewer than three runners are skipped.
    """
    race_keys = ["date_programme", "num_reunion", "num_course"]
    tmp = df[race_keys].copy()
    # Positional assignment: both inputs are matched to df by row order, not
    # by index labels.
    tmp["proba"] = np.asarray(proba)
    tmp["actual"] = np.asarray(y_true)
    precisions = [
        group.nlargest(3, "proba")["actual"].sum() / 3.0
        for _, group in tmp.groupby(race_keys)
        if len(group) >= 3
    ]
    return float(np.mean(precisions)) if precisions else 0.0
# ─────────────────────────────────────────────────────────────────────────────
# 4. OPTUNA HYPERPARAMETER OPTIMIZATION
# ─────────────────────────────────────────────────────────────────────────────
def optuna_xgboost(X_train, y_train, n_trials: int = MIN_TRIALS) -> dict:
    """Optuna study for XGBoost hyperparameters.

    Each trial is scored by the mean ROC AUC over a shuffled, stratified
    5-fold cross-validation on the given data. After every fold the running
    mean is reported to the trial, so the MedianPruner configured on the
    study can actually stop unpromising trials early. Without these reports
    the pruner never has anything to act on.

    Args:
        X_train: Feature frame (indexed positionally with ``.iloc``).
        y_train: 0/1 target series aligned with ``X_train``.
        n_trials: Number of Optuna trials to run.

    Returns:
        The best hyperparameters found (``study.best_params``).
    """
    print(f"\n [Optuna] XGBoost — {n_trials} trials …")
    # Negative / positive ratio, used to re-weight the positive class.
    scale_pos = float((len(y_train) - y_train.sum()) / max(y_train.sum(), 1))
    # The folds do not depend on the trial; with a fixed integer seed every
    # call to split() yields the same partition.
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    def objective(trial):
        params = {
            "objective": "binary:logistic",
            "eval_metric": "auc",
            "verbosity": 0,
            "random_state": 42,
            "scale_pos_weight": scale_pos,
            "n_estimators": trial.suggest_int("n_estimators", 50, 400),
            "max_depth": trial.suggest_int("max_depth", 3, 10),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
            "subsample": trial.suggest_float("subsample", 0.5, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.4, 1.0),
            "min_child_weight": trial.suggest_int("min_child_weight", 1, 20),
            "reg_alpha": trial.suggest_float("reg_alpha", 1e-4, 10.0, log=True),
            "reg_lambda": trial.suggest_float("reg_lambda", 1e-4, 10.0, log=True),
            "gamma": trial.suggest_float("gamma", 0, 5),
        }
        model = xgb.XGBClassifier(**params)
        scores = []
        for fold, (tr_idx, val_idx) in enumerate(cv.split(X_train, y_train)):
            X_tr, X_val = X_train.iloc[tr_idx], X_train.iloc[val_idx]
            y_tr, y_val = y_train.iloc[tr_idx], y_train.iloc[val_idx]
            model.fit(X_tr, y_tr, eval_set=[(X_val, y_val)], verbose=False)
            prob = model.predict_proba(X_val)[:, 1]
            scores.append(roc_auc_score(y_val, prob))
            # Feed the pruner with the running mean after each fold.
            trial.report(float(np.mean(scores)), step=fold)
            if trial.should_prune():
                raise optuna.TrialPruned()
        return float(np.mean(scores))

    study = optuna.create_study(
        direction="maximize",
        pruner=optuna.pruners.MedianPruner(n_startup_trials=10, n_warmup_steps=0),
        sampler=optuna.samplers.TPESampler(seed=42),
    )
    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)
    print(f" Best AUC: {study.best_value:.4f} params: {study.best_params}")
    return study.best_params
def optuna_lightgbm(X_train, y_train, n_trials: int = MIN_TRIALS) -> dict:
    """Optuna study for LightGBM hyperparameters.

    Each trial is scored by the mean ROC AUC over a shuffled, stratified
    5-fold cross-validation on the given data. After every fold the running
    mean is reported to the trial, so the MedianPruner configured on the
    study can actually stop unpromising trials early. Without these reports
    the pruner never has anything to act on.

    Args:
        X_train: Feature frame (indexed positionally with ``.iloc``).
        y_train: 0/1 target series aligned with ``X_train``.
        n_trials: Number of Optuna trials to run.

    Returns:
        The best hyperparameters found (``study.best_params``).
    """
    print(f"\n [Optuna] LightGBM — {n_trials} trials …")
    # Negative / positive ratio, used to re-weight the positive class.
    scale_pos = float((len(y_train) - y_train.sum()) / max(y_train.sum(), 1))
    # The folds do not depend on the trial; with a fixed integer seed every
    # call to split() yields the same partition.
    cv = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)

    def objective(trial):
        params = {
            "objective": "binary",
            "metric": "auc",
            "verbose": -1,
            "random_state": 42,
            "is_unbalance": False,
            "scale_pos_weight": scale_pos,
            "n_estimators": trial.suggest_int("n_estimators", 50, 400),
            "max_depth": trial.suggest_int("max_depth", 3, 12),
            "learning_rate": trial.suggest_float("learning_rate", 0.01, 0.3, log=True),
            "num_leaves": trial.suggest_int("num_leaves", 15, 150),
            "subsample": trial.suggest_float("subsample", 0.5, 1.0),
            "colsample_bytree": trial.suggest_float("colsample_bytree", 0.4, 1.0),
            "min_child_samples": trial.suggest_int("min_child_samples", 5, 50),
            "reg_alpha": trial.suggest_float("reg_alpha", 1e-4, 10.0, log=True),
            "reg_lambda": trial.suggest_float("reg_lambda", 1e-4, 10.0, log=True),
        }
        model = lgb.LGBMClassifier(**params)
        scores = []
        for fold, (tr_idx, val_idx) in enumerate(cv.split(X_train, y_train)):
            X_tr, X_val = X_train.iloc[tr_idx], X_train.iloc[val_idx]
            y_tr, y_val = y_train.iloc[tr_idx], y_train.iloc[val_idx]
            model.fit(X_tr, y_tr, eval_set=[(X_val, y_val)])
            prob = model.predict_proba(X_val)[:, 1]
            scores.append(roc_auc_score(y_val, prob))
            # Feed the pruner with the running mean after each fold.
            trial.report(float(np.mean(scores)), step=fold)
            if trial.should_prune():
                raise optuna.TrialPruned()
        return float(np.mean(scores))

    study = optuna.create_study(
        direction="maximize",
        pruner=optuna.pruners.MedianPruner(n_startup_trials=10, n_warmup_steps=0),
        sampler=optuna.samplers.TPESampler(seed=42),
    )
    study.optimize(objective, n_trials=n_trials, show_progress_bar=False)
    print(f" Best AUC: {study.best_value:.4f} params: {study.best_params}")
    return study.best_params
# ─────────────────────────────────────────────────────────────────────────────
# 5. SHAP FEATURE SELECTION
# ─────────────────────────────────────────────────────────────────────────────
def shap_feature_selection(
    model, X_train: pd.DataFrame, threshold: float = 0.005
) -> tuple:
    """Use SHAP values to keep features with mean |SHAP| >= threshold.

    SHAP values are computed with a TreeExplainer on a random sample of at
    most 1000 rows (fixed seed) and averaged in absolute value per feature.

    Args:
        model: Fitted tree model accepted by ``shap.TreeExplainer``.
        X_train: Feature frame the sample is drawn from.
        threshold: Minimum mean absolute SHAP value for a feature to be kept.

    Returns:
        ``(selected, importance_df)``: the list of retained feature names and
        a frame with the columns ``feature`` and ``shap_importance`` sorted
        by decreasing importance. If SHAP fails for any reason, a warning is
        printed and ``(all column names, None)`` is returned.
    """
    print("\n [SHAP] Computing feature importance …")
    try:
        explainer = shap.TreeExplainer(model)
        sample = X_train.sample(min(1000, len(X_train)), random_state=42)
        shap_values = explainer.shap_values(sample)
        if isinstance(shap_values, list):
            # One array per class: keep the positive class.
            shap_values = shap_values[1]
        shap_values = np.asarray(shap_values)
        if shap_values.ndim == 3:
            # Some SHAP versions return (rows, features, classes) for
            # classifiers; keep the last (positive) class so the per-feature
            # mean below stays one-dimensional.
            shap_values = shap_values[:, :, -1]
        mean_abs = np.abs(shap_values).mean(axis=0)
        importance_df = pd.DataFrame(
            {"feature": X_train.columns, "shap_importance": mean_abs}
        ).sort_values("shap_importance", ascending=False)
        print(importance_df.head(15).to_string(index=False))
        selected = importance_df[importance_df["shap_importance"] >= threshold][
            "feature"
        ].tolist()
        print(
            f"{len(selected)}/{len(X_train.columns)} features selected (threshold={threshold})"
        )
        return selected, importance_df
    except Exception as e:
        # Deliberate best effort: feature selection is optional.
        print(f" [WARN] SHAP failed: {e}. Using all features.")
        return list(X_train.columns), None
# ─────────────────────────────────────────────────────────────────────────────
# 6. TRAIN INDIVIDUAL MODELS
# ─────────────────────────────────────────────────────────────────────────────
def train_xgboost(X_train, y_train, best_params: dict):
    """Fit the final XGBoost classifier on the supplied training data.

    Fixed settings (objective, metric, seed, class re-weighting) come first;
    entries of ``best_params`` override them on key collision.
    """
    positives = y_train.sum()
    negatives = len(y_train) - positives
    settings = dict(
        objective="binary:logistic",
        eval_metric="auc",
        verbosity=0,
        random_state=42,
        # Negative / positive ratio; the denominator is floored at 1.
        scale_pos_weight=float(negatives / max(positives, 1)),
    )
    settings.update(best_params)
    classifier = xgb.XGBClassifier(**settings)
    classifier.fit(X_train, y_train)
    return classifier
def train_lightgbm(X_train, y_train, best_params: dict):
    """Fit the final LightGBM classifier on the supplied training data.

    Fixed settings (objective, metric, seed, class re-weighting) come first;
    entries of ``best_params`` override them on key collision.
    """
    positives = y_train.sum()
    negatives = len(y_train) - positives
    settings = dict(
        objective="binary",
        metric="auc",
        verbose=-1,
        random_state=42,
        # Negative / positive ratio; the denominator is floored at 1.
        scale_pos_weight=float(negatives / max(positives, 1)),
    )
    settings.update(best_params)
    classifier = lgb.LGBMClassifier(**settings)
    classifier.fit(X_train, y_train)
    return classifier
def train_mlp(X_train, y_train) -> Pipeline:
    """Fit a standard-scaled, three-hidden-layer MLP and return the pipeline.

    The network uses hidden layers of 256, 128 and 64 units and early
    stopping on an internal 10 % validation fraction.
    """
    network = MLPClassifier(
        hidden_layer_sizes=(256, 128, 64),
        activation="relu",
        solver="adam",
        alpha=1e-3,
        batch_size=128,
        learning_rate="adaptive",
        learning_rate_init=1e-3,
        max_iter=200,
        early_stopping=True,
        validation_fraction=0.1,
        n_iter_no_change=15,
        random_state=42,
    )
    steps = [("scaler", StandardScaler()), ("mlp", network)]
    fitted = Pipeline(steps)
    fitted.fit(X_train, y_train)
    return fitted
# ─────────────────────────────────────────────────────────────────────────────
# 7. WEIGHTED VOTING ENSEMBLE
# ─────────────────────────────────────────────────────────────────────────────
class WeightedEnsemble:
    """Soft-voting ensemble with per-model weights.

    Every member model must expose ``predict_proba(X)`` returning an array
    whose second column is the positive-class probability.
    """

    def __init__(self, models: dict, weights: dict, feature_cols: list):
        """Store the members.

        Args:
            models: Mapping of model name to fitted model.
            weights: Mapping of model name to voting weight; a model without
                an entry votes with weight 1.0.
            feature_cols: Feature names to select from the input frame.
        """
        self.models = models  # {name: model}
        self.weights = weights  # {name: float}
        self.feature_cols = feature_cols
        self.version = f"ensemble_v1_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    def predict_proba_all(self, X: pd.DataFrame):
        """Return a dict of positive-class probabilities, one per model.

        Best effort: a model whose prediction fails is reported and
        contributes an all-zero vector.
        """
        probas = {}
        for name, model in self.models.items():
            try:
                available = [c for c in self.feature_cols if c in X.columns]
                probas[name] = model.predict_proba(X[available])[:, 1]
            except Exception as e:
                print(f" [WARN] {name} predict failed: {e}")
                probas[name] = np.zeros(len(X))
        return probas

    def predict_proba(self, X: pd.DataFrame):
        """Return an (n_rows, 2) array of weighted-average probabilities.

        Weights are normalised over the models that actually vote, so a
        weight entry without a matching model cannot dilute the result and
        the output stays a convex combination of the member probabilities.
        """
        probas = self.predict_proba_all(X)
        used = {name: self.weights.get(name, 1.0) for name in probas}
        total_w = sum(used.values())
        if total_w <= 0:
            # Degenerate weights: fall back to a plain average instead of
            # dividing by zero.
            used = dict.fromkeys(probas, 1.0)
            total_w = float(len(used))
        ensemble_proba = np.zeros(len(X))
        for name, proba in probas.items():
            ensemble_proba += (used[name] / total_w) * proba
        return np.column_stack([1 - ensemble_proba, ensemble_proba])

    def predict(self, X: pd.DataFrame, threshold: float = 0.5):
        """Return 0/1 predictions: 1 where the ensemble probability >= threshold."""
        return (self.predict_proba(X)[:, 1] >= threshold).astype(int)
def compute_ensemble_weights(models: dict, X_val, y_val, feature_cols: list) -> dict:
    """Derive one voting weight per model from its validation AUC.

    The weight is the AUC margin over 0.5, floored at 0.01. A model whose
    scoring fails is reported and receives the floor weight 0.01.
    """
    floor = 0.01
    weights = {}
    for name in models:
        model = models[name]
        try:
            usable = [col for col in feature_cols if col in X_val.columns]
            positive = model.predict_proba(X_val[usable])[:, 1]
            auc = roc_auc_score(y_val, positive)
            margin = auc - 0.5
            weights[name] = max(margin, floor)  # clamp positives
            print(f" {name}: AUC={auc:.4f} weight={weights[name]:.4f}")
        except Exception as e:
            print(f" [WARN] {name} weight computation failed: {e}")
            weights[name] = floor
    return weights
# ─────────────────────────────────────────────────────────────────────────────
# 8. TURF ENSEMBLE (module-level for pickle compatibility)
# ─────────────────────────────────────────────────────────────────────────────
class TurfEnsemble:
    """
    Picklable soft-voting ensemble: XGBoost + LightGBM + MLP.

    The three member probabilities are averaged with the weights stored
    under the keys "xgboost", "lightgbm" and "mlp"; a missing key counts as
    0.33. Weights are normalised over the three values actually applied, so
    the result is always a convex combination of the member probabilities.
    """

    def __init__(
        self, xgb_model, lgb_model, mlp_pipeline, weights: dict, feature_cols: list
    ):
        """Store the fitted members, their weights and the feature order."""
        self.xgb_model = xgb_model
        self.lgb_model = lgb_model
        self.mlp_pipeline = mlp_pipeline
        self.weights = weights
        self.feature_cols = feature_cols
        self.version = f"ensemble_v1_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

    def predict_proba(self, X):
        """Return an (n_rows, 2) array of [P(negative), P(positive)].

        Args:
            X: DataFrame containing the feature columns, or a NumPy array
                whose columns follow ``feature_cols`` order. NaN is replaced
                by 0 before prediction.
        """
        if isinstance(X, np.ndarray):
            X = pd.DataFrame(X, columns=self.feature_cols)
        available = [c for c in self.feature_cols if c in X.columns]
        Xa = X[available].fillna(0)
        # The MLP pipeline receives a plain array, the tree models the frame.
        members = (
            ("xgboost", self.xgb_model, Xa),
            ("lightgbm", self.lgb_model, Xa),
            ("mlp", self.mlp_pipeline, Xa.values),
        )
        used = [self.weights.get(name, 0.33) for name, _, _ in members]
        total_w = sum(used)
        if total_w <= 0:
            # Degenerate weights: fall back to a plain average instead of
            # dividing by zero.
            used = [1.0] * len(members)
            total_w = float(len(members))
        proba = np.zeros(len(Xa))
        for w, (_, model, data) in zip(used, members):
            proba += (w / total_w) * model.predict_proba(data)[:, 1]
        return np.column_stack([1 - proba, proba])

    def predict(self, X, threshold: float = 0.5):
        """Return 0/1 predictions: 1 where the ensemble probability >= threshold."""
        return (self.predict_proba(X)[:, 1] >= threshold).astype(int)
# ─────────────────────────────────────────────────────────────────────────────
# 9. EVALUATION HELPERS
# ─────────────────────────────────────────────────────────────────────────────
def evaluate_model(
    model, X_holdout, y_holdout, holdout_df: pd.DataFrame, name: str
) -> dict:
    """Score one model on the holdout set.

    ``model`` must expose ``feature_cols`` and ``predict_proba``. The result
    holds AUC, accuracy, precision and recall at a 0.5 cut-off, per-race
    Precision@3 and the mean prediction latency per row in milliseconds,
    all rounded to four decimals.
    """
    started = time.time()
    usable = [col for col in model.feature_cols if col in X_holdout.columns]
    proba = model.predict_proba(X_holdout[usable])[:, 1]
    elapsed = time.time() - started
    latency_ms = elapsed / len(X_holdout) * 1000
    # Hard labels at the 0.5 cut-off feed the threshold-based metrics.
    hard = (proba >= 0.5).astype(int)
    metrics = {"model": name}
    metrics["auc"] = round(roc_auc_score(y_holdout, proba), 4)
    metrics["accuracy"] = round(accuracy_score(y_holdout, hard), 4)
    metrics["precision"] = round(precision_score(y_holdout, hard, zero_division=0), 4)
    metrics["recall"] = round(recall_score(y_holdout, hard, zero_division=0), 4)
    metrics["precision_at3"] = round(
        compute_precision_at3(proba, y_holdout, holdout_df), 4
    )
    metrics["latency_ms_per_row"] = round(latency_ms, 4)
    return metrics
# ─────────────────────────────────────────────────────────────────────────────
# 10. MAIN PIPELINE
# ─────────────────────────────────────────────────────────────────────────────
def main(args):
    """Run the full training and benchmarking pipeline.

    Steps: load and engineer the data, split it chronologically, score the
    existing baseline model, tune XGBoost and LightGBM with Optuna, train
    the three member models, compute SHAP importances, weight the members
    on a validation slice, evaluate everything on the holdout, then write
    the pickled models and the JSON / Markdown benchmark reports into
    MODELS_DIR.

    Args:
        args: Namespace providing ``db`` (SQLite path) and ``trials``
            (Optuna trials per model).

    Returns:
        The benchmark report dict that is also written to
        ``benchmark_report.json``.
    """
    print("\n" + "=" * 65)
    print("TURF ML UPGRADE — Ensemble + Optuna (Sprint 6-7)")
    print("=" * 65)
    # ── Load & feature engineer ──────────────────────────────────────────────
    print("\n[1/9] Loading data …")
    df = load_data(args.db)
    # Features (including the label encodings) are built on the full data
    # set, before the train / holdout split.
    df = engineer_features(df)
    # ── Temporal split ────────────────────────────────────────────────────────
    print("\n[2/9] Temporal split (80/20) …")
    train_df, holdout_df = temporal_split(df, holdout_frac=HOLDOUT_FRACTION)
    X_train, y_train, feat_cols = get_features_and_target(train_df, "top3")
    X_holdout, y_holdout, _ = get_features_and_target(holdout_df, "top3")
    # Sub-split of the training rows: the first 85 % are used for Optuna and
    # for fitting the models, the last 15 % for the ensemble weights.
    n = len(X_train)
    n_val = int(n * 0.15)
    X_tr = X_train.iloc[: n - n_val]
    y_tr = y_train.iloc[: n - n_val]
    X_val = X_train.iloc[n - n_val :]
    y_val = y_train.iloc[n - n_val :]
    # NOTE(review): val_df is not read anywhere below.
    val_df = train_df.iloc[n - n_val :]
    print(f" Train subset: {len(X_tr):,}")
    print(f" Val subset: {len(X_val):,}")
    print(f" Holdout: {len(X_holdout):,}")
    # ── Baseline evaluation ───────────────────────────────────────────────────
    print("\n[3/9] Evaluating baseline XGBoost …")
    existing_path = "/home/h3r7/turf_saas/xgboost_models.pkl"
    # Falls back to a naive baseline if the existing model cannot be used.
    baseline = evaluate_baseline(holdout_df, existing_path)
    print(
        f" Baseline Precision@3: {baseline['precision_at3']:.4f} AUC: {baseline['auc']:.4f}"
    )
    # ── Optuna ────────────────────────────────────────────────────────────────
    n_trials = args.trials
    print(f"\n[4/9] Optuna optimization ({n_trials} trials each) …")
    xgb_params = optuna_xgboost(X_tr, y_tr, n_trials=n_trials)
    lgb_params = optuna_lightgbm(X_tr, y_tr, n_trials=n_trials)
    # ── Train individual models ───────────────────────────────────────────────
    print("\n[5/9] Training individual models …")
    print(" Training optimized XGBoost …")
    xgb_model = train_xgboost(X_tr, y_tr, xgb_params)
    print(" Training LightGBM …")
    lgb_model = train_lightgbm(X_tr, y_tr, lgb_params)
    print(" Training MLP (3 layers) …")
    # The MLP pipeline is fitted on a plain array, not on the DataFrame.
    mlp_model = train_mlp(X_tr.values, y_tr)
    # ── SHAP feature analysis ─────────────────────────────────────────────────
    print("\n[6/9] SHAP feature importance (XGBoost) …")
    # The selection is only recorded in the report; the models above keep
    # using all features in feat_cols.
    selected_features, shap_df = shap_feature_selection(xgb_model, X_tr)
    # ── Compute weights ────────────────────────────────────────────────────────
    print("\n[7/9] Computing ensemble weights on validation …")
    # Thin adapters giving each member the `.feature_cols` attribute that
    # evaluate_model() expects. They are local to this function and are not
    # what gets pickled below.
    # Wrap MLP to use feature_cols interface (passes a plain array through)
    class WrappedMLP:
        def __init__(self, pipeline, cols):
            self.pipeline = pipeline
            self.feature_cols = cols
        def predict_proba(self, X):
            available = [c for c in self.feature_cols if c in X.columns]
            return self.pipeline.predict_proba(X[available].values)
    # Same adapter for the tree models, which receive the DataFrame itself.
    class WrappedTree:
        def __init__(self, model, cols):
            self.model = model
            self.feature_cols = cols
        def predict_proba(self, X):
            available = [c for c in self.feature_cols if c in X.columns]
            return self.model.predict_proba(X[available])
    wrapped_xgb = WrappedTree(xgb_model, feat_cols)
    wrapped_lgb = WrappedTree(lgb_model, feat_cols)
    wrapped_mlp = WrappedMLP(mlp_model, feat_cols)
    model_dict = {
        "xgboost": wrapped_xgb,
        "lightgbm": wrapped_lgb,
        "mlp": wrapped_mlp,
    }
    weights = compute_ensemble_weights(model_dict, X_val, y_val, feat_cols)
    # ── Build ensemble ─────────────────────────────────────────────────────────
    # The object built here is a TurfEnsemble (the module-level class), even
    # though the progress message names WeightedEnsemble.
    print("\n[8/9] Building WeightedEnsemble …")
    ensemble = TurfEnsemble(xgb_model, lgb_model, mlp_model, weights, feat_cols)
    # TurfEnsemble already has .feature_cols; use it directly for evaluation
    ensemble_eval = ensemble
    # ── Holdout evaluation ─────────────────────────────────────────────────────
    print("\n[9/9] Evaluating all models on holdout …")
    results = {}
    # Individual models
    for name, wrapped in model_dict.items():
        res = evaluate_model(wrapped, X_holdout, y_holdout, holdout_df, name)
        results[name] = res
        print(
            f" {name:12s} Precision@3={res['precision_at3']:.4f} AUC={res['auc']:.4f}"
        )
    # Ensemble
    ens_res = evaluate_model(
        ensemble_eval, X_holdout, y_holdout, holdout_df, "ensemble"
    )
    results["ensemble"] = ens_res
    print(
        f" {'ensemble':12s} Precision@3={ens_res['precision_at3']:.4f} AUC={ens_res['auc']:.4f}"
    )
    # Baseline comparison: delta is the absolute difference between the two
    # Precision@3 values (not a relative change).
    baseline_p3 = baseline["precision_at3"]
    ensemble_p3 = ens_res["precision_at3"]
    delta = ensemble_p3 - baseline_p3
    print("\n" + "=" * 65)
    print("BENCHMARK SUMMARY")
    print("=" * 65)
    print(f" Baseline XGBoost Precision@3: {baseline_p3:.4f}")
    print(f" Optimized Ensemble Precision@3: {ensemble_p3:.4f}")
    print(f" Delta: {delta:+.4f} ({delta * 100:+.1f}%)")
    deploy = delta >= DEPLOY_THRESHOLD
    print(
        f" Deploy threshold (+{DEPLOY_THRESHOLD * 100:.0f}%): {'✅ DEPLOY' if deploy else '❌ BELOW THRESHOLD'}"
    )
    print("=" * 65)
    # ── Save models ────────────────────────────────────────────────────────────
    # The files are written whatever the value of `deploy`; the flag is only
    # recorded in the report.
    # NOTE(review): when this file is run as a script, pickle presumably
    # records the class as __main__.TurfEnsemble — confirm that loaders can
    # resolve that name.
    ensemble_path = MODELS_DIR / "ensemble_top3.pkl"
    with open(ensemble_path, "wb") as f:
        pickle.dump(ensemble, f)
    print(f"\n ✅ Ensemble saved → {ensemble_path}")
    # Also save individual optimized models
    for name, model in [("xgboost_optimized", xgb_model), ("lightgbm", lgb_model)]:
        model_path = MODELS_DIR / f"{name}_top3.pkl"
        with open(model_path, "wb") as f:
            pickle.dump(
                {
                    "model": model,
                    "feature_cols": feat_cols,
                    "params": xgb_params if name.startswith("xgb") else lgb_params,
                },
                f,
            )
        print(f"{name} saved → {model_path}")
    mlp_path = MODELS_DIR / "mlp_top3.pkl"
    with open(mlp_path, "wb") as f:
        pickle.dump({"pipeline": mlp_model, "feature_cols": feat_cols}, f)
    print(f" ✅ MLP saved → {mlp_path}")
    # ── Save benchmark report ──────────────────────────────────────────────────
    report = {
        "run_date": datetime.now().isoformat(),
        "dataset": {
            "db_path": args.db,
            "total_rows": len(df),
            # train_rows counts the whole training side of the temporal
            # split, including the validation slice.
            "train_rows": len(X_train),
            "holdout_rows": len(X_holdout),
            "train_date_range": [
                str(train_df["date_programme"].min()),
                str(train_df["date_programme"].max()),
            ],
            "holdout_date_range": [
                str(holdout_df["date_programme"].min()),
                str(holdout_df["date_programme"].max()),
            ],
        },
        "baseline": baseline,
        "individual_models": {k: v for k, v in results.items() if k != "ensemble"},
        "ensemble": ens_res,
        "delta_precision_at3": round(delta, 4),
        "deploy": deploy,
        "optuna": {
            "n_trials": n_trials,
            "xgboost_best_params": xgb_params,
            "lightgbm_best_params": lgb_params,
        },
        "features": {
            "total": len(feat_cols),
            "selected_by_shap": len(selected_features),
            "feature_list": feat_cols,
            "shap_selected": selected_features,
        },
        "ensemble_weights": weights,
    }
    report_path = MODELS_DIR / "benchmark_report.json"
    with open(report_path, "w") as f:
        json.dump(report, f, indent=2)
    print(f" ✅ Benchmark report → {report_path}")
    # Human-readable markdown
    md_path = MODELS_DIR / "benchmark_report.md"
    _write_markdown_report(report, md_path)
    print(f" ✅ Markdown report → {md_path}")
    return report
def _write_markdown_report(report: dict, path: Path):
"""Write a human-readable markdown benchmark report."""
b = report["baseline"]
e = report["ensemble"]
delta = report["delta_precision_at3"]
deploy_str = (
"✅ DEPLOIEMENT RECOMMANDE"
if report["deploy"]
else "❌ EN DESSOUS DU SEUIL (+5%)"
)
lines = [
f"# Benchmark ML Ensemble — Turf Prédictions",
f"",
f"**Date:** {report['run_date'][:10]} ",
f"**Dataset:** {report['dataset']['total_rows']:,} partants ",
f"**Holdout:** {report['dataset']['holdout_rows']:,} lignes ({report['dataset']['holdout_date_range'][0]}{report['dataset']['holdout_date_range'][1]})",
f"",
f"## Résultats",
f"",
f"| Modèle | Precision@3 | AUC | Latence/prédiction |",
f"|--------|-------------|-----|-------------------|",
f"| XGBoost (baseline) | {b['precision_at3']:.4f} | {b['auc']:.4f} | — |",
]
for name, res in report["individual_models"].items():
lines.append(
f"| {name} | {res['precision_at3']:.4f} | {res['auc']:.4f} | {res['latency_ms_per_row']:.2f} ms |"
)
lines += [
f"| **Ensemble** | **{e['precision_at3']:.4f}** | **{e['auc']:.4f}** | **{e['latency_ms_per_row']:.2f} ms** |",
f"",
f"## Décision de déploiement",
f"",
f"- Delta Precision@3 : **{delta:+.4f}** ({delta * 100:+.1f}%)",
f"- Seuil requis : **+5%**",
f"- Résultat : **{deploy_str}**",
f"",
f"## Optimisation Optuna",
f"",
f"- Trials XGBoost : {report['optuna']['n_trials']}",
f"- Trials LightGBM : {report['optuna']['n_trials']}",
f"- Pruning : MedianPruner",
f"",
f"### Meilleurs hyperparamètres XGBoost",
f"```json",
json.dumps(report["optuna"]["xgboost_best_params"], indent=2),
f"```",
f"",
f"### Meilleurs hyperparamètres LightGBM",
f"```json",
json.dumps(report["optuna"]["lightgbm_best_params"], indent=2),
f"```",
f"",
f"## Features",
f"",
f"- Total features : {report['features']['total']}",
f"- Retenues par SHAP : {report['features']['selected_by_shap']}",
f"",
f"## Poids de l'ensemble",
f"",
]
for name, w in report["ensemble_weights"].items():
lines.append(f"- {name} : {w:.4f}")
path.write_text("\n".join(lines))
if __name__ == "__main__":
    # Command-line entry point: parse the options, run the pipeline and
    # print the final deploy decision.
    cli_options = (
        ("--db", {"default": DB_PATH, "help": "Path to SQLite database"}),
        (
            "--trials",
            {"type": int, "default": MIN_TRIALS, "help": "Optuna trials per model"},
        ),
        ("--quick", {"action": "store_true", "help": "Quick mode: 10 trials only"}),
    )
    parser = argparse.ArgumentParser(
        description="Train ensemble ML model for turf predictions"
    )
    for flag, settings in cli_options:
        parser.add_argument(flag, **settings)
    args = parser.parse_args()
    # --quick takes precedence over any --trials value.
    args.trials = 10 if args.quick else args.trials
    report = main(args)
    print(f"\nDone. Deploy={report['deploy']}")