#!/usr/bin/env python3
"""
ml_feedback_saas.py — ML feedback loop for turf_saas.

Records virtual XGBoost bets from ml_predictions_cache and updates their
results/dividends from pmu_partants + pmu_rapports.

Target DB: /home/h3r7/turf_saas/turf_saas.db

Strategies:
  A) xgboost_sg    : simple_gagnant  — top-1 ML horse per race, ml_score >= 70, 1€ stake
  B) xgboost_value : simple_gagnant  — is_value_bet = 1, 1€ stake
  C) xgboost_sp    : simple_place    — top-1 ML horse per race, ml_score >= 50, 1€ stake
  D) xgboost_2sur4 : deux_sur_quatre — top-4 ML horses per race, 6 combos x 1€ = 6€ stake

Usage:
  python3 ml_feedback_saas.py                        # process today
  python3 ml_feedback_saas.py --backfill 2026-04-25
  python3 ml_feedback_saas.py --date 2026-04-25
"""

import logging
import os
import sqlite3
import sys
from datetime import datetime, timedelta

DB_PATH = "/home/h3r7/turf_saas/turf_saas.db"
LOG_DIR = "/home/h3r7/turf_saas/logs"
LOG_FILE = os.path.join(LOG_DIR, "ml_feedback_saas.log")


def _build_log_handlers():
    """Return (handlers, error): console always, file handler when LOG_DIR is usable."""
    handlers = [logging.StreamHandler()]
    try:
        os.makedirs(LOG_DIR, exist_ok=True)
        handlers.insert(0, logging.FileHandler(LOG_FILE))
    except OSError as exc:
        # An unwritable log directory must not make the module un-importable;
        # the caller reports the problem once logging is configured.
        return handlers, exc
    return handlers, None


_log_handlers, _log_file_error = _build_log_handlers()
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [ml_feedback_saas] %(levelname)s %(message)s",
    handlers=_log_handlers,
)
log = logging.getLogger(__name__)
if _log_file_error is not None:
    log.warning(
        "Journal fichier indisponible (%s) : sortie console uniquement",
        _log_file_error,
    )


# ─────────────────────────────────────────────────────────
# UTILITIES
# ─────────────────────────────────────────────────────────

def get_db():
    """Open a connection to DB_PATH whose rows are addressable by column name."""
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def _race_label(num_reunion, num_course):
    """Build the canonical 'R{reunion}C{course}' label stored in paris.race_label."""
    return f"R{num_reunion}C{num_course}"


def pari_existe(cursor, date, num_reunion, num_course, numero1, type_pari, source_reco):
    """Return True if an identical bet is already stored.

    A bet is identified by (date_course, source_reco, type_pari, numero1,
    race_label); the insert functions use this check to avoid duplicates.
    """
    cursor.execute(
        """
        SELECT id FROM paris
        WHERE date_course = ?
          AND source_reco = ?
          AND type_pari = ?
          AND numero1 = ?
          AND race_label = ?
        """,
        (date, source_reco, type_pari, numero1, _race_label(num_reunion, num_course)),
    )
    return cursor.fetchone() is not None


def pari_2sur4_existe(cursor, date, num_reunion, num_course, source_reco):
    """Return True if a bet from *source_reco* already exists for this race."""
    cursor.execute(
        """
        SELECT id FROM paris
        WHERE date_course = ?
          AND source_reco = ?
          AND race_label = ?
        """,
        (date, source_reco, _race_label(num_reunion, num_course)),
    )
    return cursor.fetchone() is not None


