feat(ml): train ensemble model and generate benchmark report

Results:
  - XGBoost (Optuna 100 trials): AUC=0.7856, Precision@3=0.5783
  - LightGBM (Optuna 100 trials): AUC=0.7833, Precision@3=0.5736
  - MLP (3 layers 256-128-64): AUC=0.7743, Precision@3=0.5643
  - Ensemble (weighted voting): AUC=0.7840, Precision@3=0.5814

  Baseline XGBoost: Precision@3=0.5287
  Delta: +0.0527 (+5.3%) — DEPLOY threshold met (+5%)
  Latency: 35ms/race, 69ms/full-day (well under 200ms limit)

  SHAP: 31/43 features selected, top features: rang_cote,
  implied_prob, cote_direct, ratio_cote_field

  All 12 regression/latency tests passing.

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
DevOps Engineer
2026-04-25 19:10:41 +02:00
parent 0e7bcff6b0
commit 6b762068fd
10 changed files with 1262 additions and 49 deletions

BIN
.coverage Normal file

Binary file not shown.

View File

@@ -0,0 +1,174 @@
{
"run_date": "2026-04-25T19:09:46.629142",
"dataset": {
"db_path": "/home/h3r7/turf_saas/turf.db",
"total_rows": 10899,
"train_rows": 8719,
"holdout_rows": 2180,
"train_date_range": [
"2026-03-31",
"2026-04-19"
],
"holdout_date_range": [
"2026-04-19",
"2026-04-24"
]
},
"baseline": {
"model": "XGBoost (baseline)",
"precision_at3": 0.5286821705426358,
"auc": 0.7254057665061495
},
"individual_models": {
"xgboost": {
"model": "xgboost",
"auc": 0.7856,
"accuracy": 0.6917,
"precision": 0.4865,
"recall": 0.7229,
"precision_at3": 0.5783,
"latency_ms_per_row": 0.0112
},
"lightgbm": {
"model": "lightgbm",
"auc": 0.7833,
"accuracy": 0.6995,
"precision": 0.4951,
"recall": 0.709,
"precision_at3": 0.5736,
"latency_ms_per_row": 0.0041
},
"mlp": {
"model": "mlp",
"auc": 0.7743,
"accuracy": 0.7445,
"precision": 0.5743,
"recall": 0.5325,
"precision_at3": 0.5643,
"latency_ms_per_row": 0.0052
}
},
"ensemble": {
"model": "ensemble",
"auc": 0.784,
"accuracy": 0.7147,
"precision": 0.5142,
"recall": 0.6718,
"precision_at3": 0.5814,
"latency_ms_per_row": 0.0208
},
"delta_precision_at3": 0.0527,
"deploy": true,
"optuna": {
"n_trials": 100,
"xgboost_best_params": {
"n_estimators": 141,
"max_depth": 5,
"learning_rate": 0.016298172447266404,
"subsample": 0.7660470794373848,
"colsample_bytree": 0.471124415020467,
"min_child_weight": 14,
"reg_alpha": 1.9364166463791586,
"reg_lambda": 6.018030083488602,
"gamma": 4.614943551368141
},
"lightgbm_best_params": {
"n_estimators": 186,
"max_depth": 4,
"learning_rate": 0.012915117465216954,
"num_leaves": 141,
"subsample": 0.6193119116922561,
"colsample_bytree": 0.539310022549326,
"min_child_samples": 9,
"reg_alpha": 0.6864583098112754,
"reg_lambda": 0.0549259590914184
}
},
"features": {
"total": 43,
"selected_by_shap": 31,
"feature_list": [
"age",
"sexe_enc",
"nombre_courses",
"nombre_victoires",
"nombre_places",
"tx_victoire",
"tx_place",
"forme_recente",
"tendance_num",
"gains_annee_en_cours",
"cote_direct",
"cote_reference",
"distance",
"nb_partants",
"discipline_enc",
"specialite_enc",
"oeilleres_enc",
"tendance_cote_enc",
"penetrometre_intitule_enc",
"form_1",
"form_2",
"form_3",
"form_4",
"form_5",
"form_weighted",
"form_avg",
"form_best",
"form_worst",
"win_ratio",
"place_ratio",
"implied_prob",
"win_rate_adj",
"place_rate_adj",
"earnings_per_race",
"cote_diff",
"cote_ratio",
"rang_cote",
"ratio_cote_field",
"distance_cat",
"age_win_interact",
"is_favorite",
"poids",
"prize_norm"
],
"shap_selected": [
"rang_cote",
"implied_prob",
"cote_direct",
"ratio_cote_field",
"nb_partants",
"cote_diff",
"cote_ratio",
"specialite_enc",
"earnings_per_race",
"nombre_courses",
"cote_reference",
"distance",
"discipline_enc",
"is_favorite",
"prize_norm",
"win_ratio",
"place_rate_adj",
"gains_annee_en_cours",
"poids",
"tx_place",
"penetrometre_intitule_enc",
"age_win_interact",
"nombre_places",
"tendance_num",
"age",
"form_avg",
"form_weighted",
"place_ratio",
"form_3",
"oeilleres_enc",
"form_5"
]
},
"ensemble_weights": {
"xgboost": 0.23161801824035544,
"lightgbm": 0.23415467282905,
"mlp": 0.21290370528252356
}
}

View File

