Files
turf_saas/turf_scheduler.py
CTO H3R7Tech c072f92794 Fix #1: Ajout job run_ml_cache dans scheduler pour alimenter ml_predictions_cache
- run_ml_cache() lit les partants, genere predictions via predict_v2,
  enrichit avec metadonnees course, calcule risque, ecrit dans cache
- Planifie 4x/jour: 09:30, 11:35, 13:30, 17:35
- Installe dependances: optuna, shap, lightgbm

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-05-23 22:54:29 +02:00

604 lines
22 KiB
Python
Executable File
Raw Permalink Blame History

This file contains invisible Unicode characters
This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Turf Scheduler - Scraping automatique sans dépendance OpenClaw
"""
import sys
import os
import sqlite3
import schedule
import time
import logging
from datetime import datetime
# The job functions below import project modules lazily; make sure the
# project directory is the first entry on the import path.
sys.path.insert(0, "/home/h3r7/turf_saas")

# Every record goes both to the scheduler log file and to the console.
_LOG_HANDLERS = [
    logging.FileHandler("/home/h3r7/turf_saas/scheduler.log"),
    logging.StreamHandler(),
]
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s - %(levelname)s - %(message)s",
    handlers=_LOG_HANDLERS,
)
logger = logging.getLogger(__name__)

# SQLite database read and written by the scheduler jobs.
DB_PATH = "/home/h3r7/turf_saas/turf_saas.db"
def run_scraper():
    """Run the main scraper (``multi_scraper_v5.main``) and log its result.

    Any exception is logged and its traceback printed; nothing is re-raised.
    """
    import traceback

    logger.info("🕐 [SCHEDULER] Exécution scraper...")
    try:
        # Switch to the project directory before importing the scraper.
        os.chdir("/home/h3r7/turf_saas")
        import multi_scraper_v5

        result = multi_scraper_v5.main()
        logger.info(f"✅ [SCHEDULER] Scraper terminé: {result}")
    except Exception as e:
        logger.error(f"❌ [SCHEDULER] Erreur scraper: {e}")
        traceback.print_exc()
def run_scoring():
    """Run the scoring step (``scoring_v2.main``).

    Any exception is logged and its traceback printed; nothing is re-raised.
    """
    import traceback

    logger.info("🧠 [SCHEDULER] Exécution scoring...")
    try:
        # Switch to the project directory before importing the scoring module.
        os.chdir("/home/h3r7/turf_saas")
        import scoring_v2

        scoring_v2.main()
        logger.info("✅ [SCHEDULER] Scoring terminé")
    except Exception as e:
        logger.error(f"❌ [SCHEDULER] Erreur scoring: {e}")
        traceback.print_exc()
def run_results():
    """Fetch today's results through ``pmu_results.run``.

    The current date is passed as a ``DDMMYYYY`` string. Any exception is
    logged and its traceback printed; nothing is re-raised.
    """
    import traceback

    logger.info("🕐 [SCHEDULER] Récupération résultats...")
    try:
        os.chdir("/home/h3r7/turf_saas")
        import pmu_results
        from datetime import datetime

        # pmu_results.run receives the date without separators (DDMMYYYY).
        date_arg = datetime.now().strftime("%d%m%Y")
        pmu_results.run(date_arg)
        logger.info("✅ [SCHEDULER] Résultats récupérés")
    except Exception as e:
        logger.error(f"❌ [SCHEDULER] Erreur résultats: {e}")
        traceback.print_exc()
def run_ml():
    """Train the ML models by calling ``train_xgboost.main``.

    Any exception is logged and its traceback printed (as the other job
    wrappers in this file do); nothing is re-raised.
    """
    logger.info("🕐 [SCHEDULER] Entraînement ML...")
    try:
        # Switch to the project directory before importing the trainer.
        os.chdir("/home/h3r7/turf_saas")
        import train_xgboost

        train_xgboost.main()
        logger.info("✅ [SCHEDULER] ML terminé")
    except Exception as e:
        logger.error(f"❌ [SCHEDULER] Erreur ML: {e}")
        # Without the traceback a failed training run only leaves a one-line
        # message, which is not enough to diagnose it.
        import traceback

        traceback.print_exc()
def run_analytics():
    """Refresh the analytics by running the ``populate_analytics`` steps.

    The three populate functions are called in a fixed order. Any exception
    is logged and its traceback printed; nothing is re-raised.
    """
    import traceback

    logger.info("🕐 [SCHEDULER] Analytics...")
    try:
        os.chdir("/home/h3r7/turf_saas")
        import populate_analytics

        # Each step is looked up just before it runs, so a failure stops the
        # sequence at that step.
        for step_name in (
            "populate_bet_results",
            "populate_daily_stats",
            "populate_stats_by_type",
        ):
            getattr(populate_analytics, step_name)()
        logger.info("✅ [SCHEDULER] Analytics mis à jour")
    except Exception as e:
        logger.error(f"❌ [SCHEDULER] Erreur analytics: {e}")
        traceback.print_exc()
def run_sync_turf_db():
    """Run ``sync_turf_db.py`` for today's date in a subprocess.

    The script is given ``--date YYYY-MM-DD`` and at most 300 seconds to
    finish. A non-zero exit code is logged together with the captured
    stderr. Any exception (including the timeout) is logged and its
    traceback printed; nothing is re-raised.
    """
    import subprocess
    import traceback

    logger.info("🔄 [SCHEDULER] Sync turf.db -> turf_saas.db...")
    try:
        command = [
            "python3",
            "/home/h3r7/turf_saas/sync_turf_db.py",
            "--date",
            datetime.now().strftime("%Y-%m-%d"),
        ]
        result = subprocess.run(
            command, capture_output=True, text=True, timeout=300
        )
        if result.returncode != 0:
            logger.error(f"❌ [SCHEDULER] Sync turf.db échoué: {result.stderr}")
        else:
            logger.info("✅ [SCHEDULER] Sync turf.db terminé")
    except Exception as e:
        logger.error(f"❌ [SCHEDULER] Erreur sync turf.db: {e}")
        traceback.print_exc()
def get_todays_race_time():
    """Return the start time of today's earliest race, read from the database.

    ``pmu_courses.heure_depart`` is tried first (smallest non-NULL value for
    today's ``date_programme``); if nothing usable is found, one
    ``course_time`` from ``pmu_rapports`` for today's ``date`` is used.

    Returns:
        The stored value (callers in this file treat it as an epoch timestamp
        in milliseconds), or ``None`` when no race is found or the lookup
        fails.

    NOTE(review): the unit/format of ``pmu_rapports.course_time`` is not
    checked here — confirm it matches ``heure_depart``.
    """
    from contextlib import closing

    queries = (
        # Preferred source: pmu_courses.
        """
        SELECT heure_depart as race_time
        FROM pmu_courses
        WHERE date_programme = ?
        AND heure_depart IS NOT NULL
        ORDER BY heure_depart ASC
        LIMIT 1
        """,
        # Fallback source: pmu_rapports.
        """
        SELECT DISTINCT course_time as race_time
        FROM pmu_rapports
        WHERE date = ?
        LIMIT 1
        """,
    )
    try:
        today = datetime.now().strftime("%Y-%m-%d")
        # closing() guarantees the connection is released even when a query
        # raises (e.g. a missing table), which the manual close() calls did not.
        with closing(sqlite3.connect(DB_PATH)) as conn:
            conn.row_factory = sqlite3.Row
            for query in queries:
                row = conn.execute(query, (today,)).fetchone()
                if row and row["race_time"]:
                    return row["race_time"]
        return None
    except Exception as e:
        logger.warning(f"⚠️ Impossible de récupérer l'heure de course: {e}")
        return None
def schedule_dynamic_scoring():
    """Register a daily ``run_scoring`` job 15 minutes before today's race.

    The race time comes from ``get_todays_race_time()`` and is interpreted as
    an epoch timestamp in milliseconds. Nothing is registered when there is
    no race, when the pre-race time would fall on the previous calendar day,
    or when the time cannot be computed; those cases are only logged.
    """
    from datetime import timedelta

    race_time = get_todays_race_time()
    if not race_time:
        logger.info(" [SCHEDULER] Pas de course aujourd'hui, pas de scoring dynamique")
        return

    try:
        # Convert the millisecond timestamp to a local datetime.
        dt = datetime.fromtimestamp(race_time / 1000)
        race_hour = dt.hour
        race_min = dt.minute
        logger.info(
            f"📅 [SCHEDULER] Course détectée à {race_hour:02d}:{race_min:02d}"
        )

        # Real date arithmetic instead of manual hour/minute subtraction:
        # the manual version produced an invalid "-1:MM" time for races
        # starting less than 15 minutes after midnight.
        scoring_dt = dt - timedelta(minutes=15)
        if scoring_dt.date() != dt.date():
            logger.warning(
                "⚠️ [SCHEDULER] Scoring dynamique ignoré: "
                "l'heure pré-course tombe la veille"
            )
            return

        scoring_time = scoring_dt.strftime("%H:%M")
        schedule.every().day.at(scoring_time).do(run_scoring).tag(
            "scoring", "dynamic"
        )
        logger.info(
            f"📅 [SCHEDULER] Scoring dynamique planifié à {scoring_time} (15min avant la course)"
        )
    except Exception as e:
        logger.warning(f"⚠️ Impossible de planifier le scoring dynamique: {e}")
def run_telegram_alerts():
    """Send the pre-race Telegram alerts and log the delivery counters.

    Calls ``telegram_alerts.send_pre_race_alerts(minutes_before=30)`` and
    reports the ``sent`` / ``skipped`` / ``errors`` entries of the returned
    mapping (missing entries count as 0). Any exception is logged and its
    traceback printed; nothing is re-raised.
    """
    import traceback

    logger.info("📨 [SCHEDULER] Envoi alertes Telegram pré-course...")
    try:
        os.chdir("/home/h3r7/turf_saas")
        import telegram_alerts

        stats = telegram_alerts.send_pre_race_alerts(minutes_before=30)
        sent_count = stats.get("sent", 0)
        skipped_count = stats.get("skipped", 0)
        error_count = stats.get("errors", 0)
        logger.info(
            "✅ [SCHEDULER] Alertes Telegram: %d envoyées, %d ignorées, %d erreurs",
            sent_count,
            skipped_count,
            error_count,
        )
    except Exception as e:
        logger.error(f"❌ [SCHEDULER] Erreur alertes Telegram: {e}")
        traceback.print_exc()
def schedule_dynamic_telegram_alerts():
    """Register a daily ``run_telegram_alerts`` job 30 minutes before the race.

    The race time comes from ``get_todays_race_time()`` and is interpreted as
    an epoch timestamp in milliseconds. Nothing is registered when there is
    no race, when the alert time would fall on the previous calendar day, or
    when the time cannot be computed; those cases are only logged.
    """
    from datetime import timedelta

    race_time = get_todays_race_time()
    if not race_time:
        logger.info(
            " [SCHEDULER] Pas de course aujourd'hui, pas d'alertes Telegram dynamiques"
        )
        return

    try:
        # Convert the millisecond timestamp to a local datetime.
        dt = datetime.fromtimestamp(race_time / 1000)
        race_hour = dt.hour
        race_min = dt.minute
        logger.info(
            f"📅 [SCHEDULER] Alertes Telegram — course à {race_hour:02d}:{race_min:02d}"
        )

        # Real date arithmetic instead of manual hour/minute subtraction:
        # the manual version produced an invalid "-1:MM" time for races
        # starting less than 30 minutes after midnight.
        alert_dt = dt - timedelta(minutes=30)
        if alert_dt.date() != dt.date():
            logger.warning(
                "⚠️ [SCHEDULER] Alertes Telegram ignorées: "
                "l'heure pré-course tombe la veille"
            )
            return

        alert_time = alert_dt.strftime("%H:%M")
        schedule.every().day.at(alert_time).do(run_telegram_alerts).tag(
            "telegram", "dynamic"
        )
        logger.info(
            f"📅 [SCHEDULER] Alertes Telegram planifiées à {alert_time} (30min avant la course)"
        )
    except Exception as e:
        logger.warning(f"⚠️ Impossible de planifier les alertes Telegram: {e}")
def schedule_dynamic_results():
    """Register a daily ``run_results`` job one hour after today's race.

    The race time comes from ``get_todays_race_time()`` and is interpreted as
    an epoch timestamp in milliseconds. If the time cannot be computed or
    registered, a default job at 15:00 is registered instead. Without a race
    nothing is registered.
    """
    race_time = get_todays_race_time()
    if not race_time:
        logger.info(" [SCHEDULER] Aucune course aujourd'hui, pas de scrapingResults")
        return

    try:
        dt = datetime.fromtimestamp(race_time / 1000)
        race_hour, race_min = dt.hour, dt.minute
        # Same minute, next hour; 23:xx wraps around to 00:xx.
        result_hour = (race_hour + 1) % 24
        result_time = f"{result_hour:02d}:{race_min:02d}"
        schedule.every().day.at(result_time).do(run_results).tag(
            "results", "dynamic"
        )
        logger.info(
            f"📅 [SCHEDULER] Résultats planifiés à {result_time} (H+1 de {race_hour:02d}:{race_min:02d})"
        )
    except Exception as e:
        logger.warning(f"⚠️ Impossible de planifier les résultats: {e}")
        # Fall back to a fixed afternoon slot.
        schedule.every().day.at("15:00").do(run_results).tag("results", "default")
def main():
    """Register every scheduler job, log the job list, then run forever.

    Jobs are registered in a fixed order (fixed scraper and scoring slots,
    the dynamic race-relative jobs, then sync, ML cache, training, analytics,
    alerts and an hourly heartbeat). Pending jobs are then polled every
    30 seconds.

    NOTE(review): the ``schedule_dynamic_*`` helpers are called once, before
    the loop starts, so the race-relative times are those computed at
    start-up.
    """
    separator = "=" * 60

    def daily(at, func, *tags):
        # Register `func` to run every day at `at` (HH:MM) with the given tags.
        schedule.every().day.at(at).do(func).tag(*tags)

    logger.info(separator)
    logger.info("🚀 TURF SCHEDULER INDÉPENDANT DÉMARRÉ")
    logger.info(separator)

    # Fixed scraping slots.
    for at, slot in (
        ("08:00", "early_morning"),
        ("09:00", "morning"),
        ("10:00", "late_morning"),
        ("11:00", "mid_morning"),
        ("12:00", "noon"),
        ("13:00", "early_afternoon"),
        ("13:30", "afternoon"),
        ("13:45", "pre_race"),
        ("14:00", "post_race"),
    ):
        daily(at, run_scraper, "scraper", slot)

    # Fixed scoring slots, following the evolution of the odds.
    for at, slot in (
        ("09:30", "morning"),
        ("11:30", "late_morning"),
        ("12:30", "noon"),
        ("13:30", "pre_race"),
    ):
        daily(at, run_scoring, "scoring", slot)

    # Race-relative jobs: scoring (15 min before), Telegram alerts (30 min
    # before) and results (one hour after).
    schedule_dynamic_scoring()
    schedule_dynamic_telegram_alerts()
    schedule_dynamic_results()

    daily("18:00", run_scraper, "scraper", "evening")
    # Fixed 20:00 results job, used as a fallback.
    daily("20:00", run_results, "results", "daily_fallback")
    daily("19:00", run_scraper, "scraper", "late_evening")

    # Sync turf.db -> turf_saas.db twice a day (post-scraping and post-odds).
    daily("11:00", run_sync_turf_db, "sync", "post_scraping")
    daily("17:00", run_sync_turf_db, "sync", "post_cotes")

    # ML cache: refresh ml_predictions_cache after each sync, plus a morning
    # and a pre-race run.
    for at, slot in (
        ("11:35", "post_sync_am"),
        ("17:35", "post_sync_pm"),
        ("09:30", "morning"),
        ("13:30", "pre_race"),
    ):
        daily(at, run_ml_cache, "ml_cache", slot)

    # Model training twice a week, at night.
    schedule.every().sunday.at("02:00").do(run_ml).tag("ml", "weekly")
    schedule.every().wednesday.at("02:00").do(run_ml).tag("ml", "midweek")

    daily("15:00", run_analytics, "analytics", "daily")
    # Automatic email alerts: ROI check every day at 21:30.
    daily("21:30", run_metrics_alerts, "alerts", "email_roi")

    # Hourly heartbeat in the log.
    schedule.every().hour.do(lambda: logger.info("💓 Scheduler alive"))

    logger.info("📅 Jobs planifiés:")
    for job in schedule.jobs:
        logger.info(f" - {job}")
    logger.info(separator)

    while True:
        schedule.run_pending()
        time.sleep(30)
def _ensure_ml_cache_table(conn):
    """Create ``ml_predictions_cache`` and its date index when missing.

    If the table already exists without the ``risque_label`` /
    ``risque_score`` columns (presumably created by an older version), they
    are added. The existing columns are inspected first, so a genuine
    ``ALTER TABLE`` failure is raised instead of being silently ignored.
    """
    conn.execute(
        """
        CREATE TABLE IF NOT EXISTS ml_predictions_cache (
        id INTEGER PRIMARY KEY AUTOINCREMENT,
        date TEXT NOT NULL, num_reunion INTEGER, num_course INTEGER,
        horse_name TEXT, horse_number INTEGER, odds REAL,
        prob_top1 REAL, prob_top3 REAL, ml_score REAL,
        recommendation TEXT, is_value_bet INTEGER DEFAULT 0,
        is_outlier INTEGER DEFAULT 0, race_label TEXT, race_name TEXT,
        hippodrome TEXT, discipline TEXT, distance REAL, heure TEXT,
        model_version TEXT DEFAULT 'xgboost_v1',
        created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        risque_label TEXT DEFAULT 'neutral', risque_score INTEGER DEFAULT 50,
        UNIQUE(date, num_reunion, num_course, horse_name)
        )
        """
    )
    conn.execute(
        "CREATE INDEX IF NOT EXISTS idx_ml_cache_date ON ml_predictions_cache(date)"
    )
    # PRAGMA table_info yields one row per column; index 1 is the column name.
    existing_columns = {
        row[1] for row in conn.execute("PRAGMA table_info(ml_predictions_cache)")
    }
    if "risque_label" not in existing_columns:
        conn.execute(
            "ALTER TABLE ml_predictions_cache ADD COLUMN risque_label TEXT DEFAULT 'neutral'"
        )
    if "risque_score" not in existing_columns:
        conn.execute(
            "ALTER TABLE ml_predictions_cache ADD COLUMN risque_score INTEGER DEFAULT 50"
        )


def run_ml_cache():
    """Rebuild today's rows of ``ml_predictions_cache`` from the ensemble model.

    Steps:
      1. Load the ensemble via ``predict_v2.load_ensemble()``; skip if ``None``.
      2. Read today's runners (``pmu_partants`` left-joined with
         ``pmu_courses``); skip if there are none.
      3. Run ``predict_v2.predict_top3`` and enrich each prediction with the
         race metadata and the runner's ``cote_direct`` odds.
      4. Compute one risk label/score per race with ``_calc_risque``.
      5. Replace today's cache rows (delete + insert, committed together).

    The connection is always closed; if anything fails before the commit the
    previous cache rows for today are left untouched. Any exception is logged
    and its traceback printed; nothing is re-raised.
    """
    import traceback
    from collections import defaultdict
    from contextlib import closing

    logger.info("🤖 [SCHEDULER] Mise à jour cache prédictions ML (ensemble)...")
    try:
        os.chdir("/home/h3r7/turf_saas")
        import predict_v2

        model = predict_v2.load_ensemble()
        if model is None:
            logger.warning("⚠️ [SCHEDULER] Ensemble model not available, skipping")
            return

        today = datetime.now().strftime("%Y-%m-%d")
        # closing() releases the connection on every exit path, including
        # exceptions raised by the model or by SQLite.
        with closing(sqlite3.connect(DB_PATH)) as conn:
            conn.row_factory = sqlite3.Row
            rows = conn.execute(
                """
                SELECT p.*, c.distance, c.discipline, c.specialite,
                c.nb_declares_partants, c.montant_prix, c.penetrometre_intitule,
                c.libelle as course_libelle, c.libelle_court as hippodrome,
                c.heure_depart_str, c.parcours
                FROM pmu_partants p
                LEFT JOIN pmu_courses c ON p.date_programme = c.date_programme
                AND p.num_reunion = c.num_reunion AND p.num_course = c.num_course
                WHERE p.date_programme = ?
                ORDER BY p.num_reunion, p.num_course, p.num_pmu
                """,
                (today,),
            ).fetchall()
            if not rows:
                logger.info(" [SCHEDULER] No partants today, skipping ML cache")
                return

            partants = [dict(r) for r in rows]

            # Race metadata, taken from the first runner seen for each race.
            course_lookup = {}
            for p in partants:
                key = (p["num_reunion"], p["num_course"])
                if key not in course_lookup:
                    course_lookup[key] = {
                        "libelle": p.get("course_libelle", ""),
                        "libelle_court": p.get("hippodrome", ""),
                        "discipline": p.get("discipline", ""),
                        "distance": p.get("distance", 0),
                        "heure_depart_str": p.get("heure_depart_str", ""),
                    }

            # Odds per (reunion, race, runner number).
            odds_by_horse = {
                (p["num_reunion"], p["num_course"], p["num_pmu"]): p.get(
                    "cote_direct", 0
                )
                for p in partants
            }

            preds = predict_v2.predict_top3(partants, model=model)
            if not preds:
                logger.warning("⚠️ [SCHEDULER] No predictions generated")
                return

            enriched = []
            for p in preds:
                num_reunion = p.get("num_reunion")
                num_course = p.get("num_course")
                ci = course_lookup.get((num_reunion, num_course), {})
                enriched.append({
                    "num_reunion": num_reunion,
                    "num_course": num_course,
                    "horse_name": p.get("horse_name"),
                    "horse_number": p.get("num_pmu"),
                    "odds": odds_by_horse.get(
                        (num_reunion, num_course, p.get("num_pmu")), 0
                    ),
                    "prob_top1": p.get("prob_top1"),
                    "prob_top3": p.get("prob_top3"),
                    "ml_score": p.get("ml_score"),
                    "recommendation": p.get("recommendation"),
                    "is_value_bet": p.get("is_value_bet", 0),
                    "is_outlier": 0,
                    "race_label": f"R{p.get('num_reunion', 0)}C{p.get('num_course', 0)}",
                    "race_name": ci.get("libelle", ""),
                    "hippodrome": ci.get("libelle_court", ""),
                    "discipline": ci.get("discipline", ""),
                    "distance": ci.get("distance", 0),
                    "heure": ci.get("heure_depart_str", ""),
                })

            # One risk label/score per race, computed over all its runners.
            race_horses = defaultdict(list)
            for p in enriched:
                race_horses[(p["num_reunion"], p["num_course"])].append({
                    "odds": p["odds"],
                    "ml_score": p["ml_score"],
                    "prob_top1": p["prob_top1"],
                    "prob_top3": p["prob_top3"],
                })
            race_risque = {}
            for rkey, partants_list in race_horses.items():
                label, score = _calc_risque(partants_list)
                race_risque[rkey] = (label or "neutral", score or 50)

            _ensure_ml_cache_table(conn)

            # Delete and inserts are committed together below; if an insert
            # fails, closing the connection discards the uncommitted delete.
            conn.execute("DELETE FROM ml_predictions_cache WHERE date = ?", (today,))
            cache_rows = []
            for p in enriched:
                rl, rs = race_risque.get(
                    (p["num_reunion"], p["num_course"]), ("neutral", 50)
                )
                cache_rows.append((
                    today, p["num_reunion"], p["num_course"],
                    p["horse_name"], p["horse_number"], p["odds"],
                    p["prob_top1"], p["prob_top3"], p["ml_score"],
                    p["recommendation"], p["is_value_bet"], p["is_outlier"],
                    p["race_label"], p["race_name"], p["hippodrome"],
                    p["discipline"], p["distance"], p["heure"],
                    rl, rs, "ensemble_v1",
                ))
            conn.executemany(
                """
                INSERT INTO ml_predictions_cache
                (date, num_reunion, num_course, horse_name, horse_number, odds,
                prob_top1, prob_top3, ml_score, recommendation, is_value_bet, is_outlier,
                race_label, race_name, hippodrome, discipline, distance, heure,
                risque_label, risque_score, model_version)
                VALUES (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)
                """,
                cache_rows,
            )
            conn.commit()

        logger.info(f"✅ [SCHEDULER] ML cache mis à jour: {len(enriched)} prédictions pour {today}")
    except Exception as e:
        logger.error(f"❌ [SCHEDULER] Erreur ML cache: {e}")
        traceback.print_exc()
def _calc_risque(partants_list):
"""Same logic as dashboard_api.calculate_risque — kept local to avoid import side effects"""
if not partants_list:
return None, None
sorted_p = sorted(
partants_list,
key=lambda x: x.get("ml_score") or x.get("prob_top1") or 0,
reverse=True,
)
top1_score = sorted_p[0].get("ml_score") or sorted_p[0].get("prob_top1") or 0
top2_score = (
sorted_p[1].get("ml_score") or sorted_p[1].get("prob_top1") or 0
if len(sorted_p) > 1 else 0
)
gap_1_2 = top1_score - top2_score
nb_dangerous = sum(1 for p in sorted_p if (p.get("ml_score") or 0) > 40)
odds_fav = sorted(partants_list, key=lambda x: x.get("odds") or 999)
fav_odds = odds_fav[0].get("odds") or 999 if odds_fav else 999
fav_ml = (
odds_fav[0].get("ml_score") or odds_fav[0].get("prob_top1") or 0
if odds_fav else 0
)
fav_surprise = fav_odds < 5 and fav_ml < 25
if top1_score >= 65 and gap_1_2 >= 20:
score = min(100, int(50 + gap_1_2 * 1.5))
return "safe", score
if fav_surprise:
return "trap", max(10, int(35 - (25 - fav_ml)))
if nb_dangerous >= 4 and top1_score < 70:
return "trap", max(10, int(40 - nb_dangerous * 2))
if gap_1_2 < 8 and top2_score > 45:
return "trap", max(15, int(30 + gap_1_2))
score = min(64, max(35, int(35 + gap_1_2 * 1.2)))
return "neutral", score
def run_metrics_alerts():
    """Check the metrics and email an alert when an exceptional ROI is flagged.

    ``metrics_alerts.check_daily_alerts`` is called with the previous day's
    date (``YYYY-MM-DD``). When it returns a ``(message, has_roi)`` pair with
    a truthy flag, the message is sent through
    ``metrics_alerts.send_email_alert``. Any exception is logged and its
    traceback printed; nothing is re-raised.
    """
    import traceback

    logger.info("📧 [SCHEDULER] Vérification alertes métriques...")
    try:
        os.chdir("/home/h3r7/turf_saas")
        import metrics_alerts
        from datetime import datetime, timedelta

        # The check covers the day before the current date.
        date_str = (datetime.now() - timedelta(days=1)).strftime("%Y-%m-%d")
        result = metrics_alerts.check_daily_alerts(date_str)
        if not result:
            logger.info(" [SCHEDULER] Aucune métrique disponible pour alertes")
            return

        msg, has_roi = result
        if not has_roi:
            logger.info(" [SCHEDULER] Pas d'alerte ROI aujourd'hui")
            return

        logger.info("💰 [SCHEDULER] ROI exceptionnel détecté — envoi email...")
        # The subject line carries the date as DD/MM/YYYY.
        date_fmt = datetime.strptime(date_str, "%Y-%m-%d").strftime("%d/%m/%Y")
        subject = "Alerte Turf — ROI exceptionnel {}".format(date_fmt)
        if metrics_alerts.send_email_alert(subject, msg):
            logger.info("✅ [SCHEDULER] Email alerte envoyé")
        else:
            logger.warning("⚠️ [SCHEDULER] Echec envoi email alerte")
    except Exception as e:
        logger.error(f"❌ [SCHEDULER] Erreur alertes métriques: {e}")
        traceback.print_exc()
# Start the scheduler only when the file is executed as a script,
# not when it is imported.
if __name__ == "__main__":
    main()