def get_top_ml_par_course(cursor, date, n=4, min_score=0):
    """Return the *n* best-scored horses of each race for *date*.

    Only rows with ml_score >= *min_score* are considered. The result maps
    (num_reunion, num_course) to a list of row dicts sorted by descending
    ml_score, holding at most *n* entries.
    """
    cursor.execute(
        """
        SELECT num_reunion, num_course, horse_name, horse_number,
               ml_score, odds, recommendation, is_value_bet,
               race_label, race_name, hippodrome, heure, discipline, distance
        FROM ml_predictions_cache
        WHERE date = ?
          AND ml_score >= ?
        ORDER BY num_reunion, num_course, ml_score DESC
        """,
        (date, min_score),
    )
    courses = {}
    for row in cursor.fetchall():
        selection = courses.setdefault((row["num_reunion"], row["num_course"]), [])
        if len(selection) < n:
            selection.append(dict(row))
    return courses


_INSERT_SIMPLE_SQL = """
    INSERT INTO paris
        (date_pari, date_course, race_name, race_label, hippodrome,
         type_pari, chevaux, cheval1, numero1, cote, mise,
         statut, gain, source_reco, model_source)
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 1.0,
            'EN_ATTENTE', 0.0, ?, 'xgboost_v1')
"""


def _insert_simple_pari(cursor, date, num_reunion, num_course, cheval, type_pari, source_reco):
    """Insert one pending 1€ single-horse bet; return True if a row was added.

    The stored race_label is always the canonical 'R{reunion}C{course}' form,
    because both pari_existe() and update_ml_paris_results() rely on it.
    """
    if pari_existe(
        cursor, date, num_reunion, num_course,
        cheval["horse_number"], type_pari, source_reco,
    ):
        return False
    cursor.execute(
        _INSERT_SIMPLE_SQL,
        (
            date,
            date,
            cheval.get("race_name") or "",
            _race_label(num_reunion, num_course),
            cheval.get("hippodrome") or "",
            type_pari,
            cheval["horse_name"],
            cheval["horse_name"],
            cheval["horse_number"],
            cheval["odds"],
            source_reco,
        ),
    )
    return True


# ─────────────────────────────────────────────────────────
# STRATEGY A — Simple Gagnant, top-1 ML (score >= 70)
# ─────────────────────────────────────────────────────────

def save_ml_paris_sg(conn, date, min_score=70):
    """Insert one simple_gagnant bet per race on the top ML horse.

    Only horses with ml_score >= *min_score* qualify. Commits and returns the
    number of bets inserted.
    """
    cursor = conn.cursor()
    courses = get_top_ml_par_course(cursor, date, n=1, min_score=min_score)

    inseres = 0
    for (num_reunion, num_course), chevaux in courses.items():
        if _insert_simple_pari(
            cursor, date, num_reunion, num_course,
            chevaux[0], "simple_gagnant", "xgboost_sg",
        ):
            inseres += 1

    conn.commit()
    log.info(
        "[SG] %s → %s paris simple_gagnant insérés (score>=%s)",
        date, inseres, min_score,
    )
    return inseres


# ─────────────────────────────────────────────────────────
# STRATEGY B — Value Bet (is_value_bet = 1)
# ─────────────────────────────────────────────────────────