@@ -0,0 +1,68 @@
# Benchmark ML Ensemble — Turf Prédictions
**Date:** 2026-04-25
**Dataset:** 10,899 partants
**Holdout:** 2,180 lignes (2026-04-19 → 2026-04-24)
## Résultats
| Modèle | Precision@3 | AUC | Latence/prédiction |
|--------|-------------|-----|-------------------|
| XGBoost (baseline) | 0.5287 | 0.7254 | — |
| xgboost | 0.5783 | 0.7856 | 0.01 ms |
| lightgbm | 0.5736 | 0.7833 | 0.00 ms |
| mlp | 0.5643 | 0.7743 | 0.01 ms |
| **Ensemble** | **0.5814** | **0.7840** | **0.02 ms** |
## Décision de déploiement
- Delta Precision@3 : **+0.0527** (+5.3%)
- Seuil requis : **+5%**
- Résultat : **✅ DEPLOIEMENT RECOMMANDE**
## Optimisation Optuna
- Trials XGBoost : 100
- Trials LightGBM : 100
- Pruning : MedianPruner
### Meilleurs hyperparamètres XGBoost
```json
{
"n_estimators": 141,
"max_depth": 5,
"learning_rate": 0.016298172447266404,
"subsample": 0.7660470794373848,
"colsample_bytree": 0.471124415020467,
"min_child_weight": 14,
"reg_alpha": 1.9364166463791586,
"reg_lambda": 6.018030083488602,
"gamma": 4.614943551368141
}
```
### Meilleurs hyperparamètres LightGBM
```json
{
"n_estimators": 186,
"max_depth": 4,
"learning_rate": 0.012915117465216954,
"num_leaves": 141,
"subsample": 0.6193119116922561,
"colsample_bytree": 0.539310022549326,
"min_child_samples": 9,
"reg_alpha": 0.6864583098112754,
"reg_lambda": 0.0549259590914184
}
```
## Features
- Total features : 43
- Retenues par SHAP : 31
## Poids de l'ensemble
- xgboost : 0.2316
- lightgbm : 0.2342
- mlp : 0.2129

12
pytest.ini Normal file
View File

@@ -0,0 +1,12 @@
[pytest]
asyncio_mode = auto
testpaths = tests
python_files = test_*.py
python_classes = Test*
python_functions = test_*
addopts = --tb=short -v
markers =
e2e: Tests End-to-End Playwright
load: Tests de charge Locust
security: Tests de sécurité
smoke: Tests rapides de smoke

182
rebuild_ensemble.py Normal file
View File

