Compare commits

...

7 Commits

Author SHA1 Message Date
91134e2f3f Merge pull request '[HRT-83] feat: Météo & terrain intégrés dans prédictions ML (Premium)' (#10) from feature/HRT-83-meteo-terrain-ml-predictions into master
Some checks failed
CD / Deploy → Staging (push) Has been cancelled
CD / Smoke Tests on Staging (push) Has been cancelled
CD / Deploy → Production (push) Has been cancelled
CD / Rollback Production (push) Has been cancelled
2026-04-30 08:40:16 +02:00
DevOps Engineer
663e0bb149 Merge PR #12 — [HRT-82] Multi-compte / Organisation Pro (max 5 users)
Some checks failed
CD / Deploy → Staging (push) Has been cancelled
CD / Smoke Tests on Staging (push) Has been cancelled
CD / Deploy → Production (push) Has been cancelled
CD / Rollback Production (push) Has been cancelled
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-30 08:39:59 +02:00
5c6b407f47 Merge pull request '[HRT-80] API Token personnel + Webhook alertes (Pro)' (#13) from feature/HRT-80-api-tokens-webhooks into master
Some checks failed
CD / Deploy → Staging (push) Has been cancelled
CD / Smoke Tests on Staging (push) Has been cancelled
CD / Deploy → Production (push) Has been cancelled
CD / Rollback Production (push) Has been cancelled
2026-04-29 17:31:53 +02:00
DevOps Engineer
f300e44c74 feat(HRT-80): API Token personnel + Webhook alertes (Pro)
- Nouveaux fichiers: api_tokens_db.py, api_v1/routes/user_tokens.py, api_v1/utils_webhook.py
- Migration DB idempotente: tables user_api_tokens + user_webhooks
- Endpoints POST/DELETE /api/v1/user/api-token (Pro only)
- Endpoints POST/DELETE /api/v1/user/webhook (Pro only, HTTPS requis)
- HMAC-SHA256 fire-and-forget dispatch webhook
- auth.py: validate_api_key() + X-API-Key fallback dans jwt_required_middleware
- saas_auth.py: import logging au niveau module, validate_api_key(), X-API-Key fallback
- api_v1/__init__.py: enregistrement user_tokens_bp
- 24 tests pytest — tous passent

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-29 17:25:30 +02:00
DevOps Engineer
946bdc65b6 feat(HRT-82): Multi-compte / Organisation Pro (max 5 users)
- Add org_db.py: SQLite schema with organizations + org_members tables
  PRAGMA foreign_keys=ON, ON DELETE CASCADE, UNIQUE constraints
- Add api_v1/routes/org.py: CRUD org endpoints + invite/accept flow
  POST/GET/DELETE /api/v1/org, POST /api/v1/org/invite,
  GET/DELETE /api/v1/org/members — Pro plan only, max 5 members
- Add tests/test_org.py: 36 unit tests (35/36 pass; 1 test-env issue)
- Update api_v1/__init__.py: register org_bp
- Update saas_api_v1.py: register org_bp on portal_server app via record_once
- Service restarted, /api/v1/org/* endpoints live (401 on unauthenticated)

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-29 17:09:13 +02:00
DevOps Engineer
bc5ee3fa1a Merge feature/HRT-81-history-blueprint — Historique limité/illimité selon plan (Free/Premium/Pro)
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-29 17:05:01 +02:00
DevOps Engineer
ec024d8236 feat(HRT-83): intégrer météo & terrain dans prédictions ML (Premium)
- scoring_v2.py : ajout get_terrain_condition() + compute_weather_impact()
  score_cheval_v2() accepte weather_data=None (backward-compat préservée)
  Impact météo/terrain sur [-5, +5] pts selon pénétromètre + vent + temp

- api_v1/routes/predictions.py : _fetch_ml_predictions() avec include_weather=True
  LEFT JOIN pmu_courses (pénétromètre) + pmu_meteo sur date+num_reunion
  /predictions/all → terrain_condition + weather_impact dans chaque row
  /predictions/top3 → inchangé (free tier, pas de champs météo)

- api_v1/routes/valuebets.py : même LEFT JOIN météo/terrain
  /valuebets → terrain_condition + weather_impact dans chaque value bet

Tests : 42/42 passent (pytest tests/test_api_v1.py)
Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-29 15:35:15 +02:00
14 changed files with 2458 additions and 177 deletions

57
api_tokens_db.py Normal file
View File

@@ -0,0 +1,57 @@
#!/usr/bin/env python3
"""
api_tokens_db.py — DB migration for personal API tokens + user webhooks
HRT-80: API Token personnel + Webhook alertes (Pro)
"""
import logging
import os
import sqlite3
DB_PATH = os.environ.get("TURF_SAAS_DB", "/home/h3r7/turf_saas/turf_saas.db")
logger = logging.getLogger("turf_saas.api_tokens_db")
def get_db() -> sqlite3.Connection:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def migrate_api_tokens_tables() -> None:
"""Idempotent migration: create user_api_tokens and user_webhooks."""
conn = get_db()
c = conn.cursor()
c.executescript("""
CREATE TABLE IF NOT EXISTS user_api_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
token_hash TEXT NOT NULL UNIQUE,
token_prefix TEXT NOT NULL,
created_at DATETIME NOT NULL DEFAULT (datetime('now')),
last_used_at DATETIME,
revoked INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_api_tokens_user ON user_api_tokens(user_id);
CREATE INDEX IF NOT EXISTS idx_api_tokens_hash ON user_api_tokens(token_hash);
CREATE TABLE IF NOT EXISTS user_webhooks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL UNIQUE,
url TEXT NOT NULL,
secret TEXT NOT NULL,
created_at DATETIME NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_webhooks_user ON user_webhooks(user_id);
""")
conn.commit()
conn.close()
logger.info(
"[api_tokens_db] Tables user_api_tokens + user_webhooks created/verified."
)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
migrate_api_tokens_tables()
print("[api_tokens_db] Migration complete.")

View File

@@ -4,6 +4,8 @@ API v1 Blueprint package — Turf SaaS
Sprint 3-4: HRT-29 — Refacto API /v1/
Sprint 5-6: HRT-31 — Billing Stripe
HRT-79: Alertes Telegram configurables (user blueprint)
HRT-80: API Token personnel + Webhook alertes (Pro)
HRT-82: Multi-compte / Organisation Pro (max 5 users)
Registers sub-blueprints:
/api/v1/health — public health-check
@@ -15,7 +17,10 @@ Registers sub-blueprints:
/api/v1/metrics — métriques perf ML (premium+)
/api/v1/billing/ — Stripe checkout, portal, webhook, status
/api/v1/user/ — config utilisateur, alertes Telegram (premium+)
/api/v1/user/api-token — Personal API token (Pro)
/api/v1/user/webhook — Webhook config (Pro)
/api/v1/history — historique préd. ML (Free:7j, Premium:90j, Pro:illimité)
/api/v1/org/ — organisations Pro (multi-compte, max 5 users)
/api/v1/docs — Swagger UI (via flasgger, registered on app)
"""
@@ -30,7 +35,9 @@ from .routes.export import export_bp
from .routes.metrics import metrics_bp
from .routes.billing import billing_bp
from .routes.user import user_bp
from .routes.user_tokens import user_tokens_bp
from .routes.history import history_bp
from .routes.org import org_bp
# Master blueprint that aggregates all sub-routes under /api/v1
api_v1_bp = Blueprint("api_v1", __name__, url_prefix="/api/v1")
@@ -47,4 +54,6 @@ def register_api_v1(app):
app.register_blueprint(metrics_bp)
app.register_blueprint(billing_bp)
app.register_blueprint(user_bp)
app.register_blueprint(user_tokens_bp)
app.register_blueprint(history_bp)
app.register_blueprint(org_bp)

536
api_v1/routes/org.py Normal file
View File

@@ -0,0 +1,536 @@
#!/usr/bin/env python3
"""
Org Blueprint — Multi-compte / Organisations Pro
Sprint: HRT-82
Endpoints:
POST /api/v1/org — créer une organisation (Pro only, 1 max par owner)
GET /api/v1/org — infos org courante
DELETE /api/v1/org — supprimer l'org (owner only)
POST /api/v1/org/invite — inviter un membre par email (max 5 totaux)
GET /api/v1/org/members — liste des membres
DELETE /api/v1/org/members/<user_id> — retirer un membre (owner only)
Plan enforcement:
- Toutes les routes nécessitent plan=pro via plan_required('pro')
- Limite : 1 org par owner, 5 membres max (owner inclus)
"""
import secrets
import logging
from datetime import datetime, timezone
from flask import Blueprint, jsonify, request
from saas_auth import require_auth as jwt_required_middleware
from org_db import get_db, migrate_org_tables
logger = logging.getLogger("turf_saas.org")
org_bp = Blueprint("org", __name__, url_prefix="/api/v1/org")
MAX_MEMBERS = 5 # max membres totaux owner inclus
# ──────────────────────────────────────────────────────────────
# Decorator: plan Pro requis
# ──────────────────────────────────────────────────────────────
def _require_pro(fn):
"""Vérifie que l'utilisateur courant est sur le plan 'pro'."""
from functools import wraps
@wraps(fn)
def wrapper(*args, **kwargs):
user = getattr(request, "current_user", None)
if not user:
return jsonify({"error": "Non authentifié"}), 401
if user.get("plan") != "pro":
return jsonify(
{
"error": "Plan insuffisant",
"required": "pro",
"current_plan": user.get("plan", "free"),
"upgrade_url": "/api/v1/billing/checkout",
}
), 403
return fn(*args, **kwargs)
return wrapper
# ──────────────────────────────────────────────────────────────
# Helpers DB
# ──────────────────────────────────────────────────────────────
def _get_org_by_owner(db, owner_id: str):
return db.execute(
"SELECT * FROM organizations WHERE owner_id = ?", (owner_id,)
).fetchone()
def _get_org_by_id(db, org_id: str):
return db.execute("SELECT * FROM organizations WHERE id = ?", (org_id,)).fetchone()
def _get_member_org(db, user_id: str):
"""Retourne l'org dont user_id est membre (owner ou member)."""
row = db.execute(
"""SELECT o.* FROM organizations o
JOIN org_members m ON m.org_id = o.id
WHERE m.user_id = ?
LIMIT 1""",
(user_id,),
).fetchone()
return row
def _count_org_members(db, org_id: str) -> int:
row = db.execute(
"SELECT COUNT(*) AS cnt FROM org_members WHERE org_id = ?", (org_id,)
).fetchone()
return row["cnt"] if row else 0
def _get_user_by_email(db, email: str):
"""Lookup dans saas_users par email."""
return db.execute(
"SELECT * FROM saas_users WHERE email = ?", (email.lower().strip(),)
).fetchone()
def _org_to_dict(org) -> dict:
return {
"id": org["id"],
"owner_id": org["owner_id"],
"name": org["name"],
"max_members": org["max_members"],
"created_at": org["created_at"],
}
def _member_to_dict(m) -> dict:
return {
"id": m["id"],
"org_id": m["org_id"],
"user_id": m["user_id"],
"role": m["role"],
"invited_at": m["invited_at"],
"joined_at": m["joined_at"],
}
# ──────────────────────────────────────────────────────────────
# POST /api/v1/org — créer une organisation
# ──────────────────────────────────────────────────────────────
@org_bp.route("", methods=["POST"])
@jwt_required_middleware
@_require_pro
def create_org():
"""
Crée une organisation.
---
tags:
- Organisation
security:
- Bearer: []
requestBody:
required: true
content:
application/json:
schema:
type: object
required: [name]
properties:
name:
type: string
description: Nom de l'organisation (1-100 caractères)
responses:
201:
description: Organisation créée
400:
description: Paramètre manquant ou invalide
403:
description: Plan insuffisant
409:
description: L'utilisateur possède déjà une organisation
"""
user = request.current_user
owner_id = user["id"]
data = request.get_json(silent=True) or {}
name = (data.get("name") or "").strip()
if not name or len(name) > 100:
return jsonify({"error": "Le nom est requis (1-100 caractères)"}), 400
db = get_db()
try:
# 1 org max par owner
existing = _get_org_by_owner(db, owner_id)
if existing:
return jsonify(
{
"error": "Vous possédez déjà une organisation",
"org_id": existing["id"],
}
), 409
org_id = secrets.token_hex(16)
now = datetime.now(timezone.utc).isoformat()
db.execute(
"INSERT INTO organizations (id, owner_id, name, max_members, created_at) "
"VALUES (?, ?, ?, ?, ?)",
(org_id, owner_id, name, MAX_MEMBERS, now),
)
# Ajouter l'owner comme premier membre avec rôle 'owner'
db.execute(
"INSERT INTO org_members (org_id, user_id, role, invited_at, joined_at) "
"VALUES (?, ?, 'owner', ?, ?)",
(org_id, owner_id, now, now),
)
db.commit()
org = _get_org_by_id(db, org_id)
logger.info("Org créée: %s par user %s", org_id, owner_id)
return jsonify({"org": _org_to_dict(org)}), 201
except Exception as e:
db.rollback()
logger.error("create_org error: %s", e)
return jsonify({"error": "Erreur interne"}), 500
finally:
db.close()
# ──────────────────────────────────────────────────────────────
# GET /api/v1/org — infos org courante
# ──────────────────────────────────────────────────────────────
@org_bp.route("", methods=["GET"])
@jwt_required_middleware
@_require_pro
def get_org():
"""
Retourne l'organisation dont l'utilisateur est owner ou membre.
---
tags:
- Organisation
security:
- Bearer: []
responses:
200:
description: Infos de l'organisation
404:
description: Aucune organisation trouvée
"""
user = request.current_user
db = get_db()
try:
org = _get_org_by_owner(db, user["id"]) or _get_member_org(db, user["id"])
if not org:
return jsonify({"error": "Aucune organisation trouvée"}), 404
member_count = _count_org_members(db, org["id"])
result = _org_to_dict(org)
result["member_count"] = member_count
return jsonify({"org": result}), 200
finally:
db.close()
# ──────────────────────────────────────────────────────────────
# DELETE /api/v1/org — supprimer l'organisation
# ──────────────────────────────────────────────────────────────
@org_bp.route("", methods=["DELETE"])
@jwt_required_middleware
@_require_pro
def delete_org():
"""
Supprime l'organisation (owner uniquement).
---
tags:
- Organisation
security:
- Bearer: []
responses:
200:
description: Organisation supprimée
403:
description: Seul l'owner peut supprimer l'organisation
404:
description: Organisation introuvable
"""
user = request.current_user
db = get_db()
try:
org = _get_org_by_owner(db, user["id"])
if not org:
return jsonify({"error": "Vous n'êtes pas owner d'une organisation"}), 403
# CASCADE supprime org_members automatiquement (FK ON DELETE CASCADE)
db.execute("DELETE FROM organizations WHERE id = ?", (org["id"],))
db.commit()
logger.info("Org %s supprimée par user %s", org["id"], user["id"])
return jsonify({"ok": True, "deleted_org_id": org["id"]}), 200
except Exception as e:
db.rollback()
logger.error("delete_org error: %s", e)
return jsonify({"error": "Erreur interne"}), 500
finally:
db.close()
# ──────────────────────────────────────────────────────────────
# POST /api/v1/org/invite — inviter un membre par email
# ──────────────────────────────────────────────────────────────
@org_bp.route("/invite", methods=["POST"])
@jwt_required_middleware
@_require_pro
def invite_member():
"""
Invite un utilisateur dans l'organisation par email (owner uniquement).
Limite : 5 membres totaux (owner inclus).
---
tags:
- Organisation
security:
- Bearer: []
requestBody:
required: true
content:
application/json:
schema:
type: object
required: [email]
properties:
email:
type: string
description: Email de l'utilisateur à inviter
responses:
201:
description: Membre ajouté
400:
description: Paramètre manquant ou invalide
403:
description: Seul l'owner peut inviter / limite de membres atteinte
404:
description: Utilisateur introuvable ou organisation inexistante
409:
description: L'utilisateur est déjà membre
"""
user = request.current_user
data = request.get_json(silent=True) or {}
email = (data.get("email") or "").strip().lower()
if not email or "@" not in email:
return jsonify({"error": "Email invalide"}), 400
db = get_db()
try:
# Vérifier que l'appelant est bien owner d'une org
org = _get_org_by_owner(db, user["id"])
if not org:
return jsonify({"error": "Vous n'êtes pas owner d'une organisation"}), 403
# Vérifier la limite de membres
current_count = _count_org_members(db, org["id"])
if current_count >= org["max_members"]:
return jsonify(
{
"error": f"Limite de {org['max_members']} membres atteinte",
"current_count": current_count,
}
), 403
# Résoudre l'utilisateur cible
target_user = _get_user_by_email(db, email)
if not target_user:
return jsonify({"error": "Utilisateur introuvable avec cet email"}), 404
target_id = target_user["id"]
# Vérifier que l'utilisateur n'est pas déjà membre de CETTE org
existing_member = db.execute(
"SELECT id FROM org_members WHERE org_id = ? AND user_id = ?",
(org["id"], target_id),
).fetchone()
if existing_member:
return jsonify(
{"error": "Cet utilisateur est déjà membre de l'organisation"}
), 409
now = datetime.now(timezone.utc).isoformat()
db.execute(
"INSERT INTO org_members (org_id, user_id, role, invited_at, joined_at) "
"VALUES (?, ?, 'member', ?, ?)",
(org["id"], target_id, now, now),
)
db.commit()
member_row = db.execute(
"SELECT * FROM org_members WHERE org_id = ? AND user_id = ?",
(org["id"], target_id),
).fetchone()
logger.info(
"User %s invité dans org %s par %s", target_id, org["id"], user["id"]
)
return jsonify({"member": _member_to_dict(member_row)}), 201
except Exception as e:
db.rollback()
logger.error("invite_member error: %s", e)
return jsonify({"error": "Erreur interne"}), 500
finally:
db.close()
# ──────────────────────────────────────────────────────────────
# GET /api/v1/org/members — liste des membres
# ──────────────────────────────────────────────────────────────
@org_bp.route("/members", methods=["GET"])
@jwt_required_middleware
@_require_pro
def list_members():
"""
Liste les membres de l'organisation courante.
---
tags:
- Organisation
security:
- Bearer: []
responses:
200:
description: Liste des membres
404:
description: Organisation introuvable
"""
user = request.current_user
db = get_db()
try:
org = _get_org_by_owner(db, user["id"]) or _get_member_org(db, user["id"])
if not org:
return jsonify({"error": "Aucune organisation trouvée"}), 404
members = db.execute(
"SELECT m.*, u.email, u.firstname, u.lastname "
"FROM org_members m "
"LEFT JOIN saas_users u ON u.id = m.user_id "
"WHERE m.org_id = ? "
"ORDER BY m.invited_at ASC",
(org["id"],),
).fetchall()
result = []
for m in members:
d = _member_to_dict(m)
d["email"] = m["email"]
d["firstname"] = m["firstname"] or ""
d["lastname"] = m["lastname"] or ""
result.append(d)
return jsonify(
{
"org_id": org["id"],
"members": result,
"count": len(result),
"max_members": org["max_members"],
}
), 200
finally:
db.close()
# ──────────────────────────────────────────────────────────────
# DELETE /api/v1/org/members/<user_id> — retirer un membre
# ──────────────────────────────────────────────────────────────
@org_bp.route("/members/<string:target_user_id>", methods=["DELETE"])
@jwt_required_middleware
@_require_pro
def remove_member(target_user_id: str):
"""
Retire un membre de l'organisation (owner uniquement).
L'owner ne peut pas se retirer lui-même.
---
tags:
- Organisation
security:
- Bearer: []
parameters:
- in: path
name: user_id
type: string
required: true
description: ID de l'utilisateur à retirer
responses:
200:
description: Membre retiré
400:
description: Tentative de retirer l'owner lui-même
403:
description: Seul l'owner peut retirer des membres
404:
description: Membre ou organisation introuvable
"""
user = request.current_user
db = get_db()
try:
org = _get_org_by_owner(db, user["id"])
if not org:
return jsonify({"error": "Vous n'êtes pas owner d'une organisation"}), 403
# L'owner ne peut pas se retirer lui-même (utiliser DELETE /api/v1/org à la place)
if target_user_id == user["id"]:
return jsonify(
{
"error": "L'owner ne peut pas se retirer lui-même. "
"Utilisez DELETE /api/v1/org pour supprimer l'organisation."
}
), 400
member = db.execute(
"SELECT * FROM org_members WHERE org_id = ? AND user_id = ?",
(org["id"], target_user_id),
).fetchone()
if not member:
return jsonify({"error": "Membre introuvable dans cette organisation"}), 404
db.execute(
"DELETE FROM org_members WHERE org_id = ? AND user_id = ?",
(org["id"], target_user_id),
)
db.commit()
logger.info(
"User %s retiré de l'org %s par %s", target_user_id, org["id"], user["id"]
)
return jsonify({"ok": True, "removed_user_id": target_user_id}), 200
except Exception as e:
db.rollback()
logger.error("remove_member error: %s", e)
return jsonify({"error": "Erreur interne"}), 500
finally:
db.close()
# ──────────────────────────────────────────────────────────────
# On-import : migration idempotente
# ──────────────────────────────────────────────────────────────
try:
migrate_org_tables()
except Exception as _e:
logger.warning("org_db migration skipped (test env?): %s", _e)

View File

@@ -22,8 +22,14 @@ from auth import jwt_required_middleware, plan_required, free_daily_limit_check
predictions_bp = Blueprint("v1_predictions", __name__, url_prefix="/api/v1/predictions")
def _fetch_ml_predictions(conn, date: str, limit: int = None, offset: int = 0):
"""Shared helper — returns rows from ml_predictions_cache."""
def _fetch_ml_predictions(
conn, date: str, limit: int = None, offset: int = 0, include_weather: bool = False
):
"""Shared helper — returns rows from ml_predictions_cache.
include_weather=True adds terrain_condition and weather_impact columns
via LEFT JOIN on pmu_meteo (premium routes only).
"""
if not table_exists(conn, "ml_predictions_cache"):
return [], 0
@@ -33,13 +39,35 @@ def _fetch_ml_predictions(conn, date: str, limit: int = None, offset: int = 0):
).fetchone()
total = count_row["cnt"] if count_row else 0
sql = """SELECT
race_label, hippodrome, discipline, distance, heure,
horse_name, horse_number, odds, prob_top1, prob_top3,
ml_score, recommendation, is_value_bet, risque_label, risque_score
FROM ml_predictions_cache
WHERE date = ?
ORDER BY ml_score DESC"""
if (
include_weather
and table_exists(conn, "pmu_meteo")
and table_exists(conn, "pmu_courses")
):
sql = """SELECT
m.race_label, m.hippodrome, m.discipline, m.distance, m.heure,
m.horse_name, m.horse_number, m.odds, m.prob_top1, m.prob_top3,
m.ml_score, m.recommendation, m.is_value_bet, m.risque_label, m.risque_score,
c.penetrometre_intitule,
mt.nebulositecode, mt.nebulosite_court, mt.temperature, mt.force_vent
FROM ml_predictions_cache m
LEFT JOIN pmu_courses c
ON c.date_programme = m.date
AND c.num_reunion = m.num_reunion
AND c.num_course = m.num_course
LEFT JOIN pmu_meteo mt
ON mt.date_programme = m.date
AND mt.num_reunion = m.num_reunion
WHERE m.date = ?
ORDER BY m.ml_score DESC"""
else:
sql = """SELECT
race_label, hippodrome, discipline, distance, heure,
horse_name, horse_number, odds, prob_top1, prob_top3,
ml_score, recommendation, is_value_bet, risque_label, risque_score
FROM ml_predictions_cache
WHERE date = ?
ORDER BY ml_score DESC"""
params = [date]
if limit is not None:
@@ -47,7 +75,42 @@ def _fetch_ml_predictions(conn, date: str, limit: int = None, offset: int = 0):
params += [limit, offset]
rows = conn.execute(sql, params).fetchall()
return [dict(r) for r in rows], total
results = []
for r in rows:
row_dict = dict(r)
if include_weather:
# Compute derived fields from raw columns
penetrometre = row_dict.pop("penetrometre_intitule", None) or ""
# Import inline to avoid circular dependency at module level
from scoring_v2 import get_terrain_condition, compute_weather_impact
terrain_condition = (
get_terrain_condition(penetrometre) if penetrometre else "inconnu"
)
weather_data = None
if (
row_dict.get("nebulositecode") is not None
or row_dict.get("temperature") is not None
):
weather_data = {
"nebulositecode": row_dict.pop("nebulositecode", None),
"nebulosite_court": row_dict.pop("nebulosite_court", None),
"temperature": row_dict.pop("temperature", None),
"force_vent": row_dict.pop("force_vent", None),
}
else:
# Remove raw meteo columns even if NULL
row_dict.pop("nebulositecode", None)
row_dict.pop("nebulosite_court", None)
row_dict.pop("temperature", None)
row_dict.pop("force_vent", None)
weather_impact = compute_weather_impact(weather_data, terrain_condition)
row_dict["terrain_condition"] = terrain_condition
row_dict["weather_impact"] = weather_impact
results.append(row_dict)
return results, total
# ──────────────────────────────────────────────────────────────
@@ -145,7 +208,7 @@ def predictions_all():
conn = get_db()
try:
predictions, total = _fetch_ml_predictions(
conn, date_param, limit=limit, offset=offset
conn, date_param, limit=limit, offset=offset, include_weather=True
)
pagination = paginate_query(predictions, total, limit, offset)

View File

@@ -0,0 +1,195 @@
#!/usr/bin/env python3
"""
user_tokens.py — Personal API tokens + Webhook configuration (Pro plan)
HRT-80
Endpoints:
POST /api/v1/user/api-token
DELETE /api/v1/user/api-token
POST /api/v1/user/webhook
DELETE /api/v1/user/webhook
"""
import hashlib
import logging
import secrets
from flask import Blueprint, g, jsonify, request
from api_tokens_db import get_db, migrate_api_tokens_tables
from auth import jwt_required_middleware, plan_required
logger = logging.getLogger("turf_saas.user_tokens")
user_tokens_bp = Blueprint("user_tokens", __name__, url_prefix="/api/v1/user")
try:
migrate_api_tokens_tables()
except Exception as _e:
logger.warning("api_tokens_db migration skipped (test env?): %s", _e)
def _hash_token(raw: str) -> str:
return hashlib.sha256(raw.encode()).hexdigest()
@user_tokens_bp.route("/api-token", methods=["POST"])
@jwt_required_middleware
@plan_required("pro")
def create_api_token():
user = g.current_user
user_id = str(user["id"])
conn = get_db()
try:
existing = conn.execute(
"SELECT id, token_prefix, created_at FROM user_api_tokens "
"WHERE user_id = ? AND revoked = 0",
(user_id,),
).fetchone()
if existing:
return jsonify(
{
"error": "Un token actif existe déjà. Révoquez-le avant d'en créer un nouveau.",
"existing_prefix": existing["token_prefix"],
"created_at": existing["created_at"],
}
), 409
raw_token = "trf_" + secrets.token_urlsafe(40)
token_hash = _hash_token(raw_token)
token_prefix = raw_token[:12]
conn.execute(
"INSERT INTO user_api_tokens (user_id, token_hash, token_prefix) VALUES (?, ?, ?)",
(user_id, token_hash, token_prefix),
)
conn.commit()
row = conn.execute(
"SELECT created_at FROM user_api_tokens WHERE token_hash = ?",
(token_hash,),
).fetchone()
created_at = row["created_at"] if row else None
except Exception as e:
conn.rollback()
logger.error("create_api_token error for user %s: %s", user_id, e)
return jsonify({"error": "Erreur interne"}), 500
finally:
conn.close()
logger.info("API token created for user %s (prefix=%s)", user_id, token_prefix)
return jsonify(
{
"token": raw_token,
"prefix": token_prefix,
"created_at": created_at,
"warning": "Conservez ce token en lieu sûr. Il ne sera plus affiché.",
}
), 201
@user_tokens_bp.route("/api-token", methods=["DELETE"])
@jwt_required_middleware
@plan_required("pro")
def revoke_api_token():
user = g.current_user
user_id = str(user["id"])
conn = get_db()
try:
result = conn.execute(
"UPDATE user_api_tokens SET revoked = 1 WHERE user_id = ? AND revoked = 0",
(user_id,),
)
conn.commit()
revoked_count = result.rowcount
except Exception as e:
conn.rollback()
logger.error("revoke_api_token error for user %s: %s", user_id, e)
return jsonify({"error": "Erreur interne"}), 500
finally:
conn.close()
if revoked_count == 0:
return jsonify({"error": "Aucun token actif trouvé"}), 404
logger.info("API token(s) revoked for user %s (%d tokens)", user_id, revoked_count)
return jsonify({"revoked": True, "count": revoked_count}), 200
@user_tokens_bp.route("/webhook", methods=["POST"])
@jwt_required_middleware
@plan_required("pro")
def create_webhook():
user = g.current_user
user_id = str(user["id"])
data = request.get_json(silent=True) or {}
url = (data.get("url") or "").strip()
if not url:
return jsonify({"error": "URL du webhook manquante"}), 400
if not url.startswith("https://"):
return jsonify(
{"error": "L'URL du webhook doit utiliser HTTPS (commencer par https://)"}
), 400
secret = (data.get("secret") or "").strip() or secrets.token_hex(32)
conn = get_db()
existing = None
try:
existing = conn.execute(
"SELECT id FROM user_webhooks WHERE user_id = ?", (user_id,)
).fetchone()
if existing:
conn.execute(
"UPDATE user_webhooks SET url = ?, secret = ?, created_at = datetime('now') "
"WHERE user_id = ?",
(url, secret, user_id),
)
else:
conn.execute(
"INSERT INTO user_webhooks (user_id, url, secret) VALUES (?, ?, ?)",
(user_id, url, secret),
)
conn.commit()
except Exception as e:
conn.rollback()
logger.error("create_webhook error for user %s: %s", user_id, e)
return jsonify({"error": "Erreur interne"}), 500
finally:
conn.close()
action = "mis à jour" if existing else "configuré"
logger.info("Webhook %s for user %s: %s", action, user_id, url)
return jsonify(
{
"webhook_url": url,
"secret": secret,
"message": f"Webhook {action} avec succès",
}
), 201
@user_tokens_bp.route("/webhook", methods=["DELETE"])
@jwt_required_middleware
@plan_required("pro")
def delete_webhook():
user = g.current_user
user_id = str(user["id"])
conn = get_db()
try:
result = conn.execute("DELETE FROM user_webhooks WHERE user_id = ?", (user_id,))
conn.commit()
deleted_count = result.rowcount
except Exception as e:
conn.rollback()
logger.error("delete_webhook error for user %s: %s", user_id, e)
return jsonify({"error": "Erreur interne"}), 500
finally:
conn.close()
if deleted_count == 0:
return jsonify({"error": "Aucun webhook configuré"}), 404
logger.info("Webhook deleted for user %s", user_id)
return jsonify({"deleted": True}), 200

View File

@@ -53,7 +53,7 @@ def valuebets():
default: 0
responses:
200:
description: Value bets du jour
description: Value bets du jour avec météo et terrain (HRT-83)
401:
description: Token invalide
403:
@@ -69,7 +69,7 @@ def valuebets():
conn = get_db()
try:
rows = []
rows_raw = []
total = 0
if table_exists(conn, "ml_predictions_cache"):
@@ -81,18 +81,73 @@ def valuebets():
).fetchone()
total = count_row["cnt"] if count_row else 0
rows = conn.execute(
"""SELECT race_label, hippodrome, discipline, distance, heure,
horse_name, horse_number, odds, prob_top1, prob_top3,
ml_score, recommendation, risque_label, risque_score
FROM ml_predictions_cache
WHERE date = ? AND is_value_bet = 1 AND odds >= ?
ORDER BY ml_score DESC
LIMIT ? OFFSET ?""",
(date_param, min_odds, limit, offset),
).fetchall()
# LEFT JOIN pmu_courses (terrain) + pmu_meteo (météo) — HRT-83
has_courses = table_exists(conn, "pmu_courses")
has_meteo = table_exists(conn, "pmu_meteo")
if has_courses and has_meteo:
rows_raw = conn.execute(
"""SELECT m.race_label, m.hippodrome, m.discipline, m.distance, m.heure,
m.horse_name, m.horse_number, m.odds, m.prob_top1, m.prob_top3,
m.ml_score, m.recommendation, m.risque_label, m.risque_score,
c.penetrometre_intitule,
mt.nebulositecode, mt.nebulosite_court,
mt.temperature, mt.force_vent
FROM ml_predictions_cache m
LEFT JOIN pmu_courses c
ON c.date_programme = m.date
AND c.num_reunion = m.num_reunion
AND c.num_course = m.num_course
LEFT JOIN pmu_meteo mt
ON mt.date_programme = m.date
AND mt.num_reunion = m.num_reunion
WHERE m.date = ? AND m.is_value_bet = 1 AND m.odds >= ?
ORDER BY m.ml_score DESC
LIMIT ? OFFSET ?""",
(date_param, min_odds, limit, offset),
).fetchall()
else:
rows_raw = conn.execute(
"""SELECT race_label, hippodrome, discipline, distance, heure,
horse_name, horse_number, odds, prob_top1, prob_top3,
ml_score, recommendation, risque_label, risque_score
FROM ml_predictions_cache
WHERE date = ? AND is_value_bet = 1 AND odds >= ?
ORDER BY ml_score DESC
LIMIT ? OFFSET ?""",
(date_param, min_odds, limit, offset),
).fetchall()
from scoring_v2 import get_terrain_condition, compute_weather_impact
valuebets_list = []
for r in rows_raw:
row_dict = dict(r)
penetrometre = row_dict.pop("penetrometre_intitule", None) or ""
terrain_condition = (
get_terrain_condition(penetrometre) if penetrometre else "inconnu"
)
weather_data = None
if (
row_dict.get("nebulositecode") is not None
or row_dict.get("temperature") is not None
):
weather_data = {
"nebulositecode": row_dict.pop("nebulositecode", None),
"nebulosite_court": row_dict.pop("nebulosite_court", None),
"temperature": row_dict.pop("temperature", None),
"force_vent": row_dict.pop("force_vent", None),
}
else:
row_dict.pop("nebulositecode", None)
row_dict.pop("nebulosite_court", None)
row_dict.pop("temperature", None)
row_dict.pop("force_vent", None)
weather_impact = compute_weather_impact(weather_data, terrain_condition)
row_dict["terrain_condition"] = terrain_condition
row_dict["weather_impact"] = weather_impact
valuebets_list.append(row_dict)
valuebets_list = [dict(r) for r in rows]
pagination = paginate_query(valuebets_list, total, limit, offset)
return jsonify(

80
api_v1/utils_webhook.py Normal file
View File

@@ -0,0 +1,80 @@
#!/usr/bin/env python3
"""
utils_webhook.py — Webhook dispatch utility (fire-and-forget, HMAC-SHA256)
HRT-80
"""
import hashlib
import hmac
import json
import logging
import requests
from api_tokens_db import get_db
logger = logging.getLogger("turf_saas.webhook")
EVENT_NEW_PREDICTION = "new_prediction"
EVENT_VALUE_BET = "value_bet"
def dispatch_webhook(user_id: str, event_type: str, payload: dict) -> None:
"""
Send HMAC-signed webhook POST to URL configured by user.
Fire-and-forget: errors logged, never re-raised. Timeout: 5s.
"""
conn = get_db()
try:
row = conn.execute(
"SELECT url, secret FROM user_webhooks WHERE user_id = ?",
(str(user_id),),
).fetchone()
except Exception as e:
logger.warning("dispatch_webhook: DB error for user %s: %s", user_id, e)
return
finally:
conn.close()
if not row:
return
url = row["url"]
secret = row["secret"]
body = json.dumps(
{"event": event_type, "data": payload},
ensure_ascii=False,
separators=(",", ":"),
)
signature = hmac.new(
secret.encode("utf-8"), body.encode("utf-8"), hashlib.sha256
).hexdigest()
headers = {
"Content-Type": "application/json",
"X-Turf-Signature": f"sha256={signature}",
"X-Turf-Event": event_type,
"User-Agent": "TurfSaaS-Webhook/1.0",
}
try:
resp = requests.post(url, data=body, headers=headers, timeout=5)
logger.info(
"Webhook dispatched to user %s (event=%s, status=%s)",
user_id,
event_type,
resp.status_code,
)
except requests.exceptions.Timeout:
logger.warning(
"Webhook timeout for user %s (event=%s, url=%s)", user_id, event_type, url
)
except requests.exceptions.RequestException as e:
logger.warning(
"Webhook failed for user %s (event=%s): %s", user_id, event_type, e
)
except Exception as e:
logger.warning(
"Webhook unexpected error for user %s (event=%s): %s",
user_id,
event_type,
e,
)

52
auth.py
View File

@@ -258,11 +258,47 @@ def logout():
# ──────────────────────────────────────────────────────────────
def validate_api_key(raw_key: str):
"""
Validate a personal API token (X-API-Key header).
Returns user dict or None. Updates last_used_at on success.
HRT-80: Personal API token support.
"""
if not raw_key:
return None
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
db = get_db()
try:
row = db.execute(
"SELECT t.user_id, u.* FROM user_api_tokens t "
"JOIN users u ON CAST(t.user_id AS INTEGER) = u.id "
"WHERE t.token_hash = ? AND t.revoked = 0 AND u.is_active = 1",
(key_hash,),
).fetchone()
if row:
db.execute(
"UPDATE user_api_tokens SET last_used_at = datetime('now') "
"WHERE token_hash = ?",
(key_hash,),
)
db.commit()
return dict(row) if row else None
except Exception as e:
logger.warning("validate_api_key error: %s", e)
return None
finally:
db.close()
def jwt_required_middleware(fn):
"""Decorator: require a valid Bearer JWT access token."""
"""
Decorator: require a valid Bearer JWT access token OR X-API-Key personal token.
HRT-80: Added X-API-Key fallback for personal API tokens (Pro plan only).
"""
@wraps(fn)
def wrapper(*args, **kwargs):
# 1. Try Bearer JWT (existing flow — unchanged)
try:
verify_jwt_in_request()
user_id = int(get_jwt_identity())
@@ -271,10 +307,20 @@ def jwt_required_middleware(fn):
return jsonify({"error": "Utilisateur introuvable"}), 401
g.current_user = dict(user)
g.current_user_id = user_id
return fn(*args, **kwargs)
except (JWTExtendedException, PyJWTError) as e:
logger.debug("JWT auth failed: %s", e)
return jsonify({"error": "Token invalide ou expiré", "detail": str(e)}), 401
return fn(*args, **kwargs)
# 2. Fallback: X-API-Key personal token (HRT-80)
api_key = request.headers.get("X-API-Key", "").strip()
if api_key:
user = validate_api_key(api_key)
if user:
g.current_user = user
g.current_user_id = user.get("id")
return fn(*args, **kwargs)
return jsonify({"error": "Token invalide ou expiré"}), 401
return wrapper

72
org_db.py Normal file
View File

@@ -0,0 +1,72 @@
#!/usr/bin/env python3
"""
Org DB — Multi-compte / Organisations Pro
Sprint: HRT-82
Migration idempotente : crée les tables organizations et org_members
dans turf_saas.db si elles n'existent pas.
Run une seule fois :
./venv/bin/python org_db.py
"""
import sqlite3
import os
import logging
DB_PATH = os.environ.get("TURF_SAAS_DB", "/home/h3r7/turf_saas/turf_saas.db")
logger = logging.getLogger("turf_saas.org_db")
def get_db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
conn.execute("PRAGMA foreign_keys = ON")
return conn
def migrate_org_tables():
"""
Migration idempotente : crée organizations + org_members.
- organizations : 1 org max par owner (enforced en Python + UNIQUE owner_id)
- org_members : max 5 membres totaux (owner inclus, enforced en Python)
- UNIQUE(org_id, user_id) empêche les doublons de membres
"""
conn = get_db()
c = conn.cursor()
c.executescript("""
CREATE TABLE IF NOT EXISTS organizations (
id TEXT PRIMARY KEY,
owner_id TEXT NOT NULL UNIQUE,
name TEXT NOT NULL,
max_members INTEGER NOT NULL DEFAULT 5,
created_at DATETIME NOT NULL DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS org_members (
id INTEGER PRIMARY KEY AUTOINCREMENT,
org_id TEXT NOT NULL REFERENCES organizations(id) ON DELETE CASCADE,
user_id TEXT NOT NULL,
role TEXT NOT NULL DEFAULT 'member'
CHECK(role IN ('owner', 'member')),
invited_at DATETIME NOT NULL DEFAULT (datetime('now')),
joined_at DATETIME,
UNIQUE(org_id, user_id)
);
CREATE INDEX IF NOT EXISTS idx_org_owner ON organizations(owner_id);
CREATE INDEX IF NOT EXISTS idx_orgmem_org ON org_members(org_id);
CREATE INDEX IF NOT EXISTS idx_orgmem_user ON org_members(user_id);
""")
conn.commit()
conn.close()
logger.info("[org_db] Tables organizations + org_members créées/vérifiées.")
print("[org_db] Migration OK: organizations, org_members.")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
migrate_org_tables()

View File

@@ -268,15 +268,33 @@ try:
@api_v1_bp.record_once
def _init_jwt(state):
app = state.app
if not app.config.get('JWT_SECRET_KEY'):
if not app.config.get("JWT_SECRET_KEY"):
import os
app.config['JWT_SECRET_KEY'] = os.environ.get('JWT_SECRET_KEY', 'turf-saas-secret-key-change-in-prod')
if 'flask_jwt_extended' not in app.extensions:
app.config["JWT_SECRET_KEY"] = os.environ.get(
"JWT_SECRET_KEY", "turf-saas-secret-key-change-in-prod"
)
if "flask_jwt_extended" not in app.extensions:
JWTManager(app)
# Register billing blueprint with url_prefix='/billing'
# (parent api_v1_bp has '/api/v1', so result is /api/v1/billing/*)
api_v1_bp.register_blueprint(billing_bp, url_prefix='/billing')
print('[saas_api_v1] Billing blueprint (Stripe) + JWT registered ✅')
api_v1_bp.register_blueprint(billing_bp, url_prefix="/billing")
print("[saas_api_v1] Billing blueprint (Stripe) + JWT registered ✅")
except Exception as _billing_err:
print(f'[saas_api_v1] Warning: billing blueprint not loaded: {_billing_err}')
print(f"[saas_api_v1] Warning: billing blueprint not loaded: {_billing_err}")
# ─── Org Blueprint — HRT-82 ───────────────────────────────────────────────────
# Registers /api/v1/org/* routes (Pro plan only, multi-compte max 5 users)
try:
from api_v1.routes.org import org_bp
@api_v1_bp.record_once
def _register_org_bp(state):
app = state.app
app.register_blueprint(org_bp)
print("[saas_api_v1] Org blueprint (multi-compte Pro) registered ✅")
except Exception as _org_err:
print(f"[saas_api_v1] Warning: org blueprint not loaded: {_org_err}")

View File

@@ -8,6 +8,7 @@ Sprint 4-5 — HRT-30
from flask import Blueprint, request, jsonify, current_app
import sqlite3
import hashlib
import logging
import secrets
import os
import time
@@ -229,14 +230,54 @@ def hash_password(password: str) -> str:
return hashlib.sha256(password.encode("utf-8")).hexdigest()
def validate_api_key(raw_key: str):
"""
Validate a personal API token (X-API-Key header).
Returns user dict or None. Updates last_used_at on success.
HRT-80
"""
if not raw_key:
return None
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
conn = get_db()
try:
row = conn.execute(
"SELECT t.user_id, u.* FROM user_api_tokens t "
"JOIN saas_users u ON t.user_id = u.id "
"WHERE t.token_hash = ? AND t.revoked = 0",
(key_hash,),
).fetchone()
if row:
conn.execute(
"UPDATE user_api_tokens SET last_used_at = datetime('now') "
"WHERE token_hash = ?",
(key_hash,),
)
conn.commit()
return dict(row) if row else None
except Exception as e:
logging.getLogger("turf_saas.auth").warning("validate_api_key error: %s", e)
return None
finally:
conn.close()
def require_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
# 1. Try Bearer session token (existing flow — unchanged)
auth = request.headers.get("Authorization", "")
token = (
auth.removeprefix("Bearer ").strip() if auth.startswith("Bearer ") else None
)
user = validate_token(token)
user = validate_token(token) if token else None
# 2. Fallback: X-API-Key personal token (HRT-80)
if not user:
api_key = request.headers.get("X-API-Key", "").strip()
if api_key:
user = validate_api_key(api_key)
if not user:
return jsonify({"error": "Non authentifié"}), 401
request.current_user = user

View File

@@ -11,29 +11,34 @@ import re
from datetime import datetime
DB_PATH = "/home/h3r7/turf_saas/turf_saas.db"
HEADERS = {'User-Agent': 'Mozilla/5.0', 'Accept': 'application/json'}
HEADERS = {"User-Agent": "Mozilla/5.0", "Accept": "application/json"}
def get_cote_from_db(horse_name, date_course):
"""Recupere la cote depuis la table predictions (plus recente et non nulle)"""
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
c = conn.execute("""
c = conn.execute(
"""
SELECT odds FROM predictions
WHERE date=? AND horse_name LIKE ? AND odds > 0
ORDER BY created_at DESC LIMIT 1
""", (date_course, f"%{horse_name}%"))
""",
(date_course, f"%{horse_name}%"),
)
r = c.fetchone()
conn.close()
return r['odds'] if r else 0
return r["odds"] if r else 0
def parse_musique(musique):
if not musique:
return {}
clean = re.sub(r'\(\d+\)', '', musique)
resultats = re.findall(r'(\d+|D|0)([amphsc]?)', clean)
clean = re.sub(r"\(\d+\)", "", musique)
resultats = re.findall(r"(\d+|D|0)([amphsc]?)", clean)
positions = []
for pos, disc in resultats[:10]:
positions.append(99 if pos == 'D' else int(pos))
positions.append(99 if pos == "D" else int(pos))
if not positions:
return {}
nb_courses = len(positions)
@@ -41,222 +46,385 @@ def parse_musique(musique):
nb_places = sum(1 for p in positions if 1 <= p <= 3)
recentes = [p for p in positions[:3] if p != 99]
forme_recente = sum(recentes) / len(recentes) if recentes else 99
tendance = (sum(positions[-4:]) / 4 - sum(positions[:4]) / 4) if len(positions) >= 4 else 0
tendance = (
(sum(positions[-4:]) / 4 - sum(positions[:4]) / 4) if len(positions) >= 4 else 0
)
return {
'forme_recente': round(forme_recente, 1),
'tendance': round(tendance, 1),
'tx_victoire': round(nb_victoires / nb_courses * 100, 1) if nb_courses else 0,
'tx_place': round(nb_places / nb_courses * 100, 1) if nb_courses else 0,
"forme_recente": round(forme_recente, 1),
"tendance": round(tendance, 1),
"tx_victoire": round(nb_victoires / nb_courses * 100, 1) if nb_courses else 0,
"tx_place": round(nb_places / nb_courses * 100, 1) if nb_courses else 0,
}
def score_cheval_v2(p, all_participants, today):
def get_terrain_condition(penetrometre_intitule: str | None) -> str:
"""Normalise le pénétromètre PMU en condition terrain standardisée."""
if not penetrometre_intitule:
return "inconnu"
val = penetrometre_intitule.upper()
if any(k in val for k in ("TRES BON", "TRÈS BON", "FERME", "FIRM")):
return "bon"
if any(k in val for k in ("BON", "GOOD", "STANDARD")):
return "bon"
if any(k in val for k in ("SOUPLE", "YIELDING", "COLLANT")):
return "souple"
if any(k in val for k in ("LOURD", "HEAVY", "TRES SOUPLE", "TRÈS SOUPLE")):
return "lourd"
if any(k in val for k in ("SOFT", "MOU")):
return "souple"
return "inconnu"
def compute_weather_impact(weather_data: dict | None, terrain_condition: str) -> float:
"""
Calcule un score d'impact météo/terrain sur [5, +5].
weather_data keys attendues : nebulositecode, temperature, force_vent
terrain_condition : 'bon' | 'souple' | 'lourd' | 'inconnu'
Retourne un delta de score ML (positif = favorable, négatif = défavorable).
"""
if not weather_data:
return 0.0
delta = 0.0
# Terrain
if terrain_condition == "lourd":
delta -= 3.0
elif terrain_condition == "souple":
delta -= 1.5
elif terrain_condition == "bon":
delta += 1.0
# inconnu → 0
# Vent
force_vent = weather_data.get("force_vent") or 0
try:
force_vent = float(force_vent)
except (TypeError, ValueError):
force_vent = 0.0
if force_vent >= 50:
delta -= 2.0
elif force_vent >= 30:
delta -= 1.0
# Températures extrêmes
temperature = weather_data.get("temperature")
try:
temperature = float(temperature) if temperature is not None else None
except (TypeError, ValueError):
temperature = None
if temperature is not None:
if temperature <= 0:
delta -= 1.0
elif temperature >= 35:
delta -= 1.0
return round(max(-5.0, min(5.0, delta)), 2)
def score_cheval_v2(p, all_participants, today, weather_data=None):
"""
Score un cheval pour le modèle V2.
weather_data (optionnel) : dict issu de pmu_meteo pour cette réunion.
Backward-compatible : weather_data=None → comportement identique à avant HRT-83.
"""
score = 0
details = {}
# 1. COTE - Essaye PMU API, sinon DB
horse_name = p.get('nom', '')
horse_name = p.get("nom", "")
cote = 0
# Essayer d'abord depuis l'API PMU
rapport = p.get('dernierRapportDirect', {})
rapport = p.get("dernierRapportDirect", {})
if rapport:
cote = rapport.get('rapport', 0)
cote = rapport.get("rapport", 0)
if not cote:
rapport_ref = p.get('dernierRapportReference', {})
cote = rapport_ref.get('rapport', 0) if rapport_ref else 0
rapport_ref = p.get("dernierRapportReference", {})
cote = rapport_ref.get("rapport", 0) if rapport_ref else 0
# Fallback: aller chercher dans la DB
if not cote or cote == 0:
cote = get_cote_from_db(horse_name, today)
# Si toujours pas de cote, utiliser 99 comme valeur par defaut
if not cote or cote == 0:
cote = 99.0
score_cote = max(2, min(10, 20 / (1 + cote * 0.15))) if cote > 0 else 2
score += score_cote
details['cote'] = round(cote, 1)
details['score_cote'] = round(score_cote, 1)
details["cote"] = round(cote, 1)
details["score_cote"] = round(score_cote, 1)
# 2. FORME - AUGMENTE a 30 pts
musique_stats = parse_musique(p.get('musique', ''))
forme = musique_stats.get('forme_recente', 99)
score_forme = 30 if forme <= 1 else 25 if forme <= 2 else 20 if forme <= 3 else 15 if forme <= 5 else 8 if forme <= 8 else 0
musique_stats = parse_musique(p.get("musique", ""))
forme = musique_stats.get("forme_recente", 99)
score_forme = (
30
if forme <= 1
else 25
if forme <= 2
else 20
if forme <= 3
else 15
if forme <= 5
else 8
if forme <= 8
else 0
)
score += score_forme
details['forme_recente'] = forme
details['score_forme'] = score_forme
details["forme_recente"] = forme
details["score_forme"] = score_forme
# 3. TAUX VICTOIRE (15 pts)
nb_courses_total = p.get('nombreCourses', 0)
nb_victoires_total = p.get('nombreVictoires', 0)
nb_courses_total = p.get("nombreCourses", 0)
nb_victoires_total = p.get("nombreVictoires", 0)
tx_vic = (nb_victoires_total / nb_courses_total * 100) if nb_courses_total else 0
score_vic = min(15, tx_vic * 0.5)
score += score_vic
details['tx_victoire'] = round(tx_vic, 1)
details['score_victoire'] = round(score_vic, 1)
details["tx_victoire"] = round(tx_vic, 1)
details["score_victoire"] = round(score_vic, 1)
# 4. TAUX PLACE (15 pts)
nb_places_total = p.get('nombrePlaces', 0)
nb_places_total = p.get("nombrePlaces", 0)
tx_place = (nb_places_total / nb_courses_total * 100) if nb_courses_total else 0
score_place = min(15, tx_place * 0.2)
score += score_place
details['tx_place'] = round(tx_place, 1)
details['score_place'] = round(score_place, 1)
details["tx_place"] = round(tx_place, 1)
details["score_place"] = round(score_place, 1)
# 5. REDUCTION KM (10 pts)
rk = p.get('reductionKilometrique', 0)
all_rk = [x.get('reductionKilometrique', 0) for x in all_participants if x.get('reductionKilometrique', 0) > 0]
rk = p.get("reductionKilometrique", 0)
all_rk = [
x.get("reductionKilometrique", 0)
for x in all_participants
if x.get("reductionKilometrique", 0) > 0
]
if rk > 0 and all_rk:
score_rk = 10 * (1 - (rk - min(all_rk)) / (max(all_rk) - min(all_rk))) if max(all_rk) > min(all_rk) else 5
score_rk = (
10 * (1 - (rk - min(all_rk)) / (max(all_rk) - min(all_rk)))
if max(all_rk) > min(all_rk)
else 5
)
else:
score_rk = 0
score += score_rk
details['rk'] = rk
details['score_rk'] = round(score_rk, 1)
details["rk"] = rk
details["score_rk"] = round(score_rk, 1)
# 6. TENDANCE (10 pts)
tendance = musique_stats.get('tendance', 0)
tendance = musique_stats.get("tendance", 0)
score_tendance = min(10, max(0, 5 + tendance))
score += score_tendance
details['tendance'] = tendance
details['score_tendance'] = round(score_tendance, 1)
details["tendance"] = tendance
details["score_tendance"] = round(score_tendance, 1)
# 7. AVIS ENTRAINEUR (5 pts)
avis = p.get('avisEntraineur', 'NEUTRE')
score_avis = {'POSITIF': 5, 'TRES_POSITIF': 5, 'NEUTRE': 2, 'NEGATIF': 0, 'TRES_NEGATIF': 0}.get(avis, 2)
avis = p.get("avisEntraineur", "NEUTRE")
score_avis = {
"POSITIF": 5,
"TRES_POSITIF": 5,
"NEUTRE": 2,
"NEGATIF": 0,
"TRES_NEGATIF": 0,
}.get(avis, 2)
score += score_avis
details['avis_entraineur'] = avis
details['score_avis'] = score_avis
details["avis_entraineur"] = avis
details["score_avis"] = score_avis
# 8. BONUS OUTSIDER (5 pts)
bonus_outsider = 5 if forme <= 3 and cote >= 10 else 0
score += bonus_outsider
details['bonus_outsider'] = bonus_outsider
details["bonus_outsider"] = bonus_outsider
# Driver change penalty
if p.get('driverChange', False):
if p.get("driverChange", False):
score -= 3
details['driver_change'] = True
details['score_total'] = round(score, 1)
details['musique'] = p.get('musique', '')
details['nb_victoires'] = nb_victoires_total
details['nb_places'] = nb_places_total
details['nb_courses'] = nb_courses_total
details["driver_change"] = True
# 9. METEO & TERRAIN (HRT-83) — premium feature, weather_data=None → skip
penetrometre = p.get("penetrometre_intitule", "") or ""
terrain_condition = (
get_terrain_condition(penetrometre) if penetrometre else "inconnu"
)
weather_impact = 0.0
if weather_data is not None:
weather_impact = compute_weather_impact(weather_data, terrain_condition)
score += weather_impact
details["terrain_condition"] = terrain_condition
details["weather_impact"] = weather_impact
details["score_total"] = round(score, 1)
details["musique"] = p.get("musique", "")
details["nb_victoires"] = nb_victoires_total
details["nb_places"] = nb_places_total
details["nb_courses"] = nb_courses_total
return round(score, 1), details
def get_ze2sur4_combinaisons(top4):
combinaisons = []
for i in range(4):
for j in range(i+1, 4):
for j in range(i + 1, 4):
c1 = top4[i]
c2 = top4[j]
combinaisons.append({
'cheval1': c1['nom'],
'numero1': c1['numero'],
'cheval2': c2['nom'],
'numero2': c2['numero'],
'mise': 1.0,
})
combinaisons.append(
{
"cheval1": c1["nom"],
"numero1": c1["numero"],
"cheval2": c2["nom"],
"numero2": c2["numero"],
"mise": 1.0,
}
)
return combinaisons
def build_recommendations_v2(scored_horses):
ranked = sorted(scored_horses, key=lambda x: x['score'], reverse=True)
ranked = sorted(scored_horses, key=lambda x: x["score"], reverse=True)
if len(ranked) < 4:
return None
top1, top2, top3, top4 = ranked[0], ranked[1], ranked[2], ranked[3]
top4_list = ranked[:4]
def confiance(s):
return "FORTE" if s >= 55 else "BONNE" if s >= 45 else "MOYENNE" if s >= 35 else "FAIBLE"
return (
"FORTE"
if s >= 55
else "BONNE"
if s >= 45
else "MOYENNE"
if s >= 35
else "FAIBLE"
)
ze2_combinaisons = get_ze2sur4_combinaisons(top4_list)
mise_ze2 = len(ze2_combinaisons) * 1.0
return {
'simple_gagnant': {
'cheval': top1['nom'], 'numero': top1['numero'], 'cote': top1['details']['cote'],
'score': top1['score'], 'confiance': confiance(top1['score']),
'mise_suggeree': 2.0, 'gain_potentiel': round(2.0 * top1['details']['cote'], 2)
"simple_gagnant": {
"cheval": top1["nom"],
"numero": top1["numero"],
"cote": top1["details"]["cote"],
"score": top1["score"],
"confiance": confiance(top1["score"]),
"mise_suggeree": 2.0,
"gain_potentiel": round(2.0 * top1["details"]["cote"], 2),
},
'ze2_sur_4': {
'top4': [{'nom': h['nom'], 'numero': h['numero']} for h in top4_list],
'combinaisons': ze2_combinaisons,
'mise_totale': mise_ze2,
'nb_combinaisons': len(ze2_combinaisons),
'confiance': confiance((top1['score'] + top2['score'] + top3['score'] + top4['score']) / 4),
'explication': 'Jouer les 6 combinaisons de 2 chevaux parmi les 4 premiers'
"ze2_sur_4": {
"top4": [{"nom": h["nom"], "numero": h["numero"]} for h in top4_list],
"combinaisons": ze2_combinaisons,
"mise_totale": mise_ze2,
"nb_combinaisons": len(ze2_combinaisons),
"confiance": confiance(
(top1["score"] + top2["score"] + top3["score"] + top4["score"]) / 4
),
"explication": "Jouer les 6 combinaisons de 2 chevaux parmi les 4 premiers",
},
'outsider': _find_outsider(ranked),
'budget_total': 2.0 + mise_ze2,
"outsider": _find_outsider(ranked),
"budget_total": 2.0 + mise_ze2,
}
def _find_outsider(ranked):
for h in ranked[3:7]:
d = h['details']
if d['cote'] >= 12 and d['forme_recente'] <= 4 and d['bonus_outsider'] == 5:
d = h["details"]
if d["cote"] >= 12 and d["forme_recente"] <= 4 and d["bonus_outsider"] == 5:
return {
'cheval': h['nom'], 'numero': h['numero'], 'cote': d['cote'],
'mise_suggeree': 1.0, 'gain_potentiel': round(1.0 * d['cote'], 2)
"cheval": h["nom"],
"numero": h["numero"],
"cote": d["cote"],
"mise_suggeree": 1.0,
"gain_potentiel": round(1.0 * d["cote"], 2),
}
return None
def save_to_db(scored_horses, date_course, hippodrome, libelle):
conn = sqlite3.connect(DB_PATH)
cursor = conn.cursor()
cursor.execute("DELETE FROM scoring WHERE date = ?", (date_course,))
for i, h in enumerate(scored_horses, 1):
d = h['details']
cursor.execute("""
d = h["details"]
cursor.execute(
"""
INSERT INTO scoring (date, race_name, horse_number, horse_name, score,
score_cote, score_forme, score_victoire, score_place, score_rk,
score_tendance, score_avis, cote, forme_recente, tx_victoire, tx_place,
avis_entraineur, musique, rang_scoring, scoring_version)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'v2')
""", (date_course, libelle, h['numero'], h['nom'], h['score'],
d.get('score_cote', 0), d.get('score_forme', 0), d.get('score_victoire', 0),
d.get('score_place', 0), d.get('score_rk', 0), d.get('score_tendance', 0),
d.get('score_avis', 0), d.get('cote', 0), d.get('forme_recente', 0),
d.get('tx_victoire', 0), d.get('tx_place', 0), d.get('avis_entraineur', ''),
d.get('musique', ''), i))
""",
(
date_course,
libelle,
h["numero"],
h["nom"],
h["score"],
d.get("score_cote", 0),
d.get("score_forme", 0),
d.get("score_victoire", 0),
d.get("score_place", 0),
d.get("score_rk", 0),
d.get("score_tendance", 0),
d.get("score_avis", 0),
d.get("cote", 0),
d.get("forme_recente", 0),
d.get("tx_victoire", 0),
d.get("tx_place", 0),
d.get("avis_entraineur", ""),
d.get("musique", ""),
i,
),
)
conn.commit()
conn.close()
print(f"💾 {len(scored_horses)} scores enregistres en BDD pour {date_course}")
def main():
today = datetime.now().strftime('%Y-%m-%d')
date_pmu = datetime.now().strftime('%d%m%Y')
print(f"=== SCORING V2 - ZE2 SUR4 OPTIMISE === {datetime.now().strftime('%d/%m/%Y %H:%M')} ===")
today = datetime.now().strftime("%Y-%m-%d")
date_pmu = datetime.now().strftime("%d%m%Y")
print(
f"=== SCORING V2 - ZE2 SUR4 OPTIMISE === {datetime.now().strftime('%d/%m/%Y %H:%M')} ==="
)
try:
url = f"https://turfinfo.api.pmu.fr/rest/client/1/programme/{date_pmu}/reunions"
r = requests.get(url, headers=HEADERS, timeout=15)
reunions = r.json().get('programme', {}).get('reunions', [])
reunions = r.json().get("programme", {}).get("reunions", [])
except Exception as e:
print(f"Erreur: {e}")
return
quinte = None
for reunion in reunions:
for course in reunion.get('courses', []):
for course in reunion.get("courses", []):
paris_types = [p["typePari"] for p in course.get("paris", [])]
if any("QUINTE" in p for p in paris_types) or "PARIS-TURF" in course.get('libelle', ''):
quinte = (reunion['numOfficiel'], course['numOrdre'], course.get('libelle', ''),
reunion['hippodrome']['libelleCourt'], course.get('heureDepart', 0))
if any("QUINTE" in p for p in paris_types) or "PARIS-TURF" in course.get(
"libelle", ""
):
quinte = (
reunion["numOfficiel"],
course["numOrdre"],
course.get("libelle", ""),
reunion["hippodrome"]["libelleCourt"],
course.get("heureDepart", 0),
)
break
if quinte:
break
if not quinte:
# Fallback: utiliser la premiere reunion francaise avec predictions
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
r = conn.execute("""
r = conn.execute(
"""
SELECT r.num_reunion, r.hippodrome_court, c.num_course, c.libelle
FROM pmu_courses c
JOIN pmu_reunions r ON r.date_programme=c.date_programme AND r.num_reunion=c.num_reunion
@@ -264,57 +432,82 @@ def main():
AND EXISTS (SELECT 1 FROM predictions p WHERE p.date=? AND p.source='canalturf_partants'
AND p.race_name LIKE '%' || c.libelle || '%')
ORDER BY c.heure_depart_str ASC LIMIT 1
""", (today, today)).fetchone()
""",
(today, today),
).fetchone()
conn.close()
if r:
quinte = (r['num_reunion'], r['num_course'], r['libelle'], r['hippodrome_court'], 0)
quinte = (
r["num_reunion"],
r["num_course"],
r["libelle"],
r["hippodrome_court"],
0,
)
else:
print("Aucune course trouvee")
return
num_r, num_c, libelle, hippodrome, heure_ts = quinte
heure = datetime.fromtimestamp(heure_ts/1000).strftime('%H:%M') if heure_ts else '13:55'
heure = (
datetime.fromtimestamp(heure_ts / 1000).strftime("%H:%M")
if heure_ts
else "13:55"
)
print(f"Course: {libelle} - {hippodrome} {heure}")
try:
url = f"https://turfinfo.api.pmu.fr/rest/client/1/programme/{date_pmu}/R{num_r}/C{num_c}/participants"
r = requests.get(url, headers=HEADERS, timeout=15)
participants = [p for p in r.json().get('participants', []) if p.get('statut') == 'PARTANT']
participants = [
p for p in r.json().get("participants", []) if p.get("statut") == "PARTANT"
]
except Exception as e:
print(f"Erreur: {e}")
return
scored_horses = []
for p in participants:
score, details = score_cheval_v2(p, participants, today)
scored_horses.append({'nom': p['nom'], 'numero': p['numPmu'], 'score': score, 'details': details})
ranked = sorted(scored_horses, key=lambda x: x['score'], reverse=True)
scored_horses.append(
{"nom": p["nom"], "numero": p["numPmu"], "score": score, "details": details}
)
ranked = sorted(scored_horses, key=lambda x: x["score"], reverse=True)
print(f"\n=== TOP 4 ===")
for i, h in enumerate(ranked[:4], 1):
d = h['details']
print(f"{i}. #{h['numero']:>2} {h['nom']:<20} Score:{h['score']:.1f} Cote:{d['cote']:.1f}")
d = h["details"]
print(
f"{i}. #{h['numero']:>2} {h['nom']:<20} Score:{h['score']:.1f} Cote:{d['cote']:.1f}"
)
save_to_db(ranked, today, hippodrome, libelle)
reco = build_recommendations_v2(scored_horses)
if reco:
print(f"\n=== RECOMMANDATIONS ===")
sg = reco['simple_gagnant']
sg = reco["simple_gagnant"]
print(f"\n🎯 SIMPLE GAGNANT:")
print(f" #{sg['numero']} {sg['cheval']} @ {sg['cote']}/1 (mise {sg['mise_suggeree']}EUR)")
ze2 = reco['ze2_sur_4']
print(
f" #{sg['numero']} {sg['cheval']} @ {sg['cote']}/1 (mise {sg['mise_suggeree']}EUR)"
)
ze2 = reco["ze2_sur_4"]
print(f"\n🎰 ZE 2 SUR 4 (TOP 4: {', '.join([h['nom'] for h in ze2['top4']])}")
print(f" Mise totale: {ze2['mise_totale']}EUR ({ze2['nb_combinaisons']} combis x 1EUR)")
print(
f" Mise totale: {ze2['mise_totale']}EUR ({ze2['nb_combinaisons']} combis x 1EUR)"
)
print(f" Confiance: {ze2['confiance']}")
print(f" Combinaisons:")
for c in ze2['combinaisons']:
print(f" {c['numero1']}-{c['cheval1']} + {c['numero2']}-{c['cheval2']}")
for c in ze2["combinaisons"]:
print(
f" {c['numero1']}-{c['cheval1']} + {c['numero2']}-{c['cheval2']}"
)
print(f"\n💰 BUDGET TOTAL: {reco['budget_total']}EUR")
print(f" - Simple Gagnant: 2EUR")
print(f" - ZE 2 sur 4: {ze2['mise_totale']}EUR")
if __name__ == "__main__":
main()

533
tests/test_org.py Normal file
View File

@@ -0,0 +1,533 @@
#!/usr/bin/env python3
"""
Tests — Multi-compte / Organisations Pro
Sprint: HRT-82
Couvre :
- Migration DB (tables organizations + org_members)
- POST /api/v1/org
- GET /api/v1/org
- DELETE /api/v1/org
- POST /api/v1/org/invite
- GET /api/v1/org/members
- DELETE /api/v1/org/members/<user_id>
- Plan enforcement (plan != pro → 403)
- Contraintes métier (1 org/owner, max 5 membres, doublons, etc.)
Run:
./venv/bin/pytest tests/test_org.py -v --tb=short
"""
import os
import sys
import tempfile
import secrets
import pytest
# ─── Isolated temp DB ────────────────────────────────────────────────────────
_tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
_tmp_db.close()
os.environ["TURF_SAAS_DB"] = _tmp_db.name
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
# ─── App import (après configuration env) ────────────────────────────────────
import sqlite3
from org_db import get_db, migrate_org_tables
from saas_auth import get_db as auth_get_db, init_users_table, generate_token
# ─── Helpers ─────────────────────────────────────────────────────────────────
def _create_user(email: str, plan: str = "free") -> dict:
"""Crée un utilisateur directement en DB et retourne son token + id."""
init_users_table()
uid = secrets.token_hex(16)
pw_hash = "hashed"
conn = auth_get_db()
conn.execute(
"INSERT OR IGNORE INTO saas_users (id, email, firstname, lastname, password_hash, plan) "
"VALUES (?,?,?,?,?,?)",
(uid, email, "Test", "User", pw_hash, plan),
)
conn.commit()
conn.close()
token = generate_token(uid)
return {"id": uid, "email": email, "token": token, "plan": plan}
def _auth_header(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
# ─── Flask app fixture ───────────────────────────────────────────────────────
@pytest.fixture(scope="module")
def app():
"""Crée l'app Flask avec les blueprints org enregistrés."""
from flask import Flask
from flask_cors import CORS
from saas_auth import auth_bp
from api_v1.routes.org import org_bp
application = Flask(__name__)
CORS(application)
application.config["TESTING"] = True
# S'assurer que la migration a tourné
migrate_org_tables()
application.register_blueprint(auth_bp)
application.register_blueprint(org_bp)
yield application
@pytest.fixture(scope="module")
def client(app):
return app.test_client()
# ─── Users fixtures ───────────────────────────────────────────────────────────
@pytest.fixture(scope="module")
def pro_owner(app):
"""Un utilisateur Pro qui va créer une org."""
with app.app_context():
return _create_user("owner_pro@test.com", plan="pro")
@pytest.fixture(scope="module")
def pro_user2(app):
"""Un 2e utilisateur Pro à inviter."""
with app.app_context():
return _create_user("member2_pro@test.com", plan="pro")
@pytest.fixture(scope="module")
def pro_user3(app):
with app.app_context():
return _create_user("member3_pro@test.com", plan="pro")
@pytest.fixture(scope="module")
def pro_user4(app):
with app.app_context():
return _create_user("member4_pro@test.com", plan="pro")
@pytest.fixture(scope="module")
def pro_user5(app):
with app.app_context():
return _create_user("member5_pro@test.com", plan="pro")
@pytest.fixture(scope="module")
def pro_user6(app):
"""6e utilisateur pour tester la limite MAX_MEMBERS."""
with app.app_context():
return _create_user("member6_pro@test.com", plan="pro")
@pytest.fixture(scope="module")
def free_user(app):
with app.app_context():
return _create_user("free_user@test.com", plan="free")
@pytest.fixture(scope="module")
def other_pro_owner(app):
"""Un 2e owner Pro (pour tester conflits inter-orgs)."""
with app.app_context():
return _create_user("other_owner@test.com", plan="pro")
# ═══════════════════════════════════════════════════════════════════════════════
# Tests DB migration
# ═══════════════════════════════════════════════════════════════════════════════
class TestOrgDbMigration:
def test_tables_exist(self):
"""Les tables organizations et org_members doivent exister."""
conn = get_db()
tables = {
row[0]
for row in conn.execute("SELECT name FROM sqlite_master WHERE type='table'")
}
conn.close()
assert "organizations" in tables, "Table organizations manquante"
assert "org_members" in tables, "Table org_members manquante"
def test_migration_idempotent(self):
"""Appeler migrate_org_tables() deux fois ne doit pas lever d'erreur."""
migrate_org_tables() # 2e appel — doit être silencieux
self.test_tables_exist()
def test_org_members_unique_constraint(self):
"""UNIQUE(org_id, user_id) doit être présent."""
conn = get_db()
indexes = [row[1] for row in conn.execute("PRAGMA index_list(org_members)")]
conn.close()
# Il doit y avoir un index d'unicité
assert (
any(
"unique" in idx.lower() or "org_members" in idx.lower()
for idx in indexes
)
or True
)
# On vérifie via insertion en double
conn = get_db()
oid = "test_org_unique"
uid = "test_uid_unique"
try:
conn.execute(
"INSERT OR IGNORE INTO organizations (id, owner_id, name) VALUES (?,?,?)",
(oid, uid, "TestOrg"),
)
conn.execute(
"INSERT INTO org_members (org_id, user_id, role, invited_at, joined_at) "
"VALUES (?,?,'member',datetime('now'),datetime('now'))",
(oid, uid),
)
conn.commit()
# 2e insertion doit lever IntegrityError
with pytest.raises(sqlite3.IntegrityError):
conn.execute(
"INSERT INTO org_members (org_id, user_id, role, invited_at, joined_at) "
"VALUES (?,?,'member',datetime('now'),datetime('now'))",
(oid, uid),
)
conn.commit()
finally:
conn.execute("DELETE FROM org_members WHERE org_id=?", (oid,))
conn.execute("DELETE FROM organizations WHERE id=?", (oid,))
conn.commit()
conn.close()
# ═══════════════════════════════════════════════════════════════════════════════
# Tests plan enforcement
# ═══════════════════════════════════════════════════════════════════════════════
class TestPlanEnforcement:
def test_create_org_free_plan_403(self, client, free_user):
"""Un utilisateur free ne peut pas créer une org."""
resp = client.post(
"/api/v1/org",
json={"name": "FreePlanOrg"},
headers=_auth_header(free_user["token"]),
)
assert resp.status_code == 403
data = resp.get_json()
assert data["required"] == "pro"
def test_get_org_free_plan_403(self, client, free_user):
resp = client.get("/api/v1/org", headers=_auth_header(free_user["token"]))
assert resp.status_code == 403
def test_invite_free_plan_403(self, client, free_user):
resp = client.post(
"/api/v1/org/invite",
json={"email": "someone@test.com"},
headers=_auth_header(free_user["token"]),
)
assert resp.status_code == 403
def test_members_free_plan_403(self, client, free_user):
resp = client.get(
"/api/v1/org/members", headers=_auth_header(free_user["token"])
)
assert resp.status_code == 403
def test_no_token_401(self, client):
resp = client.get("/api/v1/org")
assert resp.status_code == 401
# ═══════════════════════════════════════════════════════════════════════════════
# Tests création d'organisation
# ═══════════════════════════════════════════════════════════════════════════════
class TestCreateOrg:
def test_create_org_success(self, client, pro_owner):
"""Un Pro peut créer une organisation."""
resp = client.post(
"/api/v1/org",
json={"name": "H3R7 Racing Club"},
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 201
data = resp.get_json()
assert "org" in data
assert data["org"]["name"] == "H3R7 Racing Club"
assert data["org"]["owner_id"] == pro_owner["id"]
assert data["org"]["max_members"] == 5
def test_create_org_duplicate_409(self, client, pro_owner):
"""Un Pro ne peut pas créer 2 organisations."""
resp = client.post(
"/api/v1/org",
json={"name": "Second Org"},
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 409
data = resp.get_json()
assert "org_id" in data
def test_create_org_missing_name_400(self, client, pro_owner):
"""Le nom est obligatoire."""
resp = client.post(
"/api/v1/org",
json={},
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 400
def test_create_org_empty_name_400(self, client, pro_owner):
resp = client.post(
"/api/v1/org",
json={"name": " "},
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 400
def test_create_org_name_too_long_400(self, client, pro_owner):
resp = client.post(
"/api/v1/org",
json={"name": "x" * 101},
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 400
# ═══════════════════════════════════════════════════════════════════════════════
# Tests lecture d'organisation
# ═══════════════════════════════════════════════════════════════════════════════
class TestGetOrg:
def test_get_org_as_owner(self, client, pro_owner):
resp = client.get("/api/v1/org", headers=_auth_header(pro_owner["token"]))
assert resp.status_code == 200
data = resp.get_json()
assert data["org"]["owner_id"] == pro_owner["id"]
assert data["org"]["member_count"] >= 1 # au moins l'owner
def test_get_org_not_found_404(self, client, other_pro_owner):
"""Un Pro sans org reçoit 404 avant d'en créer une."""
# other_pro_owner n'a pas encore d'org dans ce test
resp = client.get("/api/v1/org", headers=_auth_header(other_pro_owner["token"]))
# Peut être 404 ou 200 selon l'ordre d'exécution; on accepte les deux ici
assert resp.status_code in (200, 404)
# ═══════════════════════════════════════════════════════════════════════════════
# Tests invitation de membres
# ═══════════════════════════════════════════════════════════════════════════════
class TestInviteMember:
def test_invite_member_success(self, client, pro_owner, pro_user2):
resp = client.post(
"/api/v1/org/invite",
json={"email": pro_user2["email"]},
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 201
data = resp.get_json()
assert data["member"]["user_id"] == pro_user2["id"]
assert data["member"]["role"] == "member"
def test_invite_member_duplicate_409(self, client, pro_owner, pro_user2):
"""Inviter 2x le même utilisateur → 409."""
resp = client.post(
"/api/v1/org/invite",
json={"email": pro_user2["email"]},
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 409
def test_invite_unknown_email_404(self, client, pro_owner):
resp = client.post(
"/api/v1/org/invite",
json={"email": "nobody@nowhere.com"},
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 404
def test_invite_invalid_email_400(self, client, pro_owner):
resp = client.post(
"/api/v1/org/invite",
json={"email": "not-an-email"},
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 400
def test_invite_non_owner_403(self, client, pro_user2):
"""Un simple membre ne peut pas inviter."""
resp = client.post(
"/api/v1/org/invite",
json={"email": "anyone@test.com"},
headers=_auth_header(pro_user2["token"]),
)
assert resp.status_code == 403
def test_invite_fill_to_max(
self, client, pro_owner, pro_user3, pro_user4, pro_user5
):
"""Remplir jusqu'à 5 membres (owner + 4 invités)."""
for u in (pro_user3, pro_user4, pro_user5):
resp = client.post(
"/api/v1/org/invite",
json={"email": u["email"]},
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 201, (
f"Invitation de {u['email']} échouée: {resp.get_json()}"
)
def test_invite_exceeds_max_403(self, client, pro_owner, pro_user6):
"""Le 6e membre doit être refusé (max 5)."""
resp = client.post(
"/api/v1/org/invite",
json={"email": pro_user6["email"]},
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 403
data = resp.get_json()
assert "Limite" in data["error"] or "limite" in data["error"].lower()
# ═══════════════════════════════════════════════════════════════════════════════
# Tests liste des membres
# ═══════════════════════════════════════════════════════════════════════════════
class TestListMembers:
def test_list_members_as_owner(self, client, pro_owner):
resp = client.get(
"/api/v1/org/members", headers=_auth_header(pro_owner["token"])
)
assert resp.status_code == 200
data = resp.get_json()
assert "members" in data
assert data["count"] == 5 # owner + 4 invités (pro_user2..5)
assert data["max_members"] == 5
def test_list_members_as_member(self, client, pro_user2):
"""Un membre peut aussi consulter la liste."""
resp = client.get(
"/api/v1/org/members", headers=_auth_header(pro_user2["token"])
)
assert resp.status_code == 200
data = resp.get_json()
assert data["count"] >= 1
def test_list_members_includes_email(self, client, pro_owner, pro_user2):
resp = client.get(
"/api/v1/org/members", headers=_auth_header(pro_owner["token"])
)
data = resp.get_json()
emails = [m["email"] for m in data["members"]]
assert pro_user2["email"] in emails
def test_list_members_no_org_404(self, client, pro_user6):
"""Un Pro sans org reçoit 404."""
resp = client.get(
"/api/v1/org/members", headers=_auth_header(pro_user6["token"])
)
assert resp.status_code == 404
# ═══════════════════════════════════════════════════════════════════════════════
# Tests suppression de membre
# ═══════════════════════════════════════════════════════════════════════════════
class TestRemoveMember:
def test_remove_member_success(self, client, pro_owner, pro_user5):
resp = client.delete(
f"/api/v1/org/members/{pro_user5['id']}",
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 200
data = resp.get_json()
assert data["removed_user_id"] == pro_user5["id"]
def test_remove_self_as_owner_400(self, client, pro_owner):
"""L'owner ne peut pas se retirer lui-même."""
resp = client.delete(
f"/api/v1/org/members/{pro_owner['id']}",
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 400
def test_remove_nonexistent_member_404(self, client, pro_owner):
resp = client.delete(
"/api/v1/org/members/nonexistent-id-xyz",
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 404
def test_remove_member_non_owner_403(self, client, pro_user2, pro_user3):
"""Un simple membre ne peut pas retirer un autre membre."""
resp = client.delete(
f"/api/v1/org/members/{pro_user3['id']}",
headers=_auth_header(pro_user2["token"]),
)
assert resp.status_code == 403
def test_can_invite_again_after_removal(self, client, pro_owner, pro_user5):
"""Après retrait, on peut ré-inviter (slot libéré)."""
resp = client.post(
"/api/v1/org/invite",
json={"email": pro_user5["email"]},
headers=_auth_header(pro_owner["token"]),
)
assert resp.status_code == 201
# ═══════════════════════════════════════════════════════════════════════════════
# Tests suppression d'organisation
# ═══════════════════════════════════════════════════════════════════════════════
class TestDeleteOrg:
def test_delete_org_non_owner_403(self, client, pro_user2):
"""Un simple membre ne peut pas supprimer l'org."""
resp = client.delete("/api/v1/org", headers=_auth_header(pro_user2["token"]))
assert resp.status_code == 403
def test_delete_org_success(self, client, pro_owner):
"""L'owner peut supprimer l'organisation."""
resp = client.delete("/api/v1/org", headers=_auth_header(pro_owner["token"]))
assert resp.status_code == 200
data = resp.get_json()
assert data["ok"] is True
def test_get_org_after_delete_404(self, client, pro_owner):
"""Après suppression, GET /org renvoie 404."""
resp = client.get("/api/v1/org", headers=_auth_header(pro_owner["token"]))
assert resp.status_code == 404
def test_delete_org_no_org_403(self, client, pro_owner):
"""Supprimer une org qui n'existe plus → 403."""
resp = client.delete("/api/v1/org", headers=_auth_header(pro_owner["token"]))
assert resp.status_code == 403
def test_members_cascade_deleted(self, client, pro_user2):
"""Après suppression de l'org, les membres ne trouvent plus d'org."""
resp = client.get(
"/api/v1/org/members", headers=_auth_header(pro_user2["token"])
)
assert resp.status_code == 404

383
tests/test_user_tokens.py Normal file
View File

@@ -0,0 +1,383 @@
#!/usr/bin/env python3
"""
tests/test_user_tokens.py — Personal API Token + Webhook alertes
HRT-80: Tests unitaires et d'intégration
Couvre:
- POST /api/v1/user/api-token (create)
- DELETE /api/v1/user/api-token (revoke)
- POST /api/v1/user/webhook (create/upsert)
- DELETE /api/v1/user/webhook (delete)
- Authentification via X-API-Key
- dispatch_webhook() fire-and-forget
- Plan enforcement Pro uniquement
Run:
./venv/bin/pytest tests/test_user_tokens.py -v --tb=short
"""
import hashlib
import json
import os
import sqlite3
import sys
import tempfile
from unittest.mock import MagicMock, patch
import pytest
# ─── Test DB isolation ────────────────────────────────────────────────────────
_tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
_tmp_db.close()
os.environ["TURF_SAAS_DB"] = _tmp_db.name
os.environ["JWT_SECRET_KEY"] = "test-secret-hrt80"
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from app_v1 import create_app # noqa: E402
TEST_CONFIG = {
"TESTING": True,
"JWT_SECRET_KEY": "test-secret-hrt80",
}
@pytest.fixture(scope="module")
def app():
application = create_app()
application.config.update(TEST_CONFIG)
yield application
@pytest.fixture(scope="module")
def client(app):
return app.test_client()
# ─── Helpers ─────────────────────────────────────────────────────────────────
def _create_user(client, email, plan="pro"):
"""Register user (plan=free) then update plan in DB."""
resp = client.post(
"/api/v1/auth/register",
json={"email": email, "password": "Secure123"},
)
assert resp.status_code == 201, resp.get_json()
user_id = resp.get_json()["user_id"]
# Update plan directly in DB (no plan-update endpoint in JWT auth)
conn = sqlite3.connect(os.environ["TURF_SAAS_DB"])
conn.execute("UPDATE users SET plan = ? WHERE id = ?", (plan, user_id))
conn.commit()
conn.close()
# Login to get access token
login_resp = client.post(
"/api/v1/auth/login",
json={"email": email, "password": "Secure123"},
)
assert login_resp.status_code == 200, login_resp.get_json()
access_token = login_resp.get_json()["access_token"]
return access_token, user_id
def _auth_header(token):
return {"Authorization": f"Bearer {token}"}
# ─── Tests: API Token (Pro) ───────────────────────────────────────────────────
class TestApiToken:
def test_create_api_token_pro(self, client):
"""POST /api/v1/user/api-token — Pro user gets 201 + token starting with trf_"""
token, _ = _create_user(client, "pro_token@test.com", plan="pro")
resp = client.post("/api/v1/user/api-token", headers=_auth_header(token))
assert resp.status_code == 201, resp.get_json()
data = resp.get_json()
assert data["token"].startswith("trf_")
assert data["prefix"] == data["token"][:12]
assert "warning" in data
assert "created_at" in data
def test_create_api_token_stores_hash_not_raw(self, client):
"""Second POST returns 409 — only hashed token stored"""
token, _ = _create_user(client, "pro_token2@test.com", plan="pro")
# First create
r1 = client.post("/api/v1/user/api-token", headers=_auth_header(token))
assert r1.status_code == 201
raw_token = r1.get_json()["token"]
# Second create should conflict
r2 = client.post("/api/v1/user/api-token", headers=_auth_header(token))
assert r2.status_code == 409
data = r2.get_json()
assert "existing_prefix" in data
# Verify raw token is NOT stored in DB (only hash)
conn = sqlite3.connect(os.environ["TURF_SAAS_DB"])
row = conn.execute(
"SELECT token_hash FROM user_api_tokens WHERE token_prefix = ?",
(raw_token[:12],),
).fetchone()
conn.close()
assert row is not None
assert row[0] != raw_token # hash != raw
assert len(row[0]) == 64 # SHA256 hex
def test_create_api_token_free_user(self, client):
"""Free user gets 403"""
token, _ = _create_user(client, "free_token@test.com", plan="free")
resp = client.post("/api/v1/user/api-token", headers=_auth_header(token))
assert resp.status_code == 403
def test_create_api_token_premium_user(self, client):
"""Premium user gets 403 (Pro only feature)"""
token, _ = _create_user(client, "premium_token@test.com", plan="premium")
resp = client.post("/api/v1/user/api-token", headers=_auth_header(token))
assert resp.status_code == 403
def test_create_api_token_no_auth(self, client):
"""No auth → 401"""
resp = client.post("/api/v1/user/api-token")
assert resp.status_code == 401
def test_revoke_api_token(self, client):
"""DELETE /api/v1/user/api-token — Pro user revokes active token"""
token, _ = _create_user(client, "pro_revoke@test.com", plan="pro")
# Create first
client.post("/api/v1/user/api-token", headers=_auth_header(token))
# Revoke
resp = client.delete("/api/v1/user/api-token", headers=_auth_header(token))
assert resp.status_code == 200
data = resp.get_json()
assert data["revoked"] is True
assert data["count"] >= 1
def test_revoke_no_active_token(self, client):
"""DELETE with no active token → 404"""
token, _ = _create_user(client, "pro_notoken@test.com", plan="pro")
resp = client.delete("/api/v1/user/api-token", headers=_auth_header(token))
assert resp.status_code == 404
def test_revoke_non_pro(self, client):
"""DELETE for free user → 403"""
token, _ = _create_user(client, "free_revoke@test.com", plan="free")
resp = client.delete("/api/v1/user/api-token", headers=_auth_header(token))
assert resp.status_code == 403
# ─── Tests: X-API-Key Authentication ─────────────────────────────────────────
class TestApiKeyAuth:
def test_api_key_auth_on_protected_route(self, client):
"""Valid X-API-Key authenticates on protected route"""
token, _ = _create_user(client, "apikey_auth@test.com", plan="pro")
# Create API token
r = client.post("/api/v1/user/api-token", headers=_auth_header(token))
assert r.status_code == 201
raw_key = r.get_json()["token"]
# Use X-API-Key to access a protected route (try create again → 409 means authenticated)
resp = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
# 409 means we were authenticated; 401 means auth failed
assert resp.status_code == 409
def test_api_key_invalid(self, client):
"""Invalid X-API-Key → 401"""
resp = client.post(
"/api/v1/user/api-token",
headers={"X-API-Key": "trf_invalidkeyXXXXXXXXXXXXXXXXXX"},
)
assert resp.status_code == 401
def test_api_key_revoked(self, client):
"""Revoked X-API-Key → 401"""
token, _ = _create_user(client, "revoked_apikey@test.com", plan="pro")
# Create token
r = client.post("/api/v1/user/api-token", headers=_auth_header(token))
assert r.status_code == 201
raw_key = r.get_json()["token"]
# Revoke it
client.delete("/api/v1/user/api-token", headers=_auth_header(token))
# Try using revoked key
resp = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
assert resp.status_code == 401
def test_revoke_then_cannot_auth(self, client):
"""Full flow: create → use → revoke → X-API-Key rejected"""
token, _ = _create_user(client, "flow_test@test.com", plan="pro")
# Create
r = client.post("/api/v1/user/api-token", headers=_auth_header(token))
raw_key = r.get_json()["token"]
# Validate it works (409 because key exists)
r2 = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
assert r2.status_code == 409
# Revoke
client.delete("/api/v1/user/api-token", headers=_auth_header(token))
# Try again with revoked key
r3 = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
assert r3.status_code == 401
# ─── Tests: Webhook ───────────────────────────────────────────────────────────
class TestWebhook:
def test_create_webhook_pro(self, client):
"""POST /api/v1/user/webhook — Pro user with provided secret → 201"""
token, _ = _create_user(client, "webhook_pro@test.com", plan="pro")
resp = client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "https://example.com/hook", "secret": "mysecret123"},
)
assert resp.status_code == 201
data = resp.get_json()
assert data["webhook_url"] == "https://example.com/hook"
assert data["secret"] == "mysecret123"
def test_create_webhook_auto_secret(self, client):
"""POST without secret → auto-generated secret"""
token, _ = _create_user(client, "webhook_auto@test.com", plan="pro")
resp = client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "https://auto.example.com/hook"},
)
assert resp.status_code == 201
data = resp.get_json()
assert len(data["secret"]) == 64 # token_hex(32) = 64 hex chars
def test_create_webhook_non_pro_free(self, client):
"""Free user → 403"""
token, _ = _create_user(client, "webhook_free@test.com", plan="free")
resp = client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "https://example.com/hook"},
)
assert resp.status_code == 403
def test_create_webhook_non_pro_premium(self, client):
"""Premium user → 403"""
token, _ = _create_user(client, "webhook_premium@test.com", plan="premium")
resp = client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "https://example.com/hook"},
)
assert resp.status_code == 403
def test_create_webhook_url_not_https(self, client):
"""HTTP URL → 400"""
token, _ = _create_user(client, "webhook_http@test.com", plan="pro")
resp = client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "http://example.com/hook"},
)
assert resp.status_code == 400
assert "https" in resp.get_json()["error"].lower()
def test_create_webhook_missing_url(self, client):
"""Missing URL → 400"""
token, _ = _create_user(client, "webhook_nourl@test.com", plan="pro")
resp = client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={},
)
assert resp.status_code == 400
def test_webhook_upsert(self, client):
"""Second POST updates URL (upsert behavior)"""
token, _ = _create_user(client, "webhook_upsert@test.com", plan="pro")
client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "https://first.example.com/hook"},
)
resp = client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "https://second.example.com/hook"},
)
assert resp.status_code == 201
assert resp.get_json()["webhook_url"] == "https://second.example.com/hook"
def test_delete_webhook(self, client):
"""DELETE /api/v1/user/webhook → 200"""
token, _ = _create_user(client, "webhook_delete@test.com", plan="pro")
client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "https://delete.example.com/hook"},
)
resp = client.delete("/api/v1/user/webhook", headers=_auth_header(token))
assert resp.status_code == 200
assert resp.get_json()["deleted"] is True
def test_delete_webhook_not_configured(self, client):
"""DELETE without webhook configured → 404"""
token, _ = _create_user(client, "webhook_notset@test.com", plan="pro")
resp = client.delete("/api/v1/user/webhook", headers=_auth_header(token))
assert resp.status_code == 404
def test_delete_webhook_non_pro(self, client):
"""Free user DELETE → 403"""
token, _ = _create_user(client, "webhook_freedelete@test.com", plan="free")
resp = client.delete("/api/v1/user/webhook", headers=_auth_header(token))
assert resp.status_code == 403
# ─── Tests: dispatch_webhook ──────────────────────────────────────────────────
class TestDispatchWebhook:
def test_dispatch_no_webhook_configured(self):
"""dispatch_webhook silently returns when no webhook is configured"""
with patch("api_v1.utils_webhook.get_db") as mock_get_db:
mock_conn = MagicMock()
mock_conn.execute.return_value.fetchone.return_value = None
mock_get_db.return_value = mock_conn
from api_v1.utils_webhook import dispatch_webhook
# Should not raise, should return silently
dispatch_webhook("nonexistent_user", "new_prediction", {"data": "test"})
def test_dispatch_sends_hmac_header(self):
"""dispatch_webhook sends correct HMAC-SHA256 signature header"""
test_secret = "testsecret"
test_url = "https://hook.example.com/receive"
test_payload = {"race_id": "R123", "top1": "Cheval Blanc"}
with (
patch("api_v1.utils_webhook.get_db") as mock_get_db,
patch("api_v1.utils_webhook.requests.post") as mock_post,
):
mock_row = MagicMock()
mock_row.__getitem__ = lambda self, key: (
test_url if key == "url" else test_secret
)
mock_conn = MagicMock()
mock_conn.execute.return_value.fetchone.return_value = mock_row
mock_get_db.return_value = mock_conn
mock_response = MagicMock()
mock_response.status_code = 200
mock_post.return_value = mock_response
from api_v1.utils_webhook import dispatch_webhook, EVENT_NEW_PREDICTION
dispatch_webhook("user123", EVENT_NEW_PREDICTION, test_payload)
assert mock_post.called
call_kwargs = mock_post.call_args
headers_sent = call_kwargs.kwargs.get("headers") or call_kwargs[1].get(
"headers"
)
assert "X-Turf-Signature" in headers_sent
assert headers_sent["X-Turf-Signature"].startswith("sha256=")
assert headers_sent["X-Turf-Event"] == EVENT_NEW_PREDICTION