def save_ml_paris_value(conn, date):
    """Insert one simple_gagnant bet for every horse flagged is_value_bet = 1.

    Commits and returns the number of bets inserted.
    """
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT num_reunion, num_course, horse_name, horse_number,
               odds, race_name, hippodrome
        FROM ml_predictions_cache
        WHERE date = ?
          AND is_value_bet = 1
        ORDER BY num_reunion, num_course, ml_score DESC
        """,
        (date,),
    )
    chevaux = [dict(row) for row in cursor.fetchall()]

    inseres = 0
    for cheval in chevaux:
        # The label is rebuilt from the reunion/course numbers instead of
        # copying the cache's race_label: the duplicate check and the result
        # update both look bets up by the canonical form.
        if _insert_simple_pari(
            cursor, date, cheval["num_reunion"], cheval["num_course"],
            cheval, "simple_gagnant", "xgboost_value",
        ):
            inseres += 1

    conn.commit()
    log.info("[VALUE] %s → %s paris value_bet insérés", date, inseres)
    return inseres


# ─────────────────────────────────────────────────────────
# STRATEGY C — Simple Placé, top-1 ML (score >= 50)
# ─────────────────────────────────────────────────────────

def save_ml_paris_sp(conn, date, min_score=50):
    """Insert one simple_place bet per race on the top ML horse.

    Only horses with ml_score >= *min_score* qualify. Commits and returns the
    number of bets inserted.
    """
    cursor = conn.cursor()
    courses = get_top_ml_par_course(cursor, date, n=1, min_score=min_score)

    inseres = 0
    for (num_reunion, num_course), chevaux in courses.items():
        if _insert_simple_pari(
            cursor, date, num_reunion, num_course,
            chevaux[0], "simple_place", "xgboost_sp",
        ):
            inseres += 1

    conn.commit()
    log.info(
        "[SP] %s → %s paris simple_place insérés (score>=%s)",
        date, inseres, min_score,
    )
    return inseres


# ─────────────────────────────────────────────────────────
# STRATEGY D — 2sur4, top-4 ML (6 combinations x 1€)
# ─────────────────────────────────────────────────────────

def save_ml_paris_2sur4(conn, date):
    """Insert one deux_sur_quatre bet (6€ stake) per race on the top-4 ML horses.

    Races with fewer than four predictions are skipped. The four horse numbers
    are stored in the `commentaire` column ("top4 ML: n1/n2/n3/n4") so the
    result update can recover them. Commits and returns the number inserted.
    """
    cursor = conn.cursor()
    courses = get_top_ml_par_course(cursor, date, n=4, min_score=0)

    inseres = 0
    for (num_reunion, num_course), top4 in courses.items():
        if len(top4) < 4:
            continue
        if pari_2sur4_existe(cursor, date, num_reunion, num_course, "xgboost_2sur4"):
            continue

        nums = [str(c["horse_number"]) for c in top4]
        noms = [c["horse_name"] for c in top4]

        cursor.execute(
            """
            INSERT INTO paris
                (date_pari, date_course, race_name, race_label, hippodrome,
                 type_pari, chevaux, cheval1, numero1, cote, mise,
                 statut, gain, source_reco, model_source, commentaire)
            VALUES (?, ?, ?, ?, ?, 'deux_sur_quatre', ?, ?, ?, 0.0, 6.0,
                    'EN_ATTENTE', 0.0, 'xgboost_2sur4', 'xgboost_v1', ?)
            """,
            (
                date,
                date,
                top4[0].get("race_name") or "",
                _race_label(num_reunion, num_course),
                top4[0].get("hippodrome") or "",
                "/".join(noms),
                top4[0]["horse_name"],
                top4[0]["horse_number"],
                f"top4 ML: {'/'.join(nums)}",
            ),
        )
        inseres += 1

    conn.commit()
    log.info("[2S4] %s → %s paris deux_sur_quatre insérés", date, inseres)
    return inseres


# ─────────────────────────────────────────────────────────
# RESULT + DIVIDEND UPDATE
# ─────────────────────────────────────────────────────────

# paris.type_pari -> pmu_rapports.type_pari for single-horse bets.
_RAPPORT_TYPES = {
    "simple_gagnant": "SIMPLE_GAGNANT",
    "simple_place": "SIMPLE_PLACE",
}

# Worst finishing rank that still wins, per single-horse bet type.
# NOTE(review): simple_place is treated as "top 3" regardless of field size,
# as in the original code — confirm against the PMU place rules in use.
_MAX_WINNING_RANK = {
    "simple_gagnant": 1,
    "simple_place": 3,
}


def _parse_race_label(race_label):
    """Split an 'R{r}C{c}' label into (num_reunion, num_course) integers.

    Raises ValueError or IndexError when the label does not have that shape.
    """
    parts = race_label.replace("R", "").split("C")
    return int(parts[0]), int(parts[1])


def _parse_top4_comment(commentaire):
    """Extract horse numbers from a 'top4 ML: n1/n2/n3/n4' comment.

    Returns an empty list when the comment is missing or malformed.
    """
    if not commentaire:
        return []
    try:
        nums_str = commentaire.split(": ")[1]
        return [int(n) for n in nums_str.split("/") if n.strip().isdigit()]
    except (IndexError, AttributeError, ValueError):
        return []


def _settle_simple(cursor, date, num_reunion, num_course, numero1, type_pari):
    """Compute (statut, gain) for a single-horse bet, or None if not yet settleable.

    The bet stays pending while the horse has no finishing rank recorded in
    pmu_partants (missing row, NULL or 0).
    """
    cursor.execute(
        """
        SELECT ordre_arrivee FROM pmu_partants
        WHERE date_programme = ?
          AND num_reunion = ?
          AND num_course = ?
          AND num_pmu = ?
        """,
        (date, num_reunion, num_course, numero1),
    )
    row = cursor.fetchone()
    if not row or not row["ordre_arrivee"]:
        return None

    if not 1 <= row["ordre_arrivee"] <= _MAX_WINNING_RANK[type_pari]:
        return "PERDU", 0.0

    cursor.execute(
        """
        SELECT dividende_euro FROM pmu_rapports
        WHERE date_programme = ?
          AND num_reunion = ?
          AND num_course = ?
          AND type_pari = ?
          AND CAST(combinaison AS INTEGER) = ?
          AND libelle NOT LIKE '%NP%'
        """,
        (date, num_reunion, num_course, _RAPPORT_TYPES[type_pari], numero1),
    )
    div = cursor.fetchone()
    gain = div["dividende_euro"] if div and div["dividende_euro"] else 0.0
    if not gain:
        # Behaviour kept from the original (bet is settled with a zero gain),
        # but surfaced so a missing rapport does not silently skew the ROI.
        log.warning(
            "[UPDATE] %s %s n°%s gagnant mais dividende %s introuvable (gain=0)",
            date, _race_label(num_reunion, num_course), numero1,
            _RAPPORT_TYPES[type_pari],
        )
    return "GAGNE", gain


def _settle_2sur4(cursor, date, num_reunion, num_course, commentaire):
    """Compute (statut, gain) for a deux_sur_quatre bet, or None if not settleable.

    The gain is the sum of the DEUX_SUR_QUATRE dividends whose two horses both
    belong to the bet's four-horse selection.
    """
    nums_top4 = _parse_top4_comment(commentaire)
    if len(nums_top4) < 4:
        # Fallback: rebuild the selection from the prediction cache.
        cursor.execute(
            """
            SELECT horse_number FROM ml_predictions_cache
            WHERE date = ?
              AND num_reunion = ?
              AND num_course = ?
            ORDER BY ml_score DESC
            LIMIT 4
            """,
            (date, num_reunion, num_course),
        )
        nums_top4 = [r["horse_number"] for r in cursor.fetchall()]
    if len(nums_top4) < 2:
        return None

    # Like single-horse bets, wait until the race has a recorded arrival;
    # otherwise an unrun race would be settled as lost for good.
    cursor.execute(
        """
        SELECT 1 FROM pmu_partants
        WHERE date_programme = ?
          AND num_reunion = ?
          AND num_course = ?
          AND ordre_arrivee >= 1
        LIMIT 1
        """,
        (date, num_reunion, num_course),
    )
    if cursor.fetchone() is None:
        return None

    cursor.execute(
        """
        SELECT combinaison, dividende_euro FROM pmu_rapports
        WHERE date_programme = ?
          AND num_reunion = ?
          AND num_course = ?
          AND type_pari = 'DEUX_SUR_QUATRE'
          AND libelle NOT LIKE '%NP%'
        """,
        (date, num_reunion, num_course),
    )
    rapports = cursor.fetchall()
    if not rapports:
        log.warning(
            "[UPDATE] %s %s arrivée connue mais aucun rapport DEUX_SUR_QUATRE",
            date, _race_label(num_reunion, num_course),
        )

    selection = set(nums_top4)
    gain_total = 0.0
    for rap in rapports:
        try:
            n1, n2 = (int(x) for x in rap["combinaison"].split("-"))
        except (ValueError, AttributeError):
            # Not an "a-b" pair (or not a string): ignore this rapport line.
            continue
        if n1 in selection and n2 in selection:
            # A NULL dividend counts as 0 instead of raising TypeError.
            gain_total += rap["dividende_euro"] or 0.0

    return ("GAGNE" if gain_total > 0 else "PERDU"), round(gain_total, 2)


def update_ml_paris_results(conn, date):
    """Settle the pending ML bets of *date* with the real PMU results.

    For every bet with statut 'EN_ATTENTE' and a source_reco starting with
    'xgboost', sets statut to 'GAGNE'/'PERDU' and gain to the PMU dividend.
    Sources: pmu_partants (ordre_arrivee) and pmu_rapports (dividende_euro).
    Bets whose race has no recorded result yet are left pending.

    Commits and returns the number of bets updated.
    """
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT id, race_label, type_pari, numero1, chevaux, mise, source_reco, commentaire
        FROM paris
        WHERE date_course = ?
          AND statut = 'EN_ATTENTE'
          AND source_reco LIKE 'xgboost%'
        """,
        (date,),
    )
    paris = [dict(r) for r in cursor.fetchall()]

    if not paris:
        log.info("[UPDATE] %s → aucun pari ML EN_ATTENTE", date)
        return 0

    maj = 0
    for pari in paris:
        race_label = pari["race_label"] or ""
        type_pari = pari["type_pari"]

        try:
            num_reunion, num_course = _parse_race_label(race_label)
        except (ValueError, IndexError):
            log.warning("[UPDATE] race_label invalide : %s", race_label)
            continue

        if type_pari in _RAPPORT_TYPES:
            outcome = _settle_simple(
                cursor, date, num_reunion, num_course, pari["numero1"], type_pari
            )
        elif type_pari == "deux_sur_quatre":
            outcome = _settle_2sur4(
                cursor, date, num_reunion, num_course, pari["commentaire"]
            )
        else:
            outcome = None  # unknown bet type: leave untouched

        if outcome is None:
            continue

        statut, gain = outcome
        cursor.execute(
            "UPDATE paris SET statut=?, gain=? WHERE id=?",
            (statut, gain, pari["id"]),
        )
        maj += 1

    conn.commit()
    log.info("[UPDATE] %s → %s/%s paris ML mis à jour", date, maj, len(paris))
    return maj