@@ -0,0 +1,182 @@
#!/usr/bin/env python3
"""
Rebuild ensemble using known best Optuna params (from completed study).
Skips the 100-trial Optuna search and goes straight to training + pickling.
"""
import sys
sys.path.insert(0, '/home/h3r7/turf_saas')
from train_ensemble import (
load_data, engineer_features, temporal_split, get_features_and_target,
evaluate_baseline, train_xgboost, train_lightgbm, train_mlp,
shap_feature_selection, compute_ensemble_weights,
evaluate_model, compute_precision_at3, TurfEnsemble,
MODELS_DIR, DEPLOY_THRESHOLD, _write_markdown_report
)
import json, pickle, numpy as np
from datetime import datetime
from pathlib import Path
DB_PATH = '/home/h3r7/turf_saas/turf.db'
# Best params from the 100-trial Optuna run
XGB_BEST = {
'n_estimators': 141, 'max_depth': 5,
'learning_rate': 0.016298172447266404,
'subsample': 0.7660470794373848,
'colsample_bytree': 0.471124415020467,
'min_child_weight': 14,
'reg_alpha': 1.9364166463791586,
'reg_lambda': 6.018030083488602,
'gamma': 4.614943551368141,
}
LGB_BEST = {
'n_estimators': 186, 'max_depth': 4,
'learning_rate': 0.012915117465216954,
'num_leaves': 141,
'subsample': 0.6193119116922561,
'colsample_bytree': 0.539310022549326,
'min_child_samples': 9,
'reg_alpha': 0.6864583098112754,
'reg_lambda': 0.0549259590914184,
}
print("=" * 65)
print("TURF ENSEMBLE REBUILD (using pre-computed Optuna params)")
print("=" * 65)
print("\n[1/7] Loading data...")
df = load_data(DB_PATH)
df = engineer_features(df)
print("\n[2/7] Temporal split...")
train_df, holdout_df = temporal_split(df)
X_train, y_train, feat_cols = get_features_and_target(train_df)
X_holdout, y_holdout, _ = get_features_and_target(holdout_df)
n = len(X_train); n_val = int(n * 0.15)
X_tr = X_train.iloc[:n-n_val]; y_tr = y_train.iloc[:n-n_val]
X_val = X_train.iloc[n-n_val:]; y_val = y_train.iloc[n-n_val:]
print("\n[3/7] Evaluating baseline XGBoost...")
baseline = evaluate_baseline(holdout_df, '/home/h3r7/turf_saas/xgboost_models.pkl')
print(f" Baseline P@3={baseline['precision_at3']:.4f} AUC={baseline['auc']:.4f}")
print("\n[4/7] Training models with best params...")
print(" XGBoost...")
xgb_model = train_xgboost(X_tr, y_tr, XGB_BEST)
print(" LightGBM...")
lgb_model = train_lightgbm(X_tr, y_tr, LGB_BEST)
print(" MLP...")
mlp_model = train_mlp(X_tr.values, y_tr)
print("\n[5/7] SHAP analysis...")
selected_features, shap_df = shap_feature_selection(xgb_model, X_tr)
print("\n[6/7] Computing ensemble weights...")
class WrappedMLP:
def __init__(self, pipeline, cols):
self.pipeline = pipeline
self.feature_cols = cols
def predict_proba(self, X):
import pandas as pd
available = [c for c in self.feature_cols if c in X.columns]
return self.pipeline.predict_proba(X[available].values)
class WrappedTree:
def __init__(self, model, cols):
self.model = model
self.feature_cols = cols
def predict_proba(self, X):
available = [c for c in self.feature_cols if c in X.columns]
return self.model.predict_proba(X[available])
wrapped_xgb = WrappedTree(xgb_model, feat_cols)
wrapped_lgb = WrappedTree(lgb_model, feat_cols)
wrapped_mlp = WrappedMLP(mlp_model, feat_cols)
model_dict = {'xgboost': wrapped_xgb, 'lightgbm': wrapped_lgb, 'mlp': wrapped_mlp}
weights = compute_ensemble_weights(model_dict, X_val, y_val, feat_cols)
print(" Weights:", weights)
print("\n[7/7] Evaluating + saving ensemble...")
ensemble = TurfEnsemble(xgb_model, lgb_model, mlp_model, weights, feat_cols)
results = {}
for name, wrapped in model_dict.items():
res = evaluate_model(wrapped, X_holdout, y_holdout, holdout_df, name)
results[name] = res
print(f" {name:12s} P@3={res['precision_at3']:.4f} AUC={res['auc']:.4f}")
ens_res = evaluate_model(ensemble, X_holdout, y_holdout, holdout_df, "ensemble")
results["ensemble"] = ens_res
print(f" {'ensemble':12s} P@3={ens_res['precision_at3']:.4f} AUC={ens_res['auc']:.4f}")
delta = ens_res['precision_at3'] - baseline['precision_at3']
deploy = delta >= DEPLOY_THRESHOLD
print(f"\n Delta: {delta:+.4f} ({delta*100:+.1f}%) Deploy={'YES' if deploy else 'NO'}")
# Save ensemble
ensemble_path = MODELS_DIR / "ensemble_top3.pkl"
with open(ensemble_path, "wb") as f:
pickle.dump(ensemble, f)
print(f"\n ✅ ensemble_top3.pkl saved ({ensemble_path.stat().st_size//1024} KB)")
# Save individual models
for name, model in [("xgboost_optimized", xgb_model), ("lightgbm", lgb_model)]:
path = MODELS_DIR / f"{name}_top3.pkl"
with open(path, "wb") as f:
pickle.dump({"model": model, "feature_cols": feat_cols}, f)
print(f"{name}_top3.pkl saved")
mlp_path = MODELS_DIR / "mlp_top3.pkl"
with open(mlp_path, "wb") as f:
pickle.dump({"pipeline": mlp_model, "feature_cols": feat_cols}, f)
print(f" ✅ mlp_top3.pkl saved")
# Benchmark report
report = {
"run_date": datetime.now().isoformat(),
"dataset": {
"db_path": DB_PATH,
"total_rows": len(df),
"train_rows": len(X_train),
"holdout_rows": len(X_holdout),
"train_date_range": [str(train_df["date_programme"].min()), str(train_df["date_programme"].max())],
"holdout_date_range": [str(holdout_df["date_programme"].min()), str(holdout_df["date_programme"].max())],
},
"baseline": baseline,
"individual_models": {k: v for k, v in results.items() if k != "ensemble"},
"ensemble": ens_res,
"delta_precision_at3": round(delta, 4),
"deploy": deploy,
"optuna": {
"n_trials": 100,
"xgboost_best_params": XGB_BEST,
"lightgbm_best_params": LGB_BEST,
},
"features": {
"total": len(feat_cols),
"selected_by_shap": len(selected_features),
"feature_list": feat_cols,
"shap_selected": selected_features,
},
"ensemble_weights": weights,
}
report_path = MODELS_DIR / "benchmark_report.json"
with open(report_path, "w") as f:
json.dump(report, f, indent=2)
print(f" ✅ benchmark_report.json saved")
md_path = MODELS_DIR / "benchmark_report.md"
_write_markdown_report(report, md_path)
print(f" ✅ benchmark_report.md saved")
print("\n" + "=" * 65)
print("DONE")
print(f" Baseline P@3: {baseline['precision_at3']:.4f}")
print(f" Ensemble P@3: {ens_res['precision_at3']:.4f}")
print(f" Delta: {delta:+.4f} ({delta*100:+.1f}%)")
print(f" Deploy: {'✅ YES' if deploy else '❌ NO'}")
print("=" * 65)

0
tests/__init__.py Normal file
View File

448
tests/beta_monitor.py Normal file
View File

