Merge pull request 'Sprint 6-7 — ML Upgrade: Ensemble XGBoost+LightGBM+MLP + Optuna' (#1) from feature/ml-upgrade-ensemble into master

This commit was merged in pull request #1.
This commit is contained in:
2026-04-25 19:15:15 +02:00
13 changed files with 3173 additions and 8 deletions

0
tests/__init__.py Normal file
View File

448
tests/beta_monitor.py Normal file
View File

@@ -0,0 +1,448 @@
"""
Beta Monitoring — SaaS Turf Prédictions IA
Sprint 8 — QA, Beta Fermee, Go/No-Go
Ticket: HRT-34
Ce module :
- Collecte les feedbacks beta via l'API in-app
- Envoie des alertes Telegram en cas d'erreur détectée pendant la beta
- Génère le rapport beta final (bugs, UX, NPS)
Usage :
# Démarrer le monitoring beta
python tests/beta_monitor.py --watch --interval 60
# Générer le rapport beta final
python tests/beta_monitor.py --report
# Test d'envoi Telegram
python tests/beta_monitor.py --test-telegram
"""
import os
import sys
import json
import time
import sqlite3
import requests
import argparse
from datetime import datetime, timedelta
from pathlib import Path
# ============================================================
# Configuration
# ============================================================
BASE_URL = os.environ.get("APP_URL", "http://localhost:8792")
# SECURITY: the bot token must only ever come from the environment. A real
# token used to be hard-coded here as the fallback value; it is exposed in the
# repository history and should be revoked and re-issued.
# With an empty token, send_telegram() reports "not configured" and returns False.
TELEGRAM_TOKEN = os.environ.get("TELEGRAM_TOKEN", "")
TELEGRAM_CHAT_ID = os.environ.get("TELEGRAM_CHAT_ID", "")  # must be configured
BETA_DB_PATH = os.environ.get("BETA_DB_PATH", "/home/h3r7/turf_saas/turf_saas.db")
REPORTS_DIR = Path("tests/reports")
REPORTS_DIR.mkdir(parents=True, exist_ok=True)
# Alert thresholds
ERROR_RATE_THRESHOLD = 0.01  # error rate above 1% triggers an alert
LATENCY_P95_THRESHOLD_MS = 500  # p95 above 500 ms triggers an alert
BETA_MIN_USERS = 10  # minimum number of beta users required
NPS_TARGET = 7.0  # target NPS (out of 10)
# ============================================================
# Alertes Telegram
# ============================================================
def send_telegram(message: str, parse_mode: str = "Markdown") -> bool:
    """Send an alert message through the Telegram Bot API.

    Args:
        message: Text of the message to send.
        parse_mode: Value forwarded as Telegram's ``parse_mode`` field.

    Returns:
        True when Telegram answered with HTTP 200. False when the token or
        chat id is not configured, when Telegram answered with another status,
        or when the HTTP request itself failed.
    """
    if not TELEGRAM_TOKEN or not TELEGRAM_CHAT_ID:
        print(f"⚠️ Telegram non configuré. Message: {message[:100]}")
        return False
    try:
        resp = requests.post(
            f"https://api.telegram.org/bot{TELEGRAM_TOKEN}/sendMessage",
            json={
                "chat_id": TELEGRAM_CHAT_ID,
                "text": message,
                "parse_mode": parse_mode,
            },
            timeout=10,
        )
    except requests.RequestException as e:
        # Alerting is best-effort: a network failure must not stop the caller.
        print(f"❌ Telegram exception: {e}")
        return False
    if resp.status_code == 200:
        print("✅ Alerte Telegram envoyée")
        return True
    # Separator added: status code and body used to be printed glued together.
    print(f"❌ Telegram erreur: {resp.status_code} — {resp.text}")
    return False
def alert_error(endpoint: str, status_code: int, message: str):
    """Send a Telegram alert describing an error seen on one endpoint.

    Only the first 200 characters of ``message`` are included in the alert.
    """
    excerpt = message[:200]
    now = datetime.now().strftime('%H:%M:%S')
    parts = [
        f"🚨 *ALERTE BETA — SaaS Turf IA*\n\n",
        f"Erreur détectée sur `{endpoint}`\n",
        f"Status: `{status_code}`\n",
        f"Message: {excerpt}\n",
        f"Heure: {now}\n\n",
        f"_Ticket: HRT-34_",
    ]
    send_telegram("".join(parts))
def alert_performance(p95_ms: float, error_rate: float):
    """Send a Telegram alert reporting degraded latency and error rate.

    ``error_rate`` is a fraction (0..1) and is rendered as a percentage, next
    to the configured thresholds.
    """
    now = datetime.now().strftime('%H:%M:%S')
    parts = [
        f"⚠️ *ALERTE PERFORMANCE — SaaS Turf IA*\n\n",
        f"p95 latence: `{p95_ms:.0f}ms` (seuil: {LATENCY_P95_THRESHOLD_MS}ms)\n",
        f"Error rate: `{error_rate * 100:.2f}%` (seuil: {ERROR_RATE_THRESHOLD * 100:.1f}%)\n",
        f"Heure: {now}\n\n",
        f"_Ticket: HRT-34_",
    ]
    send_telegram("".join(parts))
# ============================================================
# Collecte de métriques
# ============================================================
class BetaMonitor:
    """Active endpoint monitor used during the closed beta.

    One check cycle requests every path in ``ENDPOINTS_TO_CHECK`` and records
    latencies and failures. ``watch`` repeats cycles until interrupted, sends
    Telegram alerts, and appends each cycle summary to a JSONL log file.
    """

    ENDPOINTS_TO_CHECK = [
        "/api",
        "/api/races",
        "/api/scoring",
        "/dashboard",
        "/",
    ]

    def __init__(self, base_url: str = BASE_URL):
        """Store the base URL (trailing slash removed) and reset counters."""
        self.base_url = base_url.rstrip("/")
        self.errors: list[dict] = []
        self.latencies: list[float] = []
        self.check_count = 0

    def check_endpoint(self, path: str) -> dict:
        """Request one endpoint and describe the outcome.

        Returns:
            A dict with ``path``, ``status``, ``latency_ms``, ``ok`` and
            ``timestamp``. When the request raised, ``status`` and
            ``latency_ms`` are 0, ``ok`` is False and ``error`` holds the
            exception text. ``ok`` is True for any status below 500.
        """
        start = time.perf_counter()
        try:
            resp = requests.get(f"{self.base_url}{path}", timeout=10)
        except Exception as e:
            # Any failure (connection refused, timeout, ...) is reported as a
            # failed check rather than raised, so one bad endpoint cannot stop
            # the monitoring loop.
            return {
                "path": path,
                "status": 0,
                "latency_ms": 0,
                "ok": False,
                "error": str(e),
                "timestamp": datetime.now().isoformat(),
            }
        latency_ms = (time.perf_counter() - start) * 1000
        return {
            "path": path,
            "status": resp.status_code,
            "latency_ms": latency_ms,
            "ok": resp.status_code < 500,
            "timestamp": datetime.now().isoformat(),
        }

    def run_checks(self) -> dict:
        """Run one cycle over all endpoints and return its summary.

        Returns:
            A dict with ``check_number``, ``timestamp``, ``total_checks``,
            ``failures``, ``error_rate``, ``p95_ms`` and the raw ``results``.
        """
        results = [self.check_endpoint(p) for p in self.ENDPOINTS_TO_CHECK]
        self.check_count += 1
        failures = [r for r in results if not r["ok"]]
        # Failed requests carry latency 0 and are excluded from latency stats.
        latencies = [r["latency_ms"] for r in results if r["latency_ms"] > 0]
        if len(latencies) >= 2:
            # int(n * 0.95) is always < n, so the index is in range.
            p95 = sorted(latencies)[int(len(latencies) * 0.95)]
        else:
            p95 = latencies[0] if latencies else 0
        error_rate = len(failures) / len(results) if results else 0
        # Keep the raw data on the instance.
        self.latencies.extend(latencies)
        self.errors.extend(failures)
        return {
            "check_number": self.check_count,
            "timestamp": datetime.now().isoformat(),
            "total_checks": len(results),
            "failures": len(failures),
            "error_rate": error_rate,
            "p95_ms": p95,
            "results": results,
        }

    def watch(self, interval_seconds: int = 60):
        """Monitor continuously, alerting through Telegram, until Ctrl+C.

        Error alerts are sent from the second consecutive failing cycle
        onwards. A performance alert is sent when p95 exceeds twice the
        latency threshold. On interruption the final beta report is generated.
        """
        print(f"🔍 Beta monitoring démarré — {self.base_url}")
        print(f" Intervalle: {interval_seconds}s")
        print(f" Endpoints: {len(self.ENDPOINTS_TO_CHECK)}")
        print(" Ctrl+C pour arrêter\n")
        log_file = REPORTS_DIR / "beta_monitor_log.jsonl"
        consecutive_errors = 0
        try:
            while True:
                summary = self.run_checks()
                timestamp = datetime.now().strftime("%H:%M:%S")
                # Both branches used to be the empty string, so the icon never showed.
                status_icon = "✅" if summary["error_rate"] == 0 else "❌"
                print(
                    f"[{timestamp}] {status_icon} "
                    f"Check #{summary['check_number']} — "
                    f"p95={summary['p95_ms']:.0f}ms, "
                    f"errors={summary['failures']}/{summary['total_checks']}"
                )
                # Alerts
                if summary["error_rate"] > ERROR_RATE_THRESHOLD:
                    consecutive_errors += 1
                    if consecutive_errors >= 2:  # two failing cycles in a row
                        for failure in summary["results"]:
                            if not failure["ok"]:
                                alert_error(
                                    failure["path"],
                                    failure.get("status", 0),
                                    failure.get("error", "Non-2xx response"),
                                )
                else:
                    consecutive_errors = 0
                if summary["p95_ms"] > LATENCY_P95_THRESHOLD_MS:
                    print(f"⚠️ Latence p95 élevée: {summary['p95_ms']:.0f}ms")
                    if summary["p95_ms"] > LATENCY_P95_THRESHOLD_MS * 2:
                        alert_performance(summary["p95_ms"], summary["error_rate"])
                # Persist the cycle summary
                with open(log_file, "a", encoding="utf-8") as f:
                    f.write(json.dumps(summary) + "\n")
                time.sleep(interval_seconds)
        except KeyboardInterrupt:
            print(f"\n⏹️ Monitoring arrêté après {self.check_count} checks")
            # BetaMonitor has no generate_report(); the report is produced by
            # BetaReport, which reads the JSONL log written above.
            BetaReport(self.base_url).generate()
# ============================================================
# Rapport beta final
# ============================================================
class BetaReport:
    """Build the closed-beta report from DB feedback and monitoring logs."""

    def __init__(self, base_url: str = BASE_URL):
        """Store the base URL and the timestamp used in the report file name."""
        self.base_url = base_url
        self.timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

    def collect_feedback_from_db(self) -> list[dict]:
        """Read all rows of the ``beta_feedback`` table, newest first.

        Returns:
            One dict per row keyed by column name; an empty list when the
            table does not exist or the database cannot be read.
        """
        try:
            conn = sqlite3.connect(BETA_DB_PATH)
            try:
                c = conn.cursor()
                c.execute(
                    "SELECT name FROM sqlite_master WHERE type='table' AND name='beta_feedback'"
                )
                if not c.fetchone():
                    return []
                c.execute("SELECT * FROM beta_feedback ORDER BY created_at DESC")
                columns = [col[0] for col in c.description]
                return [dict(zip(columns, row)) for row in c.fetchall()]
            finally:
                # Always release the connection, including on query errors.
                conn.close()
        except sqlite3.Error as e:
            print(f"⚠️ Impossible de lire beta_feedback: {e}")
            return []

    def collect_monitor_logs(self) -> list[dict]:
        """Load the monitoring JSONL log, skipping lines that are not valid JSON."""
        log_file = REPORTS_DIR / "beta_monitor_log.jsonl"
        if not log_file.exists():
            return []
        entries = []
        with open(log_file, encoding="utf-8") as f:
            for line in f:
                try:
                    entries.append(json.loads(line))
                except json.JSONDecodeError:
                    # A blank or truncated line must not invalidate the log.
                    continue
        return entries

    def generate(self) -> str:
        """Build the full report, print it, save it under ``REPORTS_DIR``.

        Returns:
            The report text.
        """
        feedbacks = self.collect_feedback_from_db()
        monitor_logs = self.collect_monitor_logs()

        # Average NPS over feedbacks that carry a score
        nps_scores = [
            f.get("nps_score") for f in feedbacks if f.get("nps_score") is not None
        ]
        avg_nps = sum(nps_scores) / len(nps_scores) if nps_scores else None

        # Monitoring statistics (everything stays at zero without logs)
        all_latencies = []
        total_errors = 0
        total_checks = 0
        for entry in monitor_logs:
            all_latencies.extend(
                r["latency_ms"]
                for r in entry.get("results", [])
                if r.get("latency_ms", 0) > 0
            )
            total_errors += entry.get("failures", 0)
            total_checks += entry.get("total_checks", 0)
        avg_latency = sum(all_latencies) / len(all_latencies) if all_latencies else 0
        overall_error_rate = total_errors / total_checks if total_checks > 0 else 0

        # Build the report
        report = []
        report.append("=" * 60)
        report.append("RAPPORT BETA FERMÉE — SaaS Turf Prédictions IA")
        report.append(f"Généré le : {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
        report.append("Ticket : HRT-34")
        report.append("=" * 60)
        report.append("")

        report.append("## 1. PARTICIPANTS BETA")
        report.append(f" Feedbacks reçus : {len(feedbacks)}")
        # `is not None`: an average of exactly 0.0 is a real value, not "pending".
        report.append(
            f" NPS moyen : {avg_nps:.1f}/10"
            if avg_nps is not None
            else " NPS moyen : (en attente feedbacks)"
        )
        report.append(f" Cible NPS : ≥ {NPS_TARGET}/10")
        nps_ok = avg_nps is not None and avg_nps >= NPS_TARGET
        if avg_nps is None:
            nps_status = "⏳ En attente"
        elif nps_ok:
            nps_status = "✅ OBJECTIF ATTEINT"
        else:
            nps_status = "❌ OBJECTIF NON ATTEINT"
        report.append(f" Statut NPS : {nps_status}")
        report.append("")

        report.append("## 2. BUGS SIGNALÉS")
        bugs = [f for f in feedbacks if f.get("type") == "bug"]
        critical_bugs = [b for b in bugs if b.get("severity") in ("critical", "high")]
        no_critical_bug = len(critical_bugs) == 0
        report.append(f" Total bugs : {len(bugs)}")
        report.append(f" Critiques/High : {len(critical_bugs)}")
        bug_status = (
            "✅ 0 bug critique"
            if no_critical_bug
            else f"❌ {len(critical_bugs)} bug(s) critique(s)"
        )
        report.append(f" Statut : {bug_status}")
        report.append("")

        report.append("## 3. PERFORMANCE RÉELLE (monitoring)")
        report.append(f" Checks effectués: {total_checks}")
        report.append(f" Latence moyenne : {avg_latency:.1f}ms")
        report.append(f" Error rate : {overall_error_rate * 100:.2f}%")
        report.append(f" Seuil latence : {LATENCY_P95_THRESHOLD_MS}ms")
        # Without any check the zeroed stats would trivially pass the thresholds,
        # so require data before declaring the objective met.
        perf_ok = (
            total_checks > 0
            and avg_latency < LATENCY_P95_THRESHOLD_MS
            and overall_error_rate < ERROR_RATE_THRESHOLD
        )
        if perf_ok:
            perf_status = "✅ OBJECTIF ATTEINT"
        elif total_checks == 0:
            perf_status = "⏳ Données insuffisantes"
        else:
            perf_status = "❌ OBJECTIF NON ATTEINT"
        report.append(f" Statut : {perf_status}")
        report.append("")

        report.append("## 4. FEEDBACKS UX")
        ux_feedbacks = [f for f in feedbacks if f.get("type") == "ux"]
        report.append(f" Retours UX : {len(ux_feedbacks)}")
        for fb in ux_feedbacks[:5]:  # first five only
            # A NULL comment comes back as None; treat it as empty text.
            report.append(f" - {(fb.get('comment') or '')[:100]}")
        report.append("")

        report.append("## 5. VERDICT BETA FERMÉE")
        users_ok = len(feedbacks) >= 5  # at least five feedbacks received
        verdict = all([users_ok, nps_ok, no_critical_bug])
        report.append(
            f" Participants suffisants (≥5) : {'✅' if users_ok else '❌'}"
        )
        report.append(f" NPS ≥ 7/10 : {'✅' if nps_ok else '❌'}")
        report.append(f" 0 bug critique : {'✅' if no_critical_bug else '❌'}")
        report.append("")
        report.append(
            f" VERDICT GLOBAL : {'✅ GO — Beta réussie' if verdict else '❌ NO-GO — Conditions non remplies'}"
        )
        report.append("=" * 60)
        report_text = "\n".join(report)

        # Save; explicit UTF-8 because the report contains non-ASCII characters.
        report_file = REPORTS_DIR / f"beta_report_{self.timestamp}.txt"
        with open(report_file, "w", encoding="utf-8") as f:
            f.write(report_text)
        print(report_text)
        print(f"\nRapport sauvegardé : {report_file}")
        return report_text
# ============================================================
# CLI
# ============================================================
def main():
    """Parse the command line and run the selected mode.

    ``--test-telegram`` takes precedence over ``--report``, which takes
    precedence over ``--watch``; each of them ends with ``sys.exit``. Without
    any mode flag, the help text is printed.
    """
    cli = argparse.ArgumentParser(description="Beta Monitor — SaaS Turf IA")
    cli.add_argument("--watch", action="store_true", help="Surveillance continue")
    cli.add_argument(
        "--interval", type=int, default=60, help="Intervalle en secondes (défaut: 60)"
    )
    cli.add_argument(
        "--report", action="store_true", help="Générer le rapport beta final"
    )
    cli.add_argument(
        "--test-telegram", action="store_true", help="Tester l'envoi Telegram"
    )
    cli.add_argument(
        "--url", default=BASE_URL, help=f"URL de l'app (défaut: {BASE_URL})"
    )
    opts = cli.parse_args()

    if opts.test_telegram:
        print("Test d'envoi Telegram...")
        sent = send_telegram(
            "✅ *Test alerte Beta* — SaaS Turf IA\n_Ceci est un test du système d'alertes QA_\nTicket: HRT-34"
        )
        exit_code = 0 if sent else 1
        sys.exit(exit_code)
    elif opts.report:
        BetaReport(opts.url).generate()
        sys.exit(0)
    elif opts.watch:
        BetaMonitor(opts.url).watch(interval_seconds=opts.interval)
        sys.exit(0)
    else:
        cli.print_help()


if __name__ == "__main__":
    main()

124
tests/conftest.py Normal file
View File

@@ -0,0 +1,124 @@
"""
conftest.py — Configuration pytest globale
SaaS Turf Prédictions IA — Sprint 8 QA
Ticket: HRT-34
"""
import os
import asyncio
import pytest
from pathlib import Path
from datetime import datetime
# ============================================================
# Répertoires de sortie
# ============================================================
REPORTS_DIR = Path("tests/reports")
SCREENSHOTS_DIR = Path("tests/screenshots")
# Make sure both output directories exist before any test runs.
for d in (REPORTS_DIR, SCREENSHOTS_DIR):
    d.mkdir(parents=True, exist_ok=True)
# ============================================================
# Environment variables
# ============================================================
BASE_URL = os.getenv("APP_URL", "http://localhost:8792")
# ============================================================
# Fixtures globales
# ============================================================
@pytest.fixture(scope="session")
def base_url() -> str:
    """Provide the application base URL held in the module-level ``BASE_URL``."""
    url = BASE_URL
    return url
@pytest.fixture(scope="session")
def event_loop():
    """Provide one event loop shared by the session's async tests; closed at teardown."""
    # asyncio.new_event_loop() delegates to the current policy's new_event_loop().
    shared_loop = asyncio.new_event_loop()
    yield shared_loop
    shared_loop.close()
@pytest.fixture(scope="session")
def reports_dir() -> Path:
    """Provide the directory where test reports are written."""
    directory = REPORTS_DIR
    return directory
@pytest.fixture(scope="session")
def screenshots_dir() -> Path:
    """Provide the directory where failure screenshots are written."""
    directory = SCREENSHOTS_DIR
    return directory
# ============================================================
# Hook : screenshot automatique sur échec
# ============================================================
@pytest.hookimpl(tryfirst=True, hookwrapper=True)
def pytest_runtest_makereport(item, call):
    """Attach a screenshot to the report of any failed test that has a page fixture.

    Only the "call" phase is considered. The page is taken from the ``page``
    or ``context_page`` fixture (first one present); a tuple fixture value is
    assumed to hold the page as its first element. Capture errors are recorded
    in the report instead of being raised.
    """
    outcome = yield
    report = outcome.get_result()
    if report.when != "call" or not report.failed:
        return

    # Use the first matching fixture that the test actually requested.
    fixture_name = next(
        (name for name in ("page", "context_page") if name in item.funcargs),
        None,
    )
    page = None
    if fixture_name is not None:
        candidate = item.funcargs[fixture_name]
        page = candidate[0] if isinstance(candidate, tuple) else candidate  # (page, browser_name)
    if page is None:
        return

    stamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    safe_name = item.name.replace("/", "_").replace(":", "_")
    target = SCREENSHOTS_DIR / f"FAIL_{safe_name}_{stamp}.png"
    try:
        # A coroutine-function screenshot (async API) is driven to completion
        # on the current event loop; otherwise it is called directly.
        import asyncio as _asyncio
        if _asyncio.iscoroutinefunction(page.screenshot):
            _asyncio.get_event_loop().run_until_complete(
                page.screenshot(path=str(target))
            )
        else:
            page.screenshot(path=str(target))
        report.sections.append(("Screenshot", f"Sauvegardé : {target}"))
    except Exception as e:
        report.sections.append(
            ("Screenshot Error", f"Impossible de capturer : {e}")
        )
# ============================================================
# Marqueurs personnalisés
# ============================================================
def pytest_configure(config):
    """Register the project's custom pytest markers.

    Besides the original markers, this registers ``regression``, ``latency``
    and ``api``, which tests/test_ml_ensemble.py uses. Unregistered markers
    trigger PytestUnknownMarkWarning and fail under ``--strict-markers``.
    """
    marker_lines = (
        "e2e: Tests End-to-End Playwright",
        "load: Tests de charge Locust",
        "security: Tests de sécurité",
        "smoke: Tests rapides de smoke (sans infra complète)",
        "beta: Tests spécifiques beta fermée",
        "requires_billing: Nécessite HRT-31 (Billing Stripe)",
        "requires_infra: Nécessite HRT-33 (infra staging)",
        # Markers used by the ML ensemble test-suite
        "regression: Tests de régression du modèle ML",
        "latency: Tests de latence de prédiction",
        "api: Tests des endpoints API",
    )
    for line in marker_lines:
        config.addinivalue_line("markers", line)

333
tests/test_ml_ensemble.py Normal file
View File

@@ -0,0 +1,333 @@
"""
Tests ML Ensemble — HRT-32 Sprint 6-7
Tests de régression, benchmark et latence pour le nouveau modèle ensemble.
Usage:
pytest tests/test_ml_ensemble.py -v
pytest tests/test_ml_ensemble.py -v -m regression
pytest tests/test_ml_ensemble.py -v -m latency
"""
import json
import os
import pickle
import sqlite3
import time
from pathlib import Path
import numpy as np
import pandas as pd
import pytest
import requests
# Locations are overridable through the environment where a default is given.
BASE_URL = os.getenv("APP_URL", "http://localhost:8790")
DB_PATH = os.getenv("DB_PATH", "/home/h3r7/turf_saas/turf.db")
MODELS_DIR = Path("/home/h3r7/turf_saas/models")
ENSEMBLE_PATH = MODELS_DIR.joinpath("ensemble_top3.pkl")
BENCHMARK_PATH = MODELS_DIR.joinpath("benchmark_report.json")
# ─── Fixtures ────────────────────────────────────────────────────────────────
@pytest.fixture(scope="session")
def ensemble_model():
    """Unpickle the ensemble model, skipping dependent tests if the file is absent."""
    if ENSEMBLE_PATH.exists():
        # NOTE(review): pickle runs arbitrary code on load; this presumes the
        # model file is a locally produced, trusted artifact.
        with ENSEMBLE_PATH.open("rb") as model_file:
            return pickle.load(model_file)
    pytest.skip(
        f"Ensemble model not found at {ENSEMBLE_PATH}. Run train_ensemble.py first."
    )
@pytest.fixture(scope="session")
def benchmark_report():
    """Parse the benchmark JSON report, skipping dependent tests if it is absent."""
    if BENCHMARK_PATH.exists():
        return json.loads(BENCHMARK_PATH.read_text())
    pytest.skip(f"Benchmark report not found at {BENCHMARK_PATH}.")
@pytest.fixture(scope="session")
def holdout_data():
    """Load the holdout slice: the last 20% of finished runners, in query order.

    Rows come from ``pmu_partants`` left-joined with ``pmu_courses``,
    restricted to ``ordre_arrivee > 0`` and ordered by date, meeting, race
    and runner number. Dependent tests are skipped when the database file
    does not exist, matching the model and benchmark fixtures.
    """
    # sqlite3.connect() would silently create an empty database at a missing
    # path and the query would then fail with a confusing "no such table".
    if not os.path.exists(DB_PATH):
        pytest.skip(f"Database not found at {DB_PATH}.")
    conn = sqlite3.connect(DB_PATH)
    try:
        df = pd.read_sql_query(
            """
            SELECT p.*, c.distance, c.discipline, c.specialite,
            c.nb_declares_partants, c.montant_prix, c.penetrometre_intitule
            FROM pmu_partants p
            LEFT JOIN pmu_courses c ON p.date_programme=c.date_programme
            AND p.num_reunion=c.num_reunion AND p.num_course=c.num_course
            WHERE p.ordre_arrivee > 0
            ORDER BY p.date_programme, p.num_reunion, p.num_course, p.num_pmu
            """,
            conn,
        )
    finally:
        # Release the connection even when the query raises.
        conn.close()
    cutoff = int(len(df) * 0.80)
    return df.iloc[cutoff:].copy()
@pytest.fixture(scope="session")
def predict_v2():
    """Load ``predict_v2.py`` from its absolute path and return the module object."""
    from importlib import util as importlib_util

    module_spec = importlib_util.spec_from_file_location(
        "predict_v2", "/home/h3r7/turf_saas/predict_v2.py"
    )
    module = importlib_util.module_from_spec(module_spec)
    module_spec.loader.exec_module(module)
    return module
# ─── Model Existence Tests ────────────────────────────────────────────────────
class TestModelFiles:
    """Check that the trained-model artifacts are present on disk."""

    def test_ensemble_model_exists(self):
        """The pickled ensemble model file must exist."""
        found = ENSEMBLE_PATH.exists()
        assert found, f"Ensemble model missing: {ENSEMBLE_PATH}"

    def test_benchmark_report_exists(self):
        """The JSON benchmark report must exist."""
        found = BENCHMARK_PATH.exists()
        assert found, f"Benchmark report missing: {BENCHMARK_PATH}"

    def test_models_dir_contains_expected_files(self):
        """Every expected artifact must be present in the models directory."""
        for fname in ("ensemble_top3.pkl", "benchmark_report.json", "benchmark_report.md"):
            candidate = MODELS_DIR / fname
            assert candidate.exists(), f"Missing: {MODELS_DIR / fname}"
# ─── Benchmark Tests ──────────────────────────────────────────────────────────
class TestBenchmark:
    """Check the metrics recorded in the training benchmark report."""

    @pytest.mark.regression
    def test_ensemble_beats_baseline_or_meets_threshold(self, benchmark_report):
        """Ensemble Precision@3 must be at least the baseline's."""
        scores = {
            key: benchmark_report[key]["precision_at3"]
            for key in ("baseline", "ensemble")
        }
        baseline, ensemble = scores["baseline"], scores["ensemble"]
        assert ensemble >= baseline, (
            f"Ensemble Precision@3 {ensemble:.4f} < baseline {baseline:.4f}"
        )

    @pytest.mark.regression
    def test_ensemble_auc_above_random(self, benchmark_report):
        """Ensemble AUC must be strictly above 0.60."""
        ensemble_metrics = benchmark_report["ensemble"]
        auc = ensemble_metrics["auc"]
        assert auc > 0.60, f"Ensemble AUC {auc:.4f} <= 0.60"

    @pytest.mark.regression
    def test_optuna_ran_minimum_trials(self, benchmark_report):
        """The report must record at least 100 Optuna trials."""
        optuna_section = benchmark_report["optuna"]
        n_trials = optuna_section["n_trials"]
        assert n_trials >= 100, f"Only {n_trials} Optuna trials (minimum 100 required)"

    @pytest.mark.regression
    def test_no_precision_regression(self, benchmark_report):
        """Ensemble Precision@3 must be at least 0.30."""
        ensemble_metrics = benchmark_report["ensemble"]
        ensemble_p3 = ensemble_metrics["precision_at3"]
        assert ensemble_p3 >= 0.30, (
            f"Precision@3 {ensemble_p3:.4f} is below random baseline (~0.30)"
        )

    def test_benchmark_has_all_required_models(self, benchmark_report):
        """The report must hold results for xgboost, lightgbm and mlp."""
        reported = set(benchmark_report.get("individual_models", {}).keys())
        missing = {"xgboost", "lightgbm", "mlp"}.difference(reported)
        assert not missing, f"Missing model benchmarks: {missing}"
# ─── Regression Tests ─────────────────────────────────────────────────────────
class TestPrecisionRegression:
    """Regression checks of the ensemble's predictions on the holdout slice."""

    @pytest.mark.regression
    def test_precision_at3_on_holdout(self, ensemble_model, holdout_data):
        """Mean per-race Precision@3 on the holdout must be at least 0.30."""
        from predict_v2 import build_feature_df, FEATURE_COLS

        race_keys = ["date_programme", "num_reunion", "num_course"]
        df = holdout_data.copy()
        df["top3"] = (df["ordre_arrivee"] <= 3).astype(int)
        feature_df = build_feature_df(df.to_dict("records"))
        available = [c for c in FEATURE_COLS if c in feature_df.columns]
        X = feature_df[available].fillna(0)
        proba = ensemble_model.predict_proba(X)[:, 1]

        # Score each race of three or more runners: share of the three
        # highest-probability runners that actually finished in the top 3.
        scored = df[race_keys].copy()
        scored["proba"] = proba
        scored["actual"] = df["top3"].values
        precisions = [
            group.nlargest(3, "proba")["actual"].sum() / 3.0
            for _, group in scored.groupby(race_keys)
            if len(group) >= 3
        ]
        p_at3 = float(np.mean(precisions)) if precisions else 0.0
        print(f"\n Holdout Precision@3: {p_at3:.4f} over {len(precisions)} races")
        # Must reach the 30% reference level
        assert p_at3 >= 0.30, f"Holdout Precision@3 {p_at3:.4f} < 0.30"

    @pytest.mark.regression
    def test_no_all_zero_predictions(self, ensemble_model, holdout_data):
        """Predictions on the first 50 holdout rows must be non-zero and varied."""
        from predict_v2 import build_feature_df, FEATURE_COLS

        sample = holdout_data.head(50)
        feature_df = build_feature_df(sample.to_dict("records"))
        available = [c for c in FEATURE_COLS if c in feature_df.columns]
        X = feature_df[available].fillna(0)
        proba = ensemble_model.predict_proba(X)[:, 1]
        assert proba.max() > 0.01, "All predictions are near 0 — model appears broken"
        assert proba.std() > 0.01, (
            "All predictions have identical probability — no discrimination"
        )
# ─── Latency Tests ────────────────────────────────────────────────────────────
class TestPredictionLatency:
    """Latency limits for ``predict_proba`` on one race and on one full day."""

    @pytest.mark.latency
    def test_single_race_latency(self, ensemble_model, holdout_data):
        """Average ``predict_proba`` time for one race must stay under 200 ms."""
        from predict_v2 import build_feature_df, FEATURE_COLS

        race_keys = ["date_programme", "num_reunion", "num_course"]
        # Pick one race: the first group produced by the groupby.
        first_race = holdout_data.groupby(race_keys).first().reset_index().iloc[0]
        mask = holdout_data[race_keys[0]] == first_race[race_keys[0]]
        for key in race_keys[1:]:
            mask = mask & (holdout_data[key] == first_race[key])
        partants = holdout_data[mask].to_dict("records")

        # Warm-up call, excluded from the timing
        feature_df = build_feature_df(partants)
        available = [c for c in FEATURE_COLS if c in feature_df.columns]
        X = feature_df[available].fillna(0)
        ensemble_model.predict_proba(X)

        # Timed run, averaged over 10 calls
        started = time.perf_counter()
        for _ in range(10):
            ensemble_model.predict_proba(X)
        elapsed_ms = (time.perf_counter() - started) / 10 * 1000
        print(f"\n Single-race latency: {elapsed_ms:.2f} ms ({len(partants)} horses)")
        assert elapsed_ms < 200, (
            f"Prediction latency {elapsed_ms:.1f} ms exceeds 200 ms limit"
        )

    @pytest.mark.latency
    def test_full_day_latency(self, ensemble_model, holdout_data):
        """One ``predict_proba`` call over a whole day must finish under 5 s."""
        from predict_v2 import build_feature_df, FEATURE_COLS

        # Use the day of the first holdout row
        day = holdout_data["date_programme"].iloc[0]
        same_day = holdout_data["date_programme"] == day
        partants = holdout_data[same_day].to_dict("records")
        feature_df = build_feature_df(partants)
        available = [c for c in FEATURE_COLS if c in feature_df.columns]
        X = feature_df[available].fillna(0)

        started = time.perf_counter()
        ensemble_model.predict_proba(X)
        elapsed_ms = (time.perf_counter() - started) * 1000
        print(
            f"\n Full day latency: {elapsed_ms:.2f} ms ({len(partants)} horses, {day})"
        )
        assert elapsed_ms < 5000, (
            f"Full-day prediction {elapsed_ms:.0f} ms exceeds 5s limit"
        )
# ─── API Endpoint Tests ───────────────────────────────────────────────────────
class TestV1PredictionsAPI:
    """Tests for the /api/v1/predictions and /api/v1/model/status endpoints.

    Every test is skipped when the API server cannot be reached.
    """

    def _api_available(self):
        """Return True when a request to the status endpoint raises nothing."""
        try:
            requests.get(f"{BASE_URL}/api/v1/model/status", timeout=3)
            return True
        except Exception:
            return False

    @pytest.mark.api
    def test_model_status_endpoint(self):
        """GET /api/v1/model/status returns 200 and JSON with ``ensemble_available``."""
        if not self._api_available():
            pytest.skip("API server not running")
        resp = requests.get(f"{BASE_URL}/api/v1/model/status", timeout=10)
        assert resp.status_code == 200
        data = resp.json()
        assert "ensemble_available" in data

    @pytest.mark.api
    def test_v1_predictions_no_500(self):
        """GET /api/v1/predictions must not return 5xx."""
        if not self._api_available():
            pytest.skip("API server not running")
        resp = requests.get(f"{BASE_URL}/api/v1/predictions", timeout=30)
        assert resp.status_code < 500, (
            f"Server error: {resp.status_code}\n{resp.text[:200]}"
        )

    @pytest.mark.api
    def test_v1_predictions_returns_json(self):
        """GET /api/v1/predictions returns JSON with the expected keys."""
        if not self._api_available():
            pytest.skip("API server not running")
        resp = requests.get(f"{BASE_URL}/api/v1/predictions", timeout=30)
        if resp.status_code == 503:
            pytest.skip("Ensemble model not yet deployed")
        assert resp.status_code == 200
        data = resp.json()
        assert "model_version" in data, "Missing model_version in response"
        assert "races" in data or "predictions" in data, (
            "Missing races/predictions in response"
        )

    @pytest.mark.api
    def test_v1_predictions_latency(self):
        """GET /api/v1/predictions must respond in under 3 seconds.

        The API-reported ``latency_ms`` is used when present. Otherwise the
        client-side ``resp.elapsed`` is used, so the check cannot pass just
        because the field is missing.
        """
        if not self._api_available():
            pytest.skip("API server not running")
        resp = requests.get(f"{BASE_URL}/api/v1/predictions", timeout=30)
        if resp.status_code == 503:
            pytest.skip("Ensemble model not yet deployed")
        if resp.status_code == 200:
            data = resp.json()
            latency = data.get("latency_ms")
            if latency is None:
                # Previously this defaulted to 0, which always passed.
                latency = resp.elapsed.total_seconds() * 1000
            assert latency < 3000, f"API latency {latency:.0f} ms > 3000 ms"

205
tests/test_smoke.py Normal file
View File

@@ -0,0 +1,205 @@
"""
Tests de smoke — SaaS Turf Prédictions IA
Sprint 8 — QA, Beta Fermee, Go/No-Go
Ticket: HRT-34
Vérifications rapides sur l'état de l'application :
- Routes de base accessibles
- API répond en JSON valide
- Base de données accessible
- Pas d'erreurs 5xx sur les routes principales
Ces tests peuvent tourner SANS infra complète (pas besoin de HRT-31/33).
Exécuter sur l'app actuelle en staging ou localhost.
"""
import pytest
import requests
import os
import json
BASE_URL = os.getenv("APP_URL", "http://localhost:8792")
# Public routes that must answer 200
PUBLIC_ROUTES_200 = ["/", "/dashboard"]
# API routes that may answer 200 or 401, but never 500
API_ROUTES_NO_500 = [
    f"/api{suffix}"
    for suffix in ("", "/races", "/scoring", "/weather", "/odds_history")
]
class TestSmoke:
    """Smoke tests: the app answers basic requests correctly.

    Every test is skipped, not failed, when the app cannot be reached.
    """

    @staticmethod
    def _get_or_skip(route):
        """GET ``BASE_URL + route``; skip the current test if the app is unreachable."""
        try:
            return requests.get(f"{BASE_URL}{route}", timeout=10)
        except requests.exceptions.ConnectionError:
            pytest.skip(
                f"App non accessible sur {BASE_URL} — vérifier que le serveur est démarré"
            )

    @pytest.mark.smoke
    @pytest.mark.parametrize("route", PUBLIC_ROUTES_200)
    def test_route_publique_accessible(self, route):
        """Public routes must answer 200 (or 304) with a non-empty body."""
        resp = self._get_or_skip(route)
        # Separator added: route and status used to be printed glued together.
        assert resp.status_code in (200, 304), (
            f"Route publique inaccessible: {route} → {resp.status_code}"
        )
        assert len(resp.content) > 0, f"Réponse vide sur {route}"

    @pytest.mark.smoke
    @pytest.mark.parametrize("route", API_ROUTES_NO_500)
    def test_api_pas_derreur_serveur(self, route):
        """API routes must never answer 5xx."""
        resp = self._get_or_skip(route)
        assert resp.status_code < 500, (
            f"Erreur serveur sur {route}: {resp.status_code}\n{resp.text[:200]}"
        )

    @pytest.mark.smoke
    def test_api_today_retourne_json(self):
        """The /api endpoint must return valid JSON (a list or a dict) on 200."""
        resp = self._get_or_skip("/api")
        if resp.status_code != 200:
            return
        try:
            data = resp.json()
        except json.JSONDecodeError as e:
            pytest.fail(f"/api ne retourne pas du JSON valide: {e}")
        assert data is not None, "Réponse JSON nulle"
        assert isinstance(data, (list, dict)), (
            f"Type de réponse inattendu: {type(data)}"
        )

    @pytest.mark.smoke
    def test_contenu_html_portail_valide(self):
        """The home page must be HTML and longer than 500 characters on 200."""
        resp = self._get_or_skip("/")
        if resp.status_code != 200:
            return
        content = resp.text
        assert "<html" in content.lower() or "<!doctype" in content.lower(), (
            "La page d'accueil ne retourne pas du HTML"
        )
        assert len(content) > 500, (
            f"Page d'accueil trop courte ({len(content)} chars)"
        )

    @pytest.mark.smoke
    def test_headers_securite_presents(self):
        """Warn, without failing, when basic security headers are missing.

        The headers are expected in production behind Nginx; a direct dev
        server may omit them, so a missing header only produces a warning.
        Only presence is checked, not the header values.
        """
        import warnings

        resp = self._get_or_skip("/")
        if resp.status_code != 200:
            return
        security_headers = (
            "X-Content-Type-Options",  # expected: nosniff
            "X-Frame-Options",  # SAMEORIGIN or DENY
            "X-XSS-Protection",
        )
        missing = [header for header in security_headers if header not in resp.headers]
        if missing:
            # Replaces a dead `pytest.warns(...) if False else None` expression
            # and a print that pytest hides on passing tests: a real warning
            # shows up in the pytest summary.
            warnings.warn(
                f"⚠️ Headers sécurité manquants (requis en prod): {missing}",
                UserWarning,
            )

    @pytest.mark.smoke
    def test_api_races_format_reponse(self):
        """The /api/races endpoint must return structured JSON on 200.

        When the payload is a non-empty list, its first item must expose at
        least one of the expected fields, also accepted in upper case.
        """
        resp = self._get_or_skip("/api/races")
        if resp.status_code != 200:
            return
        try:
            data = resp.json()
        except json.JSONDecodeError:
            pytest.fail("/api/races ne retourne pas du JSON valide")
        assert isinstance(data, (list, dict)), (
            f"Format inattendu pour /api/races: {type(data)}"
        )
        if isinstance(data, list) and len(data) > 0:
            first = data[0]
            # Check that key fields are present
            expected_fields = ["date", "course", "hippodrome"]
            present = [
                f
                for f in expected_fields
                if any(k in first for k in (f, f.upper(), f.replace("_", "")))
            ]
            assert len(present) > 0, (
                f"Champs attendus absents de /api/races. Champs présents: {list(first.keys())}"
            )
class TestSmokeDatabase:
    """Smoke tests on the SQLite database; skipped when the file is absent."""

    @pytest.mark.smoke
    def test_base_donnees_accessible(self):
        """The database must open and contain the essential tables."""
        import sqlite3

        db_path = "/home/h3r7/turf_saas/turf_saas.db"
        if not os.path.exists(db_path):
            pytest.skip(f"Base de données non trouvée: {db_path}")
        conn = sqlite3.connect(db_path)
        try:
            rows = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table'"
            ).fetchall()
        finally:
            # Release the connection even if the query raises.
            conn.close()
        tables = {row[0] for row in rows}
        for table in ("predictions", "results"):
            assert table in tables, (
                f"Table manquante dans la BDD: {table}. Tables présentes: {tables}"
            )

    @pytest.mark.smoke
    def test_donnees_predictions_disponibles(self):
        """The ``predictions`` table must be queryable; an empty table only warns."""
        import sqlite3

        db_path = "/home/h3r7/turf_saas/turf_saas.db"
        if not os.path.exists(db_path):
            pytest.skip(f"Base de données non trouvée: {db_path}")
        conn = sqlite3.connect(db_path)
        try:
            count = conn.execute("SELECT COUNT(*) FROM predictions").fetchone()[0]
        finally:
            conn.close()
        # The former `assert count >= 0` could never fail; a successful query
        # is the accessibility check, and an empty table is reported only.
        if count == 0:
            print("⚠️ Aucune prédiction en base — le scraper doit être lancé")