# ─────────────────────────────────────────────────────────
# STATS PER STRATEGY
# ─────────────────────────────────────────────────────────

def get_feedback_stats(conn, date_debut=None, date_fin=None):
    """Return per-strategy (source_reco) performance stats as a list of dicts.

    *date_debut* / *date_fin* are optional inclusive bounds on date_course.
    win_rate_pct is computed over settled bets only (GAGNE + PERDU) and is
    NULL when none is settled; roi_net is SUM(gain) - SUM(mise).
    """
    cursor = conn.cursor()
    cursor.execute(
        """
        SELECT
            source_reco,
            COUNT(*) as n_paris,
            SUM(CASE WHEN statut='GAGNE' THEN 1 ELSE 0 END) as n_gagne,
            SUM(CASE WHEN statut='PERDU' THEN 1 ELSE 0 END) as n_perdu,
            SUM(CASE WHEN statut='EN_ATTENTE' THEN 1 ELSE 0 END) as n_attente,
            ROUND(100.0 * SUM(CASE WHEN statut='GAGNE' THEN 1 ELSE 0 END)
                  / NULLIF(SUM(CASE WHEN statut IN ('GAGNE','PERDU') THEN 1 ELSE 0 END), 0), 1)
                as win_rate_pct,
            ROUND(SUM(gain), 2) as gain_total,
            ROUND(SUM(mise), 2) as mise_totale,
            ROUND(SUM(gain) - SUM(mise), 2) as roi_net
        FROM paris
        WHERE source_reco LIKE 'xgboost%'
          AND (:debut IS NULL OR date_course >= :debut)
          AND (:fin IS NULL OR date_course <= :fin)
        GROUP BY source_reco
        ORDER BY source_reco
        """,
        {"debut": date_debut, "fin": date_fin},
    )
    return [dict(r) for r in cursor.fetchall()]


