#!/usr/bin/env python3 """ Export route for API v1. GET /api/v1/export/csv — Export CSV des prédictions ou paris (pro) """ import csv import io from datetime import datetime, timedelta from flask import Blueprint, jsonify, request, Response from api_v1.utils import ( get_db, table_exists, internal_error, bad_request, forbidden, ) from auth import jwt_required_middleware, plan_required export_bp = Blueprint("v1_export", __name__, url_prefix="/api/v1/export") # Maximum rows exportable in one request EXPORT_MAX_ROWS = 5000 @export_bp.route("/csv", methods=["GET"]) @jwt_required_middleware @plan_required("pro") def export_csv(): """ Export CSV --- tags: - Export summary: Export CSV des prédictions ML ou des paris historiques — accès pro uniquement security: - Bearer: [] parameters: - name: type in: query type: string enum: [predictions, bets] default: predictions description: Type de données à exporter - name: start in: query type: string format: date description: Date de début (YYYY-MM-DD) - name: end in: query type: string format: date description: Date de fin (YYYY-MM-DD) - name: date in: query type: string format: date description: Date unique (YYYY-MM-DD), ignoré si start/end fournis responses: 200: description: Fichier CSV content: text/csv: schema: type: string 400: description: Paramètre invalide 401: description: Token invalide 403: description: Plan insuffisant (pro requis) """ export_type = request.args.get("type", "predictions").lower() if export_type not in ("predictions", "bets"): return bad_request( "Paramètre 'type' invalide. Valeurs acceptées: predictions, bets" ) start = request.args.get("start") end = request.args.get("end") date = request.args.get("date", datetime.now().strftime("%Y-%m-%d")) for label, val in [("start", start), ("end", end), ("date", date)]: if val: try: datetime.strptime(val, "%Y-%m-%d") except ValueError: return bad_request( f"Paramètre '{label}' invalide, format attendu: YYYY-MM-DD" ) # Build date range if start and end: date_cond = "date BETWEEN ? AND ?" date_params = [start, end] elif start: date_cond = "date >= ?" date_params = [start] else: date_cond = "date = ?" date_params = [date] conn = get_db() try: output = io.StringIO() if export_type == "predictions": if not table_exists(conn, "ml_predictions_cache"): return bad_request("Table ml_predictions_cache introuvable") rows = conn.execute( f"""SELECT date, race_label, hippodrome, discipline, distance, heure, horse_name, horse_number, odds, prob_top1, prob_top3, ml_score, recommendation, is_value_bet, risque_label FROM ml_predictions_cache WHERE {date_cond} ORDER BY date DESC, ml_score DESC LIMIT {EXPORT_MAX_ROWS}""", date_params, ).fetchall() fieldnames = [ "date", "race_label", "hippodrome", "discipline", "distance", "heure", "horse_name", "horse_number", "odds", "prob_top1", "prob_top3", "ml_score", "recommendation", "is_value_bet", "risque_label", ] else: # bets if not table_exists(conn, "bet_results"): return bad_request("Table bet_results introuvable") rows = conn.execute( f"""SELECT date, race_name, type_pari, horse_name, horse_number, COALESCE(cote, 0) AS cote, mise, resultat, gain FROM bet_results WHERE {date_cond} ORDER BY date DESC LIMIT {EXPORT_MAX_ROWS}""", date_params, ).fetchall() fieldnames = [ "date", "race_name", "type_pari", "horse_name", "horse_number", "cote", "mise", "resultat", "gain", ] writer = csv.DictWriter(output, fieldnames=fieldnames, extrasaction="ignore") writer.writeheader() for row in rows: writer.writerow(dict(row)) filename = f"turf_{export_type}_{date_params[0]}.csv" return Response( output.getvalue(), status=200, mimetype="text/csv", headers={"Content-Disposition": f'attachment; filename="{filename}"'}, ) except Exception as e: return internal_error(str(e)) finally: conn.close()