# Scheduler jobs added to turf_scheduler.py by commit
# c072f927944cbf52d1d1ba4bfa027d226a80ffc3
# ("Fix #1: add the run_ml_cache job to the scheduler to feed ml_predictions_cache").
# The commit message also mentions installing optuna, shap and lightgbm; that is
# not part of the diff itself.
#
# main() registers the jobs with the `schedule` library as follows:
#   run_sync_turf_db  daily at 11:00 (tags "sync", "post_scraping")
#                     and 17:00 (tags "sync", "post_cotes")
#   run_ml_cache      daily at 09:30 (tags "ml_cache", "morning"),
#                     11:35 (tags "ml_cache", "post_sync_am"),
#                     13:30 (tags "ml_cache", "pre_race")
#                     and 17:35 (tags "ml_cache", "post_sync_pm")
#
# NOTE(review): these functions use `logger`, `datetime`, `os`, `sqlite3` and
# `DB_PATH` without importing them, so they are expected to come from the top of
# turf_scheduler.py — confirm when splicing the code back in.


def run_sync_turf_db():
    """Synchronise turf.db into turf_saas.db by running sync_turf_db.py.

    The script is started as a child process for today's date with a 300 s
    timeout. A non-zero exit status is logged together with the child's
    stderr. Any exception (including the timeout) is logged and swallowed so
    that a failing job does not propagate out of the scheduled call.
    """
    logger.info("🔄 [SCHEDULER] Sync turf.db -> turf_saas.db...")
    try:
        import subprocess

        sync_date = datetime.now().strftime("%Y-%m-%d")
        result = subprocess.run(
            ["python3", "/home/h3r7/turf_saas/sync_turf_db.py", "--date", sync_date],
            capture_output=True,
            text=True,
            timeout=300,
        )
        if result.returncode == 0:
            logger.info("✅ [SCHEDULER] Sync turf.db terminé")
        else:
            logger.error(f"❌ [SCHEDULER] Sync turf.db échoué: {result.stderr}")
    except Exception as e:
        logger.error(f"❌ [SCHEDULER] Erreur sync turf.db: {e}")
        import traceback

        traceback.print_exc()


def run_ml_cache():
    """Rebuild today's rows of ml_predictions_cache from predict_v2 predictions.

    Steps: load the ensemble model, read today's runners joined with their
    race, predict, attach race metadata and odds, compute one risk label and
    score per race, then replace today's cache rows in a single transaction.

    The job returns early (after logging) when the model is unavailable, when
    there are no runners for today, or when no prediction is produced. Any
    exception is logged and swallowed; the database connection is closed on
    every path and a failed write is rolled back, leaving the previous cache
    rows untouched.
    """
    logger.info("🤖 [SCHEDULER] Mise à jour cache prédictions ML (ensemble)...")
    try:
        from contextlib import closing

        # NOTE(review): process-wide working-directory change, kept from the
        # original; predict_v2 presumably resolves files relative to it.
        os.chdir("/home/h3r7/turf_saas")
        import predict_v2

        model = predict_v2.load_ensemble()
        if model is None:
            logger.warning("⚠️ [SCHEDULER] Ensemble model not available, skipping")
            return

        today = datetime.now().strftime("%Y-%m-%d")
        # closing() guarantees conn.close() even when prediction or a write fails.
        with closing(sqlite3.connect(DB_PATH)) as conn:
            conn.row_factory = sqlite3.Row

            partants = _load_today_partants(conn, today)
            if not partants:
                logger.info("ℹ️ [SCHEDULER] No partants today, skipping ML cache")
                return

            preds = predict_v2.predict_top3(partants, model=model)
            if not preds:
                logger.warning("⚠️ [SCHEDULER] No predictions generated")
                return

            enriched = _enrich_predictions(partants, preds)
            race_risque = _risque_by_race(enriched)
            _ensure_ml_cache_table(conn)
            _replace_ml_cache(conn, today, enriched, race_risque)

        logger.info(f"✅ [SCHEDULER] ML cache mis à jour: {len(enriched)} prédictions pour {today}")

    except Exception as e:
        logger.error(f"❌ [SCHEDULER] Erreur ML cache: {e}")
        import traceback

        traceback.print_exc()