# ─────────────────────────────────────────────────────────
# FULL PIPELINE
# ─────────────────────────────────────────────────────────

def _save_all(conn, date):
    """Run the four insert strategies for *date*; return (sg, value, sp, 2sur4) counts."""
    return (
        save_ml_paris_sg(conn, date),
        save_ml_paris_value(conn, date),
        save_ml_paris_sp(conn, date),
        save_ml_paris_2sur4(conn, date),
    )


def run(date):
    """Record the ML bets of *date* and settle the bets of the previous day.

    *date* must be 'YYYY-MM-DD'; a malformed value raises ValueError before
    anything is written. Returns
    {"inseres": {"sg", "value", "sp", "2sur4"}, "maj": n_updated}.
    """
    # Parsed first so that an invalid date fails before any insert is committed.
    yesterday = (datetime.strptime(date, "%Y-%m-%d") - timedelta(days=1)).strftime(
        "%Y-%m-%d"
    )

    conn = get_db()
    try:
        log.info("=== ml_feedback_saas.run(%s) ===", date)

        # 1. Record the ML bets for the date (from that day's cache).
        sg, vb, sp, s4 = _save_all(conn, date)
        log.info(
            "[SAVE] %s → total insérés : SG=%s VALUE=%s SP=%s 2S4=%s",
            date, sg, vb, sp, s4,
        )

        # 2. Settle the previous day's bets (PMU results are available by then).
        maj = update_ml_paris_results(conn, yesterday)
        log.info("[UPDATE] %s → %s paris mis à jour", yesterday, maj)
    finally:
        conn.close()

    return {"inseres": {"sg": sg, "value": vb, "sp": sp, "2sur4": s4}, "maj": maj}


