Fix #1: Ajout job run_ml_cache dans scheduler pour alimenter ml_predictions_cache

- run_ml_cache() lit les partants, genere predictions via predict_v2,
  enrichit avec metadonnees course, calcule risque, ecrit dans cache
- Planifie 4x/jour: 09:30, 11:35, 13:30, 17:35
- Installe dependances: optuna, shap, lightgbm

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
CTO H3R7Tech
2026-05-23 22:54:29 +02:00
parent fac498efec
commit c072f92794

View File

@@ -107,6 +107,34 @@ def run_analytics():
traceback.print_exc() traceback.print_exc()
def run_sync_turf_db():
"""Synchronise turf.db vers turf_saas.db"""
logger.info("🔄 [SCHEDULER] Sync turf.db -> turf_saas.db...")
try:
import subprocess
result = subprocess.run(
[
"python3",
"/home/h3r7/turf_saas/sync_turf_db.py",
"--date",
datetime.now().strftime("%Y-%m-%d"),
],
capture_output=True,
text=True,
timeout=300,
)
if result.returncode == 0:
logger.info("✅ [SCHEDULER] Sync turf.db terminé")
else:
logger.error(f"❌ [SCHEDULER] Sync turf.db échoué: {result.stderr}")
except Exception as e:
logger.error(f"❌ [SCHEDULER] Erreur sync turf.db: {e}")
import traceback
traceback.print_exc()
def get_todays_race_time(): def get_todays_race_time():
"""Récupère l'heure de la course principale du jour depuis la DB """Récupère l'heure de la course principale du jour depuis la DB
Returns: timestamp en ms ou None Returns: timestamp en ms ou None
@@ -315,6 +343,16 @@ def main():
schedule.every().day.at("20:00").do(run_results).tag("results", "daily_fallback") schedule.every().day.at("20:00").do(run_results).tag("results", "daily_fallback")
schedule.every().day.at("19:00").do(run_scraper).tag("scraper", "late_evening") schedule.every().day.at("19:00").do(run_scraper).tag("scraper", "late_evening")
# Sync turf.db -> turf_saas.db (2x/jour: post-scraping + post-cotes)
schedule.every().day.at("11:00").do(run_sync_turf_db).tag("sync", "post_scraping")
schedule.every().day.at("17:00").do(run_sync_turf_db).tag("sync", "post_cotes")
# ML Cache: populate ml_predictions_cache après chaque sync
schedule.every().day.at("11:35").do(run_ml_cache).tag("ml_cache", "post_sync_am")
schedule.every().day.at("17:35").do(run_ml_cache).tag("ml_cache", "post_sync_pm")
schedule.every().day.at("09:30").do(run_ml_cache).tag("ml_cache", "morning")
schedule.every().day.at("13:30").do(run_ml_cache).tag("ml_cache", "pre_race")
schedule.every().sunday.at("02:00").do(run_ml).tag("ml", "weekly") schedule.every().sunday.at("02:00").do(run_ml).tag("ml", "weekly")
schedule.every().wednesday.at("02:00").do(run_ml).tag("ml", "midweek") schedule.every().wednesday.at("02:00").do(run_ml).tag("ml", "midweek")
@@ -335,6 +373,200 @@ def main():
time.sleep(30) time.sleep(30)
def run_ml_cache():
"""Populate ml_predictions_cache with ensemble (predict_v2) predictions"""
logger.info("🤖 [SCHEDULER] Mise à jour cache prédictions ML (ensemble)...")
try:
os.chdir("/home/h3r7/turf_saas")
import predict_v2
model = predict_v2.load_ensemble()
if model is None:
logger.warning("⚠️ [SCHEDULER] Ensemble model not available, skipping")
return
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
today = datetime.now().strftime("%Y-%m-%d")
rows = conn.execute("""
SELECT p.*, c.distance, c.discipline, c.specialite,
c.nb_declares_partants, c.montant_prix, c.penetrometre_intitule,
c.libelle as course_libelle, c.libelle_court as hippodrome,
c.heure_depart_str, c.parcours
FROM pmu_partants p
LEFT JOIN pmu_courses c ON p.date_programme = c.date_programme
AND p.num_reunion = c.num_reunion AND p.num_course = c.num_course
WHERE p.date_programme = ?
ORDER BY p.num_reunion, p.num_course, p.num_pmu
""", (today,)).fetchall()
if not rows:
logger.info(" [SCHEDULER] No partants today, skipping ML cache")
conn.close()
return
partants = [dict(r) for r in rows]
course_lookup = {}
for p in partants:
key = (p["num_reunion"], p["num_course"])
if key not in course_lookup:
course_lookup[key] = {
"libelle": p.get("course_libelle", ""),
"libelle_court": p.get("hippodrome", ""),
"discipline": p.get("discipline", ""),
"distance": p.get("distance", 0),
"heure_depart_str": p.get("heure_depart_str", ""),
}
odds_by_horse = {}
for p in partants:
odds_by_horse[(p["num_reunion"], p["num_course"], p["num_pmu"])] = p.get("cote_direct", 0)
preds = predict_v2.predict_top3(partants, model=model)
if not preds:
logger.warning("⚠️ [SCHEDULER] No predictions generated")
conn.close()
return
enriched = []
for p in preds:
key = (p.get("num_reunion"), p.get("num_course"))
ci = course_lookup.get(key, {})
odds_key = (p.get("num_reunion"), p.get("num_course"), p.get("num_pmu"))
enriched.append({
"num_reunion": p.get("num_reunion"),
"num_course": p.get("num_course"),
"horse_name": p.get("horse_name"),
"horse_number": p.get("num_pmu"),
"odds": odds_by_horse.get(odds_key, 0),
"prob_top1": p.get("prob_top1"),
"prob_top3": p.get("prob_top3"),
"ml_score": p.get("ml_score"),
"recommendation": p.get("recommendation"),
"is_value_bet": p.get("is_value_bet", 0),
"is_outlier": 0,
"race_label": f"R{p.get('num_reunion', 0)}C{p.get('num_course', 0)}",
"race_name": ci.get("libelle", ""),
"hippodrome": ci.get("libelle_court", ""),
"discipline": ci.get("discipline", ""),
"distance": ci.get("distance", 0),
"heure": ci.get("heure_depart_str", ""),
})
# Calculate risques per race (same logic as dashboard_api.calculate_risque)
from collections import defaultdict
race_horses = defaultdict(list)
for p in enriched:
rkey = (p.get("num_reunion"), p.get("num_course"))
race_horses[rkey].append({
"odds": p.get("odds", 999),
"ml_score": p.get("ml_score", 0),
"prob_top1": p.get("prob_top1", 0),
"prob_top3": p.get("prob_top3", 0),
})
race_risque = {}
for rkey, partants_list in race_horses.items():
label, score = _calc_risque(partants_list)
race_risque[rkey] = (label or "neutral", score or 50)
# Ensure table exists with all columns
conn.execute("""
CREATE TABLE IF NOT EXISTS ml_predictions_cache (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL, num_reunion INTEGER, num_course INTEGER,
horse_name TEXT, horse_number INTEGER, odds REAL,
prob_top1 REAL, prob_top3 REAL, ml_score REAL,
recommendation TEXT, is_value_bet INTEGER DEFAULT 0,
is_outlier INTEGER DEFAULT 0, race_label TEXT, race_name TEXT,
hippodrome TEXT, discipline TEXT, distance REAL, heure TEXT,
model_version TEXT DEFAULT 'xgboost_v1',
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
risque_label TEXT DEFAULT 'neutral', risque_score INTEGER DEFAULT 50,
UNIQUE(date, num_reunion, num_course, horse_name)
)
""")
conn.execute("CREATE INDEX IF NOT EXISTS idx_ml_cache_date ON ml_predictions_cache(date)")
try:
conn.execute("ALTER TABLE ml_predictions_cache ADD COLUMN risque_label TEXT DEFAULT 'neutral'")
except Exception:
pass
try:
conn.execute("ALTER TABLE ml_predictions_cache ADD COLUMN risque_score INTEGER DEFAULT 50")
except Exception:
pass
conn.execute("DELETE FROM ml_predictions_cache WHERE date = ?", (today,))
for p in enriched:
rkey = (p.get("num_reunion"), p.get("num_course"))
rl, rs = race_risque.get(rkey, ("neutral", 50))
conn.execute("""
INSERT INTO ml_predictions_cache
(date, num_reunion, num_course, horse_name, horse_number, odds,
prob_top1, prob_top3, ml_score, recommendation, is_value_bet, is_outlier,
race_label, race_name, hippodrome, discipline, distance, heure,
risque_label, risque_score, model_version)
VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
""", (
today, p.get("num_reunion"), p.get("num_course"),
p.get("horse_name"), p.get("horse_number"), p.get("odds"),
p.get("prob_top1"), p.get("prob_top3"), p.get("ml_score"),
p.get("recommendation"), p.get("is_value_bet", 0), p.get("is_outlier", 0),
p.get("race_label"), p.get("race_name"), p.get("hippodrome"),
p.get("discipline"), p.get("distance"), p.get("heure"),
rl, rs, "ensemble_v1",
))
conn.commit()
conn.close()
logger.info(f"✅ [SCHEDULER] ML cache mis à jour: {len(enriched)} prédictions pour {today}")
except Exception as e:
logger.error(f"❌ [SCHEDULER] Erreur ML cache: {e}")
import traceback
traceback.print_exc()
def _calc_risque(partants_list):
"""Same logic as dashboard_api.calculate_risque — kept local to avoid import side effects"""
if not partants_list:
return None, None
sorted_p = sorted(
partants_list,
key=lambda x: x.get("ml_score") or x.get("prob_top1") or 0,
reverse=True,
)
top1_score = sorted_p[0].get("ml_score") or sorted_p[0].get("prob_top1") or 0
top2_score = (
sorted_p[1].get("ml_score") or sorted_p[1].get("prob_top1") or 0
if len(sorted_p) > 1 else 0
)
gap_1_2 = top1_score - top2_score
nb_dangerous = sum(1 for p in sorted_p if (p.get("ml_score") or 0) > 40)
odds_fav = sorted(partants_list, key=lambda x: x.get("odds") or 999)
fav_odds = odds_fav[0].get("odds") or 999 if odds_fav else 999
fav_ml = (
odds_fav[0].get("ml_score") or odds_fav[0].get("prob_top1") or 0
if odds_fav else 0
)
fav_surprise = fav_odds < 5 and fav_ml < 25
if top1_score >= 65 and gap_1_2 >= 20:
score = min(100, int(50 + gap_1_2 * 1.5))
return "safe", score
if fav_surprise:
return "trap", max(10, int(35 - (25 - fav_ml)))
if nb_dangerous >= 4 and top1_score < 70:
return "trap", max(10, int(40 - nb_dangerous * 2))
if gap_1_2 < 8 and top2_score > 45:
return "trap", max(15, int(30 + gap_1_2))
score = min(64, max(35, int(35 + gap_1_2 * 1.2)))
return "neutral", score
def run_metrics_alerts(): def run_metrics_alerts():
"""Verifie les metriques du jour et envoie une alerte email si ROI > 1.0€""" """Verifie les metriques du jour et envoie une alerte email si ROI > 1.0€"""
logger.info("📧 [SCHEDULER] Vérification alertes métriques...") logger.info("📧 [SCHEDULER] Vérification alertes métriques...")