def _load_today_partants(conn, today):
    """Return the runners of date `today`, each joined with its race, as dicts."""
    rows = conn.execute(
        """
        SELECT p.*, c.distance, c.discipline, c.specialite,
               c.nb_declares_partants, c.montant_prix, c.penetrometre_intitule,
               c.libelle as course_libelle, c.libelle_court as hippodrome,
               c.heure_depart_str, c.parcours
        FROM pmu_partants p
        LEFT JOIN pmu_courses c ON p.date_programme = c.date_programme
            AND p.num_reunion = c.num_reunion AND p.num_course = c.num_course
        WHERE p.date_programme = ?
        ORDER BY p.num_reunion, p.num_course, p.num_pmu
        """,
        (today,),
    ).fetchall()
    return [dict(row) for row in rows]


def _enrich_predictions(partants, preds):
    """Attach race metadata and live odds to each prediction.

    Args:
        partants: runner dicts as returned by `_load_today_partants`.
        preds: prediction dicts returned by `predict_v2.predict_top3`.

    Returns:
        One dict per prediction, holding the values later written to
        ml_predictions_cache (risk columns excluded).
    """
    course_lookup = {}
    odds_by_horse = {}
    for runner in partants:
        race_key = (runner["num_reunion"], runner["num_course"])
        # The first runner seen for a race supplies that race's metadata.
        if race_key not in course_lookup:
            course_lookup[race_key] = {
                "libelle": runner.get("course_libelle", ""),
                "libelle_court": runner.get("hippodrome", ""),
                "discipline": runner.get("discipline", ""),
                "distance": runner.get("distance", 0),
                "heure_depart_str": runner.get("heure_depart_str", ""),
            }
        odds_by_horse[race_key + (runner["num_pmu"],)] = runner.get("cote_direct", 0)

    enriched = []
    for pred in preds:
        num_reunion = pred.get("num_reunion")
        num_course = pred.get("num_course")
        num_pmu = pred.get("num_pmu")
        course = course_lookup.get((num_reunion, num_course), {})
        enriched.append({
            "num_reunion": num_reunion,
            "num_course": num_course,
            "horse_name": pred.get("horse_name"),
            "horse_number": num_pmu,
            "odds": odds_by_horse.get((num_reunion, num_course, num_pmu), 0),
            "prob_top1": pred.get("prob_top1"),
            "prob_top3": pred.get("prob_top3"),
            "ml_score": pred.get("ml_score"),
            "recommendation": pred.get("recommendation"),
            "is_value_bet": pred.get("is_value_bet", 0),
            "is_outlier": 0,
            "race_label": f"R{pred.get('num_reunion', 0)}C{pred.get('num_course', 0)}",
            "race_name": course.get("libelle", ""),
            "hippodrome": course.get("libelle_court", ""),
            "discipline": course.get("discipline", ""),
            "distance": course.get("distance", 0),
            "heure": course.get("heure_depart_str", ""),
        })
    return enriched


def _risque_by_race(enriched):
    """Map each (num_reunion, num_course) to its (risk label, risk score)."""
    from collections import defaultdict

    horses_by_race = defaultdict(list)
    for entry in enriched:
        horses_by_race[(entry["num_reunion"], entry["num_course"])].append(entry)

    race_risque = {}
    for race_key, horses in horses_by_race.items():
        label, score = _calc_risque(horses)
        race_risque[race_key] = (label or "neutral", score or 50)
    return race_risque


def _ensure_ml_cache_table(conn):
    """Create ml_predictions_cache if needed and add missing risk columns.

    Tables created before the risk columns existed are upgraded in place.
    Only columns that are actually missing are added, so a genuine ALTER
    failure is raised instead of being silently ignored.
    """
    conn.execute("""
        CREATE TABLE IF NOT EXISTS ml_predictions_cache (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            date TEXT NOT NULL, num_reunion INTEGER, num_course INTEGER,
            horse_name TEXT, horse_number INTEGER, odds REAL,
            prob_top1 REAL, prob_top3 REAL, ml_score REAL,
            recommendation TEXT, is_value_bet INTEGER DEFAULT 0,
            is_outlier INTEGER DEFAULT 0, race_label TEXT, race_name TEXT,
            hippodrome TEXT, discipline TEXT, distance REAL, heure TEXT,
            model_version TEXT DEFAULT 'xgboost_v1',
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            risque_label TEXT DEFAULT 'neutral', risque_score INTEGER DEFAULT 50,
            UNIQUE(date, num_reunion, num_course, horse_name)
        )
    """)
    conn.execute("CREATE INDEX IF NOT EXISTS idx_ml_cache_date ON ml_predictions_cache(date)")

    # PRAGMA table_info yields (cid, name, type, notnull, dflt_value, pk).
    existing = {row[1] for row in conn.execute("PRAGMA table_info(ml_predictions_cache)")}
    for column, definition in (
        ("risque_label", "TEXT DEFAULT 'neutral'"),
        ("risque_score", "INTEGER DEFAULT 50"),
    ):
        if column not in existing:
            conn.execute(f"ALTER TABLE ml_predictions_cache ADD COLUMN {column} {definition}")