def backfill(date):
    """Insert AND settle the ML bets of a past *date*.

    Returns (total_inserted, n_updated).
    """
    conn = get_db()
    try:
        log.info("=== ml_feedback_saas.backfill(%s) ===", date)

        sg, vb, sp, s4 = _save_all(conn, date)
        log.info("[SAVE] %s → SG=%s VALUE=%s SP=%s 2S4=%s", date, sg, vb, sp, s4)

        maj = update_ml_paris_results(conn, date)
        log.info("[UPDATE] %s → %s paris mis à jour", date, maj)
    finally:
        conn.close()

    return sg + vb + sp + s4, maj


# ─────────────────────────────────────────────────────────
# MAIN
# ─────────────────────────────────────────────────────────

def _flag_value(argv, flag):
    """Return the argument following *flag* in *argv*, or None if there is none."""
    idx = argv.index(flag)
    return argv[idx + 1] if idx + 1 < len(argv) else None


def main(argv=None):
    """Command-line entry point: --backfill DATE, --date DATE, or today by default."""
    argv = sys.argv if argv is None else argv

    if "--backfill" in argv:
        date = _flag_value(argv, "--backfill")
        if not date:
            print("Usage: python3 ml_feedback_saas.py --backfill YYYY-MM-DD")
            sys.exit(1)
        inseres, maj = backfill(date)
        print(f"Backfill {date} : {inseres} paris insérés, {maj} mis à jour")

    elif "--date" in argv:
        date = _flag_value(argv, "--date")
        if not date:
            print("Usage: python3 ml_feedback_saas.py --date YYYY-MM-DD")
            sys.exit(1)
        result = run(date)
        total = sum(result["inseres"].values())
        print(f"Run {date} : {total} paris insérés, {result['maj']} mis à jour")

    else:
        result = run(datetime.now().strftime("%Y-%m-%d"))
        total = sum(result["inseres"].values())
        print(f"Run today : {total} paris insérés, {result['maj']} mis à jour")


if __name__ == "__main__":
    main()