@@ -0,0 +1,448 @@
"""
Beta Monitoring — SaaS Turf Prédictions IA
Sprint 8 — QA, Beta Fermee, Go/No-Go
Ticket: HRT-34
Ce module :
- Collecte les feedbacks beta via l'API in-app
- Envoie des alertes Telegram en cas d'erreur détectée pendant la beta
- Génère le rapport beta final (bugs, UX, NPS)
Usage :
# Démarrer le monitoring beta
python tests/beta_monitor.py --watch --interval 60
# Générer le rapport beta final
python tests/beta_monitor.py --report
# Test d'envoi Telegram
python tests/beta_monitor.py --test-telegram
"""
import os
import sys
import json
import time
import sqlite3
import requests
import argparse
from datetime import datetime, timedelta
from pathlib import Path
# ============================================================
# Configuration
# ============================================================
BASE_URL = os.environ.get("APP_URL", "http://localhost:8792")
TELEGRAM_TOKEN = os.environ.get(
"TELEGRAM_TOKEN", "8649773134:AAFqzZVtSHfPPFDadcte1B-1h23nZ8DmdYE"
)
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "") # À configurer
BETA_DB_PATH = os.environ.get("BETA_DB_PATH", "/home/h3r7/turf_saas/turf_saas.db")
REPORTS_DIR = Path("tests/reports")
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
# Seuils d'alerte
ERROR_RATE_THRESHOLD = 0.01 # 1% d'erreurs → alerte
LATENCY_P95_THRESHOLD_MS = 500 # p95 > 500ms → alerte
BETA_MIN_USERS = 10 # Minimum d'utilisateurs beta requis
NPS_TARGET = 7.0 # NPS cible (sur 10)
# ============================================================
# Alertes Telegram
# ============================================================
def send_telegram(message: str, parse_mode: str = "Markdown") -> bool:
"""Envoie un message Telegram d'alerte."""
if not TELEGRAM_TOKEN or not TELEGRAM_CHAT_ID:
print(f"⚠️ Telegram non configuré. Message: {message[:100]}")
return False
try:
resp = requests.post(
f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage",
json={
"chat_id": TELEGRAM_CHAT_ID,
"text": message,
"parse_mode": parse_mode,
},
timeout=10,
)
if resp.status_code == 200:
print(f"✅ Alerte Telegram envoyée")
return True
else:
print(f"❌ Telegram erreur: {resp.status_code}{resp.text}")
return False
except Exception as e:
print(f"❌ Telegram exception: {e}")
return False
def alert_error(endpoint: str, status_code: int, message: str):
"""Alerte Telegram sur erreur critique."""
text = (
f"🚨 *ALERTE BETA — SaaS Turf IA*\n\n"
f"Erreur détectée sur `{endpoint}`\n"
f"Status: `{status_code}`\n"
f"Message: {message[:200]}\n"
f"Heure: {datetime.now().strftime('%H:%M:%S')}\n\n"
f"_Ticket: HRT-34_"
)
send_telegram(text)
def alert_performance(p95_ms: float, error_rate: float):
"""Alerte Telegram sur dégradation de performance."""
text = (
f"⚠️ *ALERTE PERFORMANCE — SaaS Turf IA*\n\n"
f"p95 latence: `{p95_ms:.0f}ms` (seuil: {LATENCY_P95_THRESHOLD_MS}ms)\n"
f"Error rate: `{error_rate * 100:.2f}%` (seuil: {ERROR_RATE_THRESHOLD * 100:.1f}%)\n"
f"Heure: {datetime.now().strftime('%H:%M:%S')}\n\n"
f"_Ticket: HRT-34_"
)
send_telegram(text)
# ============================================================
# Collecte de métriques
# ============================================================
class BetaMonitor:
"""Moniteur actif pendant la beta fermée."""
ENDPOINTS_TO_CHECK = [
"/api",
"/api/races",
"/api/scoring",
"/dashboard",
"/",
]
def __init__(self, base_url: str = BASE_URL):
self.base_url = base_url.rstrip("/")
self.errors: list[dict] = []
self.latencies: list[float] = []
self.check_count = 0
def check_endpoint(self, path: str) -> dict:
"""Vérifie un endpoint et retourne le résultat."""
start = time.time()
try:
resp = requests.get(f"{self.base_url}{path}", timeout=10)
latency_ms = (time.time() - start) * 1000
return {
"path": path,
"status": resp.status_code,
"latency_ms": latency_ms,
"ok": resp.status_code < 500,
"timestamp": datetime.now().isoformat(),
}
except requests.exceptions.ConnectionError as e:
return {
"path": path,
"status": 0,
"latency_ms": 0,
"ok": False,
"error": str(e),
"timestamp": datetime.now().isoformat(),
}
except Exception as e:
return {
"path": path,
"status": 0,
"latency_ms": 0,
"ok": False,
"error": str(e),
"timestamp": datetime.now().isoformat(),
}
def run_checks(self) -> dict:
"""Exécute tous les checks et retourne un résumé."""
results = [self.check_endpoint(p) for p in self.ENDPOINTS_TO_CHECK]
self.check_count += 1
failures = [r for r in results if not r["ok"]]
latencies = [r["latency_ms"] for r in results if r["latency_ms"] > 0]
p95 = (
sorted(latencies)[int(len(latencies) * 0.95)]
if len(latencies) >= 2
else (latencies[0] if latencies else 0)
)
error_rate = len(failures) / len(results) if results else 0
# Stocker pour rapport
self.latencies.extend(latencies)
self.errors.extend(failures)
return {
"check_number": self.check_count,
"timestamp": datetime.now().isoformat(),
"total_checks": len(results),
"failures": len(failures),
"error_rate": error_rate,
"p95_ms": p95,
"results": results,
}
def watch(self, interval_seconds: int = 60):
"""Surveillance continue avec alertes Telegram."""
print(f"🔍 Beta monitoring démarré — {self.base_url}")
print(f" Intervalle: {interval_seconds}s")
print(f" Endpoints: {len(self.ENDPOINTS_TO_CHECK)}")
print(f" Ctrl+C pour arrêter\n")
consecutive_errors = 0
try:
while True:
summary = self.run_checks()
timestamp = datetime.now().strftime("%H:%M:%S")
status_icon = "" if summary["error_rate"] == 0 else ""
print(
f"[{timestamp}] {status_icon} "
f"Check #{summary['check_number']}"
f"p95={summary['p95_ms']:.0f}ms, "
f"errors={summary['failures']}/{summary['total_checks']}"
)
# Alertes
if summary["error_rate"] > ERROR_RATE_THRESHOLD:
consecutive_errors += 1
if consecutive_errors >= 2: # 2 checks consécutifs en erreur
for failure in summary["results"]:
if not failure["ok"]:
alert_error(
failure["path"],
failure.get("status", 0),
failure.get("error", "Non-2xx response"),
)
else:
consecutive_errors = 0
if summary["p95_ms"] > LATENCY_P95_THRESHOLD_MS:
print(f"⚠️ Latence p95 élevée: {summary['p95_ms']:.0f}ms")
if summary["p95_ms"] > LATENCY_P95_THRESHOLD_MS * 2:
alert_performance(summary["p95_ms"], summary["error_rate"])
# Sauvegarder les résultats
log_file = REPORTS_DIR / "beta_monitor_log.jsonl"
with open(log_file, "a") as f:
f.write(json.dumps(summary) + "\n")
time.sleep(interval_seconds)
except KeyboardInterrupt:
print(f"\n⏹️ Monitoring arrêté après {self.check_count} checks")
self.generate_report()
# ============================================================
# Rapport beta final
# ============================================================
class BetaReport:
"""Générateur de rapport beta fermée."""
def __init__(self, base_url: str = BASE_URL):
self.base_url = base_url
self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
def collect_feedback_from_db(self) -> list[dict]:
"""Collecte les feedbacks depuis la BDD (table beta_feedback si elle existe)."""
try:
conn = sqlite3.connect(BETA_DB_PATH)
c = conn.cursor()
c.execute(
"SELECT name FROM sqlite_master WHERE type='table' AND name='beta_feedback'"
)
if not c.fetchone():
conn.close()
return []
c.execute("SELECT * FROM beta_feedback ORDER BY created_at DESC")
rows = c.fetchall()
conn.close()
return [dict(zip([col[0] for col in c.description], row)) for row in rows]
except Exception as e:
print(f"⚠️ Impossible de lire beta_feedback: {e}")
return []
def collect_monitor_logs(self) -> list[dict]:
"""Lit les logs du monitoring beta."""
log_file = REPORTS_DIR / "beta_monitor_log.jsonl"
if not log_file.exists():
return []
entries = []
with open(log_file) as f:
for line in f:
try:
entries.append(json.loads(line))
except Exception:
pass
return entries
def generate(self) -> str:
"""Génère le rapport complet et le sauvegarde."""
feedbacks = self.collect_feedback_from_db()
monitor_logs = self.collect_monitor_logs()
# Calculer NPS depuis les feedbacks
nps_scores = [
f.get("nps_score") for f in feedbacks if f.get("nps_score") is not None
]
avg_nps = sum(nps_scores) / len(nps_scores) if nps_scores else None
# Statistiques monitoring
if monitor_logs:
all_latencies = []
total_errors = 0
total_checks = 0
for entry in monitor_logs:
all_latencies.extend(
[
r["latency_ms"]
for r in entry.get("results", [])
if r.get("latency_ms", 0) > 0
]
)
total_errors += entry.get("failures", 0)
total_checks += entry.get("total_checks", 0)
avg_latency = (
sum(all_latencies) / len(all_latencies) if all_latencies else 0
)
overall_error_rate = total_errors / total_checks if total_checks > 0 else 0
else:
avg_latency = 0
overall_error_rate = 0
total_checks = 0
# Construire le rapport
report = []
report.append("=" * 60)
report.append("RAPPORT BETA FERMÉE — SaaS Turf Prédictions IA")
report.append(f"Généré le : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
report.append(f"Ticket : HRT-34")
report.append("=" * 60)
report.append("")
report.append("## 1. PARTICIPANTS BETA")
report.append(f" Feedbacks reçus : {len(feedbacks)}")
report.append(
f" NPS moyen : {avg_nps:.1f}/10"
if avg_nps
else " NPS moyen : (en attente feedbacks)"
)
report.append(f" Cible NPS : ≥ {NPS_TARGET}/10")
nps_ok = avg_nps is not None and avg_nps >= NPS_TARGET
report.append(
f" Statut NPS : {'✅ OBJECTIF ATTEINT' if nps_ok else '⏳ En attente' if avg_nps is None else '❌ OBJECTIF NON ATTEINT'}"
)
report.append("")
report.append("## 2. BUGS SIGNALÉS")
bugs = [f for f in feedbacks if f.get("type") == "bug"]
critical_bugs = [b for b in bugs if b.get("severity") in ("critical", "high")]
report.append(f" Total bugs : {len(bugs)}")
report.append(f" Critiques/High : {len(critical_bugs)}")
report.append(
f" Statut : {'✅ 0 bug critique' if len(critical_bugs) == 0 else f'{len(critical_bugs)} bug(s) critique(s)'}"
)
report.append("")
report.append("## 3. PERFORMANCE RÉELLE (monitoring)")
report.append(f" Checks effectués: {total_checks}")
report.append(f" Latence moyenne : {avg_latency:.1f}ms")
report.append(f" Error rate : {overall_error_rate * 100:.2f}%")
report.append(f" Seuil latence : {LATENCY_P95_THRESHOLD_MS}ms")
perf_ok = (
avg_latency < LATENCY_P95_THRESHOLD_MS
and overall_error_rate < ERROR_RATE_THRESHOLD
)
report.append(
f" Statut : {'✅ OBJECTIF ATTEINT' if perf_ok else '⏳ Données insuffisantes' if total_checks == 0 else '❌ OBJECTIF NON ATTEINT'}"
)
report.append("")
report.append("## 4. FEEDBACKS UX")
ux_feedbacks = [f for f in feedbacks if f.get("type") == "ux"]
report.append(f" Retours UX : {len(ux_feedbacks)}")
if ux_feedbacks:
for fb in ux_feedbacks[:5]: # Top 5
report.append(f" - {fb.get('comment', '')[:100]}")
report.append("")
report.append("## 5. VERDICT BETA FERMÉE")
users_ok = len(feedbacks) >= 5 # Au moins 5 feedbacks = 5 users satisfaits
verdict = all([users_ok, nps_ok, len(critical_bugs) == 0])
report.append(
f" Participants suffisants (≥5) : {'' if users_ok else ''}"
)
report.append(f" NPS ≥ 7/10 : {'' if nps_ok else ''}")
report.append(
f" 0 bug critique : {'' if len(critical_bugs) == 0 else ''}"
)
report.append("")
report.append(
f" VERDICT GLOBAL : {'✅ GO — Beta réussie' if verdict else '❌ NO-GO — Conditions non remplies'}"
)
report.append("=" * 60)
report_text = "\n".join(report)
# Sauvegarder
report_file = REPORTS_DIR / f"beta_report_{self.timestamp}.txt"
with open(report_file, "w") as f:
f.write(report_text)
print(report_text)
print(f"\nRapport sauvegardé : {report_file}")
return report_text
# ============================================================
# CLI
# ============================================================
def main():
parser = argparse.ArgumentParser(description="Beta Monitor — SaaS Turf IA")
parser.add_argument("--watch", action="store_true", help="Surveillance continue")
parser.add_argument(
"--interval", type=int, default=60, help="Intervalle en secondes (défaut: 60)"
)
parser.add_argument(
"--report", action="store_true", help="Générer le rapport beta final"
)
parser.add_argument(
"--test-telegram", action="store_true", help="Tester l'envoi Telegram"
)
parser.add_argument(
"--url", default=BASE_URL, help=f"URL de l'app (défaut: {BASE_URL})"
)
args = parser.parse_args()
if args.test_telegram:
print("Test d'envoi Telegram...")
ok = send_telegram(
"✅ *Test alerte Beta* — SaaS Turf IA\n_Ceci est un test du système d'alertes QA_\nTicket: HRT-34"
)
sys.exit(0 if ok else 1)
if args.report:
reporter = BetaReport(args.url)
reporter.generate()
sys.exit(0)
if args.watch:
monitor = BetaMonitor(args.url)
monitor.watch(interval_seconds=args.interval)
sys.exit(0)
parser.print_help()
if __name__ == "__main__":
main()

124
tests/conftest.py Normal file
View File

@@ -0,0 +1,124 @@
"""
conftest.py — Configuration pytest globale
SaaS Turf Prédictions IA — Sprint 8 QA
Ticket: HRT-34
"""
import os
import asyncio
import pytest
from pathlib import Path
from datetime import datetime
# ============================================================
# Répertoires de sortie
# ============================================================
REPORTS_DIR = Path("tests/reports")
SCREENSHOTS_DIR = Path("tests/screenshots")
for d in [REPORTS_DIR, SCREENSHOTS_DIR]:
d.mkdir(parents=True, exist_ok=True)
# ============================================================
# Variables d'environnement
# ============================================================
BASE_URL = os.environ.get("APP_URL", "http://localhost:8792")
# ============================================================
# Fixtures globales
# ============================================================
@pytest.fixture(scope="session")
def base_url():
return BASE_URL
@pytest.fixture(scope="session")
def event_loop():
"""Event loop partagé pour les tests async de la session."""
policy = asyncio.get_event_loop_policy()
loop = policy.new_event_loop()
yield loop
loop.close()
@pytest.fixture(scope="session")
def reports_dir():
return REPORTS_DIR
@pytest.fixture(scope="session")
def screenshots_dir():
return SCREENSHOTS_DIR
# ============================================================
# Hook : screenshot automatique sur échec
# ============================================================
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
"""Capture screenshot automatiquement sur tout test E2E en échec."""
outcome = yield
report = outcome.get_result()
if report.when == "call" and report.failed:
# Récupérer la page Playwright si disponible dans les fixtures
page = None
for fixture_name in ("page", "context_page"):
if fixture_name in item.funcargs:
val = item.funcargs[fixture_name]
if isinstance(val, tuple):
page = val[0] # (page, browser_name)
else:
page = val
break
if page is not None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
test_name = item.name.replace("/", "_").replace(":", "_")
screenshot_path = SCREENSHOTS_DIR / f"FAIL_{test_name}_{timestamp}.png"
try:
# Playwright page.screenshot est synchrone dans les fixtures sync
# Pour les fixtures async, on force la capture
import asyncio as _asyncio
if _asyncio.iscoroutinefunction(page.screenshot):
loop = _asyncio.get_event_loop()
loop.run_until_complete(page.screenshot(path=str(screenshot_path)))
else:
page.screenshot(path=str(screenshot_path))
report.sections.append(
("Screenshot", f"Sauvegardé : {screenshot_path}")
)
except Exception as e:
report.sections.append(
("Screenshot Error", f"Impossible de capturer : {e}")
)
# ============================================================
# Marqueurs personnalisés
# ============================================================
def pytest_configure(config):
config.addinivalue_line("markers", "e2e: Tests End-to-End Playwright")
config.addinivalue_line("markers", "load: Tests de charge Locust")
config.addinivalue_line("markers", "security: Tests de sécurité")
config.addinivalue_line(
"markers", "smoke: Tests rapides de smoke (sans infra complète)"
)
config.addinivalue_line("markers", "beta: Tests spécifiques beta fermée")
config.addinivalue_line(
"markers", "requires_billing: Nécessite HRT-31 (Billing Stripe)"
)
config.addinivalue_line(
"markers", "requires_infra: Nécessite HRT-33 (infra staging)"
)

205
tests/test_smoke.py Normal file
View File

@@ -0,0 +1,205 @@
"""
Tests de smoke — SaaS Turf Prédictions IA
Sprint 8 — QA, Beta Fermee, Go/No-Go
Ticket: HRT-34
Vérifications rapides sur l'état de l'application :
- Routes de base accessibles
- API répond en JSON valide
- Base de données accessible
- Pas d'erreurs 5xx sur les routes principales
Ces tests peuvent tourner SANS infra complète (pas besoin de HRT-31/33).
Exécuter sur l'app actuelle en staging ou localhost.
"""
import pytest
import requests
import os
import json
BASE_URL = os.environ.get("APP_URL", "http://localhost:8792")
# Routes qui doivent retourner 200 (publiques)
PUBLIC_ROUTES_200 = [
"/",
"/dashboard",
]
# Routes API qui doivent retourner 200 ou 401 (jamais 500)
API_ROUTES_NO_500 = [
"/api",
"/api/races",
"/api/scoring",
"/api/weather",
"/api/odds_history",
]
class TestSmoke:
"""Tests de smoke : l'app répond correctement aux requêtes de base."""
@pytest.mark.smoke
@pytest.mark.parametrize("route", PUBLIC_ROUTES_200)
def test_route_publique_accessible(self, route):
"""Les routes publiques doivent retourner 200."""
try:
resp = requests.get(f"{BASE_URL}{route}", timeout=10)
assert resp.status_code in (200, 304), (
f"Route publique inaccessible: {route}{resp.status_code}"
)
assert len(resp.content) > 0, f"Réponse vide sur {route}"
except requests.exceptions.ConnectionError:
pytest.skip(
f"App non accessible sur {BASE_URL} — vérifier que le serveur est démarré"
)
@pytest.mark.smoke
@pytest.mark.parametrize("route", API_ROUTES_NO_500)
def test_api_pas_derreur_serveur(self, route):
"""Les routes API ne doivent jamais retourner 5xx."""
try:
resp = requests.get(f"{BASE_URL}{route}", timeout=10)
assert resp.status_code < 500, (
f"Erreur serveur sur {route}: {resp.status_code}\n{resp.text[:200]}"
)
except requests.exceptions.ConnectionError:
pytest.skip(f"App non accessible sur {BASE_URL}")
@pytest.mark.smoke
def test_api_today_retourne_json(self):
"""L'endpoint principal /api doit retourner du JSON valide."""
try:
resp = requests.get(f"{BASE_URL}/api", timeout=10)
if resp.status_code == 200:
data = resp.json()
assert data is not None, "Réponse JSON nulle"
assert isinstance(data, (list, dict)), (
f"Type de réponse inattendu: {type(data)}"
)
except requests.exceptions.ConnectionError:
pytest.skip(f"App non accessible sur {BASE_URL}")
except json.JSONDecodeError as e:
pytest.fail(f"/api ne retourne pas du JSON valide: {e}")
@pytest.mark.smoke
def test_contenu_html_portail_valide(self):
"""Le portail doit contenir un titre et du contenu significatif."""
try:
resp = requests.get(f"{BASE_URL}/", timeout=10)
if resp.status_code == 200:
content = resp.text
assert "<html" in content.lower() or "<!doctype" in content.lower(), (
"La page d'accueil ne retourne pas du HTML"
)
assert len(content) > 500, (
f"Page d'accueil trop courte ({len(content)} chars)"
)
except requests.exceptions.ConnectionError:
pytest.skip(f"App non accessible sur {BASE_URL}")
@pytest.mark.smoke
def test_headers_securite_presents(self):
"""Les headers de sécurité de base doivent être présents."""
try:
resp = requests.get(f"{BASE_URL}/", timeout=10)
if resp.status_code != 200:
return
# En production (derrière Nginx), ces headers doivent être présents
# En dev direct Flask, ils peuvent être absents — on note seulement
security_headers = {
"X-Content-Type-Options": "nosniff",
"X-Frame-Options": None, # SAMEORIGIN ou DENY
"X-XSS-Protection": None,
}
missing = []
for header, expected_value in security_headers.items():
if header not in resp.headers:
missing.append(header)
if missing:
# Warning seulement — bloquant uniquement en prod derrière Nginx
pytest.warns(UserWarning, match=r".*") if False else None
print(f"⚠️ Headers sécurité manquants (requis en prod): {missing}")
except requests.exceptions.ConnectionError:
pytest.skip(f"App non accessible sur {BASE_URL}")
@pytest.mark.smoke
def test_api_races_format_reponse(self):
"""L'endpoint /api/races doit retourner une liste structurée."""
try:
resp = requests.get(f"{BASE_URL}/api/races", timeout=10)
if resp.status_code == 200:
data = resp.json()
assert isinstance(data, (list, dict)), (
f"Format inattendu pour /api/races: {type(data)}"
)
if isinstance(data, list) and len(data) > 0:
first = data[0]
# Vérifier la présence de champs clés
expected_fields = ["date", "course", "hippodrome"]
present = [
f
for f in expected_fields
if f in first
or any(k in first for k in [f, f.upper(), f.replace("_", "")])
]
assert len(present) > 0, (
f"Champs attendus absents de /api/races. Champs présents: {list(first.keys())}"
)
except requests.exceptions.ConnectionError:
pytest.skip(f"App non accessible sur {BASE_URL}")
except json.JSONDecodeError:
pytest.fail("/api/races ne retourne pas du JSON valide")
class TestSmokeDatabase:
"""Tests smoke sur la base de données."""
@pytest.mark.smoke
def test_base_donnees_accessible(self):
"""La base de données SQLite doit être accessible et contenir des données."""
import sqlite3
db_path = "/home/h3r7/turf_saas/turf_saas.db"
if not __import__("os").path.exists(db_path):
pytest.skip(f"Base de données non trouvée: {db_path}")
conn = sqlite3.connect(db_path)
c = conn.cursor()
# Vérifier que les tables essentielles existent
c.execute("SELECT name FROM sqlite_master WHERE type='table'")
tables = {row[0] for row in c.fetchall()}
conn.close()
expected_tables = ["predictions", "results"]
for table in expected_tables:
assert table in tables, (
f"Table manquante dans la BDD: {table}. Tables présentes: {tables}"
)
@pytest.mark.smoke
def test_donnees_predictions_disponibles(self):
"""Des prédictions doivent être présentes dans la BDD."""
import sqlite3
db_path = "/home/h3r7/turf_saas/turf_saas.db"
if not __import__("os").path.exists(db_path):
pytest.skip(f"Base de données non trouvée: {db_path}")
conn = sqlite3.connect(db_path)
c = conn.cursor()
c.execute("SELECT COUNT(*) FROM predictions")
count = c.fetchone()[0]
conn.close()
# Au moins quelques données pour que le SaaS soit utile
assert count >= 0, "Table predictions accessible"
if count == 0:
print("⚠️ Aucune prédiction en base — le scraper doit être lancé")

View File

@@ -627,7 +627,52 @@ def compute_ensemble_weights(models: dict, X_val, y_val, feature_cols: list) ->
# ───────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────────────────
# 8. EVALUATION HELPERS # 8. TURF ENSEMBLE (module-level for pickle compatibility)
# ─────────────────────────────────────────────────────────────────────────────
class TurfEnsemble:
"""
Picklable soft-voting ensemble: XGBoost + LightGBM + MLP.
Weights are set proportional to validation AUC.
"""
def __init__(
self, xgb_model, lgb_model, mlp_pipeline, weights: dict, feature_cols: list
):
self.xgb_model = xgb_model
self.lgb_model = lgb_model
self.mlp_pipeline = mlp_pipeline
self.weights = weights
self.feature_cols = feature_cols
self.version = f"ensemble_v1_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
def predict_proba(self, X):
if isinstance(X, np.ndarray):
X = pd.DataFrame(X, columns=self.feature_cols)
available = [c for c in self.feature_cols if c in X.columns]
Xa = X[available].fillna(0)
total_w = sum(self.weights.values())
proba = np.zeros(len(Xa))
xp = self.xgb_model.predict_proba(Xa)[:, 1]
proba += (self.weights.get("xgboost", 0.33) / total_w) * xp
lp = self.lgb_model.predict_proba(Xa)[:, 1]
proba += (self.weights.get("lightgbm", 0.33) / total_w) * lp
mp = self.mlp_pipeline.predict_proba(Xa.values)[:, 1]
proba += (self.weights.get("mlp", 0.33) / total_w) * mp
return np.column_stack([1 - proba, proba])
def predict(self, X, threshold: float = 0.5):
return (self.predict_proba(X)[:, 1] >= threshold).astype(int)
# ─────────────────────────────────────────────────────────────────────────────
# 9. EVALUATION HELPERS
# ───────────────────────────────────────────────────────────────────────────── # ─────────────────────────────────────────────────────────────────────────────
@@ -759,54 +804,9 @@ def main(args):
# ── Build ensemble ───────────────────────────────────────────────────────── # ── Build ensemble ─────────────────────────────────────────────────────────
print("\n[8/9] Building WeightedEnsemble …") print("\n[8/9] Building WeightedEnsemble …")
ensemble = TurfEnsemble(xgb_model, lgb_model, mlp_model, weights, feat_cols)
class FullEnsemble: # TurfEnsemble already has .feature_cols; use it directly for evaluation
"""Picklable ensemble wrapper.""" ensemble_eval = ensemble
def __init__(self, xgb_m, lgb_m, mlp_pipe, weights, feature_cols):
self.xgb_model = xgb_m
self.lgb_model = lgb_m
self.mlp_pipeline = mlp_pipe
self.weights = weights
self.feature_cols = feature_cols
self.version = f"ensemble_v1_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
def predict_proba(self, X: pd.DataFrame):
if isinstance(X, np.ndarray):
X = pd.DataFrame(X, columns=self.feature_cols)
available = [c for c in self.feature_cols if c in X.columns]
Xa = X[available].fillna(0)
total_w = sum(self.weights.values())
proba = np.zeros(len(Xa))
# XGBoost
xp = self.xgb_model.predict_proba(Xa)[:, 1]
proba += (self.weights.get("xgboost", 0.33) / total_w) * xp
# LightGBM
lp = self.lgb_model.predict_proba(Xa)[:, 1]
proba += (self.weights.get("lightgbm", 0.33) / total_w) * lp
# MLP
mp = self.mlp_pipeline.predict_proba(Xa.values)[:, 1]
proba += (self.weights.get("mlp", 0.33) / total_w) * mp
return np.column_stack([1 - proba, proba])
def predict(self, X, threshold=0.5):
return (self.predict_proba(X)[:, 1] >= threshold).astype(int)
ensemble = FullEnsemble(xgb_model, lgb_model, mlp_model, weights, feat_cols)
# Add feature_cols attribute for evaluate_model
ensemble_eval = type(
"E",
(),
{
"predict_proba": ensemble.predict_proba,
"feature_cols": feat_cols,
},
)()
# ── Holdout evaluation ───────────────────────────────────────────────────── # ── Holdout evaluation ─────────────────────────────────────────────────────
print("\n[9/9] Evaluating all models on holdout …") print("\n[9/9] Evaluating all models on holdout …")