def _replace_ml_cache(conn, today, enriched, race_risque):
    """Atomically replace the cache rows of `today` with `enriched`.

    Args:
        conn: open sqlite3 connection.
        today: date string stored in the `date` column.
        enriched: dicts produced by `_enrich_predictions`.
        race_risque: mapping produced by `_risque_by_race`.
    """
    rows = []
    for entry in enriched:
        label, score = race_risque.get(
            (entry["num_reunion"], entry["num_course"]), ("neutral", 50)
        )
        rows.append((
            today, entry["num_reunion"], entry["num_course"],
            entry["horse_name"], entry["horse_number"], entry["odds"],
            entry["prob_top1"], entry["prob_top3"], entry["ml_score"],
            entry["recommendation"], entry["is_value_bet"], entry["is_outlier"],
            entry["race_label"], entry["race_name"], entry["hippodrome"],
            entry["discipline"], entry["distance"], entry["heure"],
            label, score, "ensemble_v1",
        ))

    # `with conn` commits on success and rolls back if the delete or any
    # insert fails, so a failed refresh never leaves a half-written day.
    with conn:
        conn.execute("DELETE FROM ml_predictions_cache WHERE date = ?", (today,))
        conn.executemany(
            """
            INSERT INTO ml_predictions_cache
            (date, num_reunion, num_course, horse_name, horse_number, odds,
             prob_top1, prob_top3, ml_score, recommendation, is_value_bet, is_outlier,
             race_label, race_name, hippodrome, discipline, distance, heure,
             risque_label, risque_score, model_version)
            VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
            """,
            rows,
        )


def _calc_risque(partants_list):
    """Classify one race as "safe", "trap" or "neutral" with a numeric score.

    Described by the original author as the same logic as
    dashboard_api.calculate_risque, kept local to avoid import side effects.

    Each runner's strength is its `ml_score`, falling back to `prob_top1`
    and then 0. Rules are evaluated in this order:

    1. "safe"    - top strength >= 65 and lead over the runner-up >= 20.
    2. "trap"    - the shortest-odds runner has odds < 5 but strength < 25.
    3. "trap"    - at least 4 runners with ml_score > 40 and top strength < 70.
    4. "trap"    - lead < 8 while the runner-up's strength is > 45.
    5. "neutral" - otherwise, with a score clamped to 35..64.

    NOTE(review): the thresholds suggest a 0-100 scale; if `prob_top1` is a
    0-1 probability the fallback mixes scales - confirm against predict_v2.

    Args:
        partants_list: dicts that may hold "ml_score", "prob_top1" and
            "odds"; missing, None or 0 odds are treated as 999.

    Returns:
        (label, score), or (None, None) for an empty list.
    """
    if not partants_list:
        return None, None

    def strength(horse):
        return horse.get("ml_score") or horse.get("prob_top1") or 0

    ranked = sorted((strength(horse) for horse in partants_list), reverse=True)
    top1_score = ranked[0]
    top2_score = ranked[1] if len(ranked) > 1 else 0
    gap_1_2 = top1_score - top2_score
    nb_dangerous = sum(1 for horse in partants_list if (horse.get("ml_score") or 0) > 40)

    # min() returns the first runner with the lowest odds, which is the same
    # runner a stable ascending sort would place first.
    favourite = min(partants_list, key=lambda horse: horse.get("odds") or 999)
    fav_odds = favourite.get("odds") or 999
    fav_ml = strength(favourite)
    fav_surprise = fav_odds < 5 and fav_ml < 25

    if top1_score >= 65 and gap_1_2 >= 20:
        return "safe", min(100, int(50 + gap_1_2 * 1.5))
    if fav_surprise:
        return "trap", max(10, int(35 - (25 - fav_ml)))
    if nb_dangerous >= 4 and top1_score < 70:
        return "trap", max(10, int(40 - nb_dangerous * 2))
    if gap_1_2 < 8 and top2_score > 45:
        return "trap", max(15, int(30 + gap_1_2))
    return "neutral", min(64, max(35, int(35 + gap_1_2 * 1.2)))