Merge pull request '[HRT-80] API Token personnel + Webhook alertes (Pro)' (#13) from feature/HRT-80-api-tokens-webhooks into master
Some checks failed
CD / Deploy → Staging (push) Has been cancelled
CD / Smoke Tests on Staging (push) Has been cancelled
CD / Deploy → Production (push) Has been cancelled
CD / Rollback Production (push) Has been cancelled

This commit was merged in pull request #13.
This commit is contained in:
2026-04-29 17:31:53 +02:00
9 changed files with 1433 additions and 4 deletions

57
api_tokens_db.py Normal file
View File

@@ -0,0 +1,57 @@
#!/usr/bin/env python3
"""
api_tokens_db.py — DB migration for personal API tokens + user webhooks
HRT-80: API Token personnel + Webhook alertes (Pro)
"""
import logging
import os
import sqlite3
DB_PATH = os.environ.get("TURF_SAAS_DB", "/home/h3r7/turf_saas/turf_saas.db")
logger = logging.getLogger("turf_saas.api_tokens_db")
def get_db() -> sqlite3.Connection:
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def migrate_api_tokens_tables() -> None:
"""Idempotent migration: create user_api_tokens and user_webhooks."""
conn = get_db()
c = conn.cursor()
c.executescript("""
CREATE TABLE IF NOT EXISTS user_api_tokens (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
token_hash TEXT NOT NULL UNIQUE,
token_prefix TEXT NOT NULL,
created_at DATETIME NOT NULL DEFAULT (datetime('now')),
last_used_at DATETIME,
revoked INTEGER NOT NULL DEFAULT 0
);
CREATE INDEX IF NOT EXISTS idx_api_tokens_user ON user_api_tokens(user_id);
CREATE INDEX IF NOT EXISTS idx_api_tokens_hash ON user_api_tokens(token_hash);
CREATE TABLE IF NOT EXISTS user_webhooks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL UNIQUE,
url TEXT NOT NULL,
secret TEXT NOT NULL,
created_at DATETIME NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_webhooks_user ON user_webhooks(user_id);
""")
conn.commit()
conn.close()
logger.info(
"[api_tokens_db] Tables user_api_tokens + user_webhooks created/verified."
)
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO)
migrate_api_tokens_tables()
print("[api_tokens_db] Migration complete.")

View File

@@ -4,6 +4,7 @@ API v1 Blueprint package — Turf SaaS
Sprint 3-4: HRT-29 — Refacto API /v1/
Sprint 5-6: HRT-31 — Billing Stripe
HRT-79: Alertes Telegram configurables (user blueprint)
HRT-80: API Token personnel + Webhook alertes (Pro)
Registers sub-blueprints:
/api/v1/health — public health-check
@@ -15,6 +16,9 @@ Registers sub-blueprints:
/api/v1/metrics — métriques perf ML (premium+)
/api/v1/billing/ — Stripe checkout, portal, webhook, status
/api/v1/user/ — config utilisateur, alertes Telegram (premium+)
/api/v1/user/api-token — Personal API token (Pro)
/api/v1/user/webhook — Webhook config (Pro)
/api/v1/history — historique préd. ML (Free:7j, Premium:90j, Pro:illimité)
/api/v1/docs — Swagger UI (via flasgger, registered on app)
"""
@@ -29,6 +33,8 @@ from .routes.export import export_bp
from .routes.metrics import metrics_bp
from .routes.billing import billing_bp
from .routes.user import user_bp
from .routes.user_tokens import user_tokens_bp
from .routes.history import history_bp
# Master blueprint that aggregates all sub-routes under /api/v1
api_v1_bp = Blueprint("api_v1", __name__, url_prefix="/api/v1")
@@ -45,3 +51,5 @@ def register_api_v1(app):
app.register_blueprint(metrics_bp)
app.register_blueprint(billing_bp)
app.register_blueprint(user_bp)
app.register_blueprint(user_tokens_bp)
app.register_blueprint(history_bp)

212
api_v1/routes/history.py Normal file
View File

@@ -0,0 +1,212 @@
#!/usr/bin/env python3
"""
History routes for API v1.
GET /api/v1/history — Historique des prédictions avec filtre date range,
limité selon le plan (Free: 7j, Premium: 90j, Pro: illimité)
Ticket: HRT-81 — Historique limité/illimité selon plan (Free/Premium/Pro)
"""
from datetime import datetime, timedelta
from flask import Blueprint, jsonify, request, g
from api_v1.utils import (
get_db,
table_exists,
internal_error,
bad_request,
forbidden,
get_pagination_params,
paginate_query,
)
from auth import jwt_required_middleware
history_bp = Blueprint("v1_history", __name__, url_prefix="/api/v1/history")
# ──────────────────────────────────────────────────────────────
# Plan limits (days of history accessible; None = unlimited)
# ──────────────────────────────────────────────────────────────
HISTORY_DAYS = {
"free": 7,
"premium": 90,
"pro": None, # illimité
}
# Fallback for unknown plans: treat like free
_DEFAULT_LIMIT = 7
def _get_plan_max_days(plan: str):
"""Return the max history days allowed for the given plan, or default."""
return HISTORY_DAYS.get(plan, _DEFAULT_LIMIT)
def _parse_date(date_str: str, param_name: str):
"""Parse YYYY-MM-DD date string, raise ValueError with context on failure."""
try:
return datetime.strptime(date_str, "%Y-%m-%d").date()
except ValueError:
raise ValueError(
f"Paramètre '{param_name}' invalide : format attendu YYYY-MM-DD, reçu '{date_str}'"
)
# ──────────────────────────────────────────────────────────────
# GET /api/v1/history
# ──────────────────────────────────────────────────────────────
@history_bp.route("", methods=["GET"])
@jwt_required_middleware
def get_history():
"""
Historique des prédictions ML avec filtre date range
---
tags:
- Historique
summary: |
Historique des prédictions sur une plage de dates.
Limite selon le plan :
- Free : 7 derniers jours
- Premium : 90 derniers jours
- Pro : illimité
security:
- Bearer: []
parameters:
- name: start
in: query
type: string
format: date
description: Date de début au format YYYY-MM-DD (défaut : aujourd'hui - max_days du plan)
- name: end
in: query
type: string
format: date
description: Date de fin au format YYYY-MM-DD (défaut : aujourd'hui)
- name: limit
in: query
type: integer
default: 50
description: Nombre de résultats par page (max 500)
- name: offset
in: query
type: integer
default: 0
responses:
200:
description: Historique des prédictions ML
400:
description: Paramètre de date invalide
401:
description: Token invalide ou manquant
403:
description: Plage de dates hors limite du plan — upgrade requis
"""
user = getattr(g, "current_user", None)
if not user:
return jsonify({"error": "Non authentifié"}), 401
plan = user.get("plan", "free")
today = datetime.now().date()
max_days = _get_plan_max_days(plan)
# ── Parse end date ────────────────────────────────────────
end_str = request.args.get("end", today.isoformat())
try:
end_date = _parse_date(end_str, "end")
except ValueError as exc:
return bad_request(str(exc))
# ── Parse start date ─────────────────────────────────────
if max_days is not None:
default_start = today - timedelta(days=max_days - 1)
else:
# Pro: default to 30 days back when no start provided
default_start = today - timedelta(days=29)
start_str = request.args.get("start", default_start.isoformat())
try:
start_date = _parse_date(start_str, "start")
except ValueError as exc:
return bad_request(str(exc))
# ── Validate ordering ─────────────────────────────────────
if start_date > end_date:
return bad_request(
f"'start' ({start_str}) ne peut pas être postérieur à 'end' ({end_str})"
)
# ── Enforce plan window ───────────────────────────────────
if max_days is not None:
earliest_allowed = today - timedelta(days=max_days - 1)
if start_date < earliest_allowed:
return forbidden(
message=(
f"Historique limité à {max_days} jours pour le plan '{plan}'. "
f"Date de début minimale autorisée : {earliest_allowed.isoformat()}. "
f"Passez à un plan supérieur pour accéder à un historique plus long."
),
required_plans=["premium", "pro"] if plan == "free" else ["pro"],
current_plan=plan,
)
# ── Pagination ────────────────────────────────────────────
limit, offset = get_pagination_params(default_limit=50, max_limit=500)
# ── Query ─────────────────────────────────────────────────
conn = get_db()
try:
if not table_exists(conn, "ml_predictions_cache"):
return jsonify(
{
"status": "ok",
"plan": plan,
"start": start_date.isoformat(),
"end": end_date.isoformat(),
"history": [],
**paginate_query([], 0, limit, offset),
}
), 200
count_row = conn.execute(
"""SELECT COUNT(*) as cnt
FROM ml_predictions_cache
WHERE date >= ? AND date <= ?""",
(start_date.isoformat(), end_date.isoformat()),
).fetchone()
total = count_row["cnt"] if count_row else 0
sql = """
SELECT
id, date, horse_name, prob_top1, prob_top3,
ml_score, race_label, hippodrome, heure, is_value_bet
FROM ml_predictions_cache
WHERE date >= ? AND date <= ?
ORDER BY date DESC, ml_score DESC
LIMIT ? OFFSET ?
"""
rows = conn.execute(
sql,
(start_date.isoformat(), end_date.isoformat(), limit, offset),
).fetchall()
history = [dict(r) for r in rows]
return jsonify(
{
"status": "ok",
"plan": plan,
"history_limit_days": max_days,
"start": start_date.isoformat(),
"end": end_date.isoformat(),
"history": history,
**paginate_query(history, total, limit, offset),
}
), 200
except Exception as exc:
return internal_error(str(exc))
finally:
conn.close()

View File

@@ -0,0 +1,195 @@
#!/usr/bin/env python3
"""
user_tokens.py — Personal API tokens + Webhook configuration (Pro plan)
HRT-80
Endpoints:
POST /api/v1/user/api-token
DELETE /api/v1/user/api-token
POST /api/v1/user/webhook
DELETE /api/v1/user/webhook
"""
import hashlib
import logging
import secrets
from flask import Blueprint, g, jsonify, request
from api_tokens_db import get_db, migrate_api_tokens_tables
from auth import jwt_required_middleware, plan_required
logger = logging.getLogger("turf_saas.user_tokens")
user_tokens_bp = Blueprint("user_tokens", __name__, url_prefix="/api/v1/user")
try:
migrate_api_tokens_tables()
except Exception as _e:
logger.warning("api_tokens_db migration skipped (test env?): %s", _e)
def _hash_token(raw: str) -> str:
return hashlib.sha256(raw.encode()).hexdigest()
@user_tokens_bp.route("/api-token", methods=["POST"])
@jwt_required_middleware
@plan_required("pro")
def create_api_token():
user = g.current_user
user_id = str(user["id"])
conn = get_db()
try:
existing = conn.execute(
"SELECT id, token_prefix, created_at FROM user_api_tokens "
"WHERE user_id = ? AND revoked = 0",
(user_id,),
).fetchone()
if existing:
return jsonify(
{
"error": "Un token actif existe déjà. Révoquez-le avant d'en créer un nouveau.",
"existing_prefix": existing["token_prefix"],
"created_at": existing["created_at"],
}
), 409
raw_token = "trf_" + secrets.token_urlsafe(40)
token_hash = _hash_token(raw_token)
token_prefix = raw_token[:12]
conn.execute(
"INSERT INTO user_api_tokens (user_id, token_hash, token_prefix) VALUES (?, ?, ?)",
(user_id, token_hash, token_prefix),
)
conn.commit()
row = conn.execute(
"SELECT created_at FROM user_api_tokens WHERE token_hash = ?",
(token_hash,),
).fetchone()
created_at = row["created_at"] if row else None
except Exception as e:
conn.rollback()
logger.error("create_api_token error for user %s: %s", user_id, e)
return jsonify({"error": "Erreur interne"}), 500
finally:
conn.close()
logger.info("API token created for user %s (prefix=%s)", user_id, token_prefix)
return jsonify(
{
"token": raw_token,
"prefix": token_prefix,
"created_at": created_at,
"warning": "Conservez ce token en lieu sûr. Il ne sera plus affiché.",
}
), 201
@user_tokens_bp.route("/api-token", methods=["DELETE"])
@jwt_required_middleware
@plan_required("pro")
def revoke_api_token():
user = g.current_user
user_id = str(user["id"])
conn = get_db()
try:
result = conn.execute(
"UPDATE user_api_tokens SET revoked = 1 WHERE user_id = ? AND revoked = 0",
(user_id,),
)
conn.commit()
revoked_count = result.rowcount
except Exception as e:
conn.rollback()
logger.error("revoke_api_token error for user %s: %s", user_id, e)
return jsonify({"error": "Erreur interne"}), 500
finally:
conn.close()
if revoked_count == 0:
return jsonify({"error": "Aucun token actif trouvé"}), 404
logger.info("API token(s) revoked for user %s (%d tokens)", user_id, revoked_count)
return jsonify({"revoked": True, "count": revoked_count}), 200
@user_tokens_bp.route("/webhook", methods=["POST"])
@jwt_required_middleware
@plan_required("pro")
def create_webhook():
user = g.current_user
user_id = str(user["id"])
data = request.get_json(silent=True) or {}
url = (data.get("url") or "").strip()
if not url:
return jsonify({"error": "URL du webhook manquante"}), 400
if not url.startswith("https://"):
return jsonify(
{"error": "L'URL du webhook doit utiliser HTTPS (commencer par https://)"}
), 400
secret = (data.get("secret") or "").strip() or secrets.token_hex(32)
conn = get_db()
existing = None
try:
existing = conn.execute(
"SELECT id FROM user_webhooks WHERE user_id = ?", (user_id,)
).fetchone()
if existing:
conn.execute(
"UPDATE user_webhooks SET url = ?, secret = ?, created_at = datetime('now') "
"WHERE user_id = ?",
(url, secret, user_id),
)
else:
conn.execute(
"INSERT INTO user_webhooks (user_id, url, secret) VALUES (?, ?, ?)",
(user_id, url, secret),
)
conn.commit()
except Exception as e:
conn.rollback()
logger.error("create_webhook error for user %s: %s", user_id, e)
return jsonify({"error": "Erreur interne"}), 500
finally:
conn.close()
action = "mis à jour" if existing else "configuré"
logger.info("Webhook %s for user %s: %s", action, user_id, url)
return jsonify(
{
"webhook_url": url,
"secret": secret,
"message": f"Webhook {action} avec succès",
}
), 201
@user_tokens_bp.route("/webhook", methods=["DELETE"])
@jwt_required_middleware
@plan_required("pro")
def delete_webhook():
user = g.current_user
user_id = str(user["id"])
conn = get_db()
try:
result = conn.execute("DELETE FROM user_webhooks WHERE user_id = ?", (user_id,))
conn.commit()
deleted_count = result.rowcount
except Exception as e:
conn.rollback()
logger.error("delete_webhook error for user %s: %s", user_id, e)
return jsonify({"error": "Erreur interne"}), 500
finally:
conn.close()
if deleted_count == 0:
return jsonify({"error": "Aucun webhook configuré"}), 404
logger.info("Webhook deleted for user %s", user_id)
return jsonify({"deleted": True}), 200

80
api_v1/utils_webhook.py Normal file
View File

@@ -0,0 +1,80 @@
#!/usr/bin/env python3
"""
utils_webhook.py — Webhook dispatch utility (fire-and-forget, HMAC-SHA256)
HRT-80
"""
import hashlib
import hmac
import json
import logging
import requests
from api_tokens_db import get_db
logger = logging.getLogger("turf_saas.webhook")
EVENT_NEW_PREDICTION = "new_prediction"
EVENT_VALUE_BET = "value_bet"
def dispatch_webhook(user_id: str, event_type: str, payload: dict) -> None:
"""
Send HMAC-signed webhook POST to URL configured by user.
Fire-and-forget: errors logged, never re-raised. Timeout: 5s.
"""
conn = get_db()
try:
row = conn.execute(
"SELECT url, secret FROM user_webhooks WHERE user_id = ?",
(str(user_id),),
).fetchone()
except Exception as e:
logger.warning("dispatch_webhook: DB error for user %s: %s", user_id, e)
return
finally:
conn.close()
if not row:
return
url = row["url"]
secret = row["secret"]
body = json.dumps(
{"event": event_type, "data": payload},
ensure_ascii=False,
separators=(",", ":"),
)
signature = hmac.new(
secret.encode("utf-8"), body.encode("utf-8"), hashlib.sha256
).hexdigest()
headers = {
"Content-Type": "application/json",
"X-Turf-Signature": f"sha256={signature}",
"X-Turf-Event": event_type,
"User-Agent": "TurfSaaS-Webhook/1.0",
}
try:
resp = requests.post(url, data=body, headers=headers, timeout=5)
logger.info(
"Webhook dispatched to user %s (event=%s, status=%s)",
user_id,
event_type,
resp.status_code,
)
except requests.exceptions.Timeout:
logger.warning(
"Webhook timeout for user %s (event=%s, url=%s)", user_id, event_type, url
)
except requests.exceptions.RequestException as e:
logger.warning(
"Webhook failed for user %s (event=%s): %s", user_id, event_type, e
)
except Exception as e:
logger.warning(
"Webhook unexpected error for user %s (event=%s): %s",
user_id,
event_type,
e,
)

50
auth.py
View File

@@ -258,11 +258,47 @@ def logout():
# ──────────────────────────────────────────────────────────────
def validate_api_key(raw_key: str):
"""
Validate a personal API token (X-API-Key header).
Returns user dict or None. Updates last_used_at on success.
HRT-80: Personal API token support.
"""
if not raw_key:
return None
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
db = get_db()
try:
row = db.execute(
"SELECT t.user_id, u.* FROM user_api_tokens t "
"JOIN users u ON CAST(t.user_id AS INTEGER) = u.id "
"WHERE t.token_hash = ? AND t.revoked = 0 AND u.is_active = 1",
(key_hash,),
).fetchone()
if row:
db.execute(
"UPDATE user_api_tokens SET last_used_at = datetime('now') "
"WHERE token_hash = ?",
(key_hash,),
)
db.commit()
return dict(row) if row else None
except Exception as e:
logger.warning("validate_api_key error: %s", e)
return None
finally:
db.close()
def jwt_required_middleware(fn):
"""Decorator: require a valid Bearer JWT access token."""
"""
Decorator: require a valid Bearer JWT access token OR X-API-Key personal token.
HRT-80: Added X-API-Key fallback for personal API tokens (Pro plan only).
"""
@wraps(fn)
def wrapper(*args, **kwargs):
# 1. Try Bearer JWT (existing flow — unchanged)
try:
verify_jwt_in_request()
user_id = int(get_jwt_identity())
@@ -271,11 +307,21 @@ def jwt_required_middleware(fn):
return jsonify({"error": "Utilisateur introuvable"}), 401
g.current_user = dict(user)
g.current_user_id = user_id
return fn(*args, **kwargs)
except (JWTExtendedException, PyJWTError) as e:
logger.debug("JWT auth failed: %s", e)
return jsonify({"error": "Token invalide ou expiré", "detail": str(e)}), 401
# 2. Fallback: X-API-Key personal token (HRT-80)
api_key = request.headers.get("X-API-Key", "").strip()
if api_key:
user = validate_api_key(api_key)
if user:
g.current_user = user
g.current_user_id = user.get("id")
return fn(*args, **kwargs)
return jsonify({"error": "Token invalide ou expiré"}), 401
return wrapper

View File

@@ -8,6 +8,7 @@ Sprint 4-5 — HRT-30
from flask import Blueprint, request, jsonify, current_app
import sqlite3
import hashlib
import logging
import secrets
import os
import time
@@ -229,14 +230,54 @@ def hash_password(password: str) -> str:
return hashlib.sha256(password.encode("utf-8")).hexdigest()
def validate_api_key(raw_key: str):
"""
Validate a personal API token (X-API-Key header).
Returns user dict or None. Updates last_used_at on success.
HRT-80
"""
if not raw_key:
return None
key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
conn = get_db()
try:
row = conn.execute(
"SELECT t.user_id, u.* FROM user_api_tokens t "
"JOIN saas_users u ON t.user_id = u.id "
"WHERE t.token_hash = ? AND t.revoked = 0",
(key_hash,),
).fetchone()
if row:
conn.execute(
"UPDATE user_api_tokens SET last_used_at = datetime('now') "
"WHERE token_hash = ?",
(key_hash,),
)
conn.commit()
return dict(row) if row else None
except Exception as e:
logging.getLogger("turf_saas.auth").warning("validate_api_key error: %s", e)
return None
finally:
conn.close()
def require_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
# 1. Try Bearer session token (existing flow — unchanged)
auth = request.headers.get("Authorization", "")
token = (
auth.removeprefix("Bearer ").strip() if auth.startswith("Bearer ") else None
)
user = validate_token(token)
user = validate_token(token) if token else None
# 2. Fallback: X-API-Key personal token (HRT-80)
if not user:
api_key = request.headers.get("X-API-Key", "").strip()
if api_key:
user = validate_api_key(api_key)
if not user:
return jsonify({"error": "Non authentifié"}), 401
request.current_user = user

407
tests/test_history.py Normal file
View File

@@ -0,0 +1,407 @@
#!/usr/bin/env python3
"""
Tests for GET /api/v1/history — HRT-81
Historique limité/illimité selon plan (Free/Premium/Pro)
Run with:
cd /home/h3r7/turf_saas
source venv/bin/activate
python -m pytest tests/test_history.py -v
"""
import json
import os
import sys
import sqlite3
import tempfile
from datetime import datetime, timedelta
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# Use an isolated temp DB for these tests
_tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
_tmp_db.close()
os.environ["TURF_SAAS_DB"] = _tmp_db.name
os.environ["JWT_SECRET_KEY"] = "test-history-secret-key"
from app_v1 import create_app
from auth_db import init_auth_tables
# ──────────────────────────────────────────────────────────────
# Helpers
# ──────────────────────────────────────────────────────────────
TODAY = datetime.now().date()
def days_ago(n: int) -> str:
return (TODAY - timedelta(days=n)).isoformat()
def auth_header(token: str) -> dict:
return {"Authorization": f"Bearer {token}"}
# ──────────────────────────────────────────────────────────────
# Fixtures
# ──────────────────────────────────────────────────────────────
@pytest.fixture(scope="module")
def app():
application = create_app()
application.config["TESTING"] = True
application.config["JWT_SECRET_KEY"] = "test-history-secret-key"
return application
@pytest.fixture(scope="module")
def client(app):
return app.test_client()
@pytest.fixture(scope="module")
def seeded_db():
"""
Seed the test DB:
- Create ml_predictions_cache with rows spanning 120 days back
- Create users for free/premium/pro plans
"""
db_path = os.environ["TURF_SAAS_DB"]
conn = sqlite3.connect(db_path)
# Create ml_predictions_cache table if absent
conn.execute("""
CREATE TABLE IF NOT EXISTS ml_predictions_cache (
id INTEGER PRIMARY KEY AUTOINCREMENT,
date TEXT NOT NULL,
horse_name TEXT,
prob_top1 REAL,
prob_top3 REAL,
ml_score REAL,
race_label TEXT,
hippodrome TEXT,
heure TEXT,
is_value_bet INTEGER DEFAULT 0
)
""")
# Seed rows at: 1, 6, 7, 8, 30, 89, 90, 91, 100 days ago
offsets = [1, 6, 7, 8, 30, 89, 90, 91, 100]
for offset in offsets:
d = days_ago(offset)
conn.execute(
"""INSERT INTO ml_predictions_cache
(date, horse_name, prob_top1, prob_top3, ml_score, race_label, hippodrome, heure, is_value_bet)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)""",
(d, f"Cheval_{offset}j", 0.5, 0.8, 0.75, f"R1C1", "PARIS", "14:00", 0),
)
conn.commit()
conn.close()
return db_path
@pytest.fixture(scope="module")
def auth_tokens(client, seeded_db):
"""Register/login users for each plan and return their JWT tokens."""
plans = {
"free": "hist_free@test.com",
"premium": "hist_premium@test.com",
"pro": "hist_pro@test.com",
}
password = "password123"
for plan, email in plans.items():
r = client.post(
"/api/v1/auth/register",
json={"email": email, "password": password},
content_type="application/json",
)
assert r.status_code in (201, 409), f"register failed for {plan}: {r.data}"
# Set plan via direct DB
db_path = os.environ["TURF_SAAS_DB"]
conn = sqlite3.connect(db_path)
for plan, email in plans.items():
conn.execute("UPDATE users SET plan = ? WHERE email = ?", (plan, email))
conn.commit()
conn.close()
tokens = {}
for plan, email in plans.items():
r = client.post(
"/api/v1/auth/login",
json={"email": email, "password": password},
content_type="application/json",
)
assert r.status_code == 200, f"login failed for {plan}: {r.data}"
tokens[plan] = r.get_json()["access_token"]
return tokens
# ──────────────────────────────────────────────────────────────
# Auth guard
# ──────────────────────────────────────────────────────────────
class TestHistoryAuth:
def test_requires_auth(self, client):
"""Unauthenticated request must return 401."""
r = client.get("/api/v1/history")
assert r.status_code == 401
def test_invalid_token_returns_401(self, client):
r = client.get(
"/api/v1/history",
headers={"Authorization": "Bearer this.is.not.valid"},
)
assert r.status_code == 401
# ──────────────────────────────────────────────────────────────
# Free plan — 7-day window
# ──────────────────────────────────────────────────────────────
class TestHistoryFreePlan:
def test_free_can_access_last_7_days(self, client, auth_tokens, seeded_db):
"""Free user: start = today-6 (within 7-day window) must return 200."""
start = days_ago(6)
r = client.get(
f"/api/v1/history?start={start}&end={TODAY.isoformat()}",
headers=auth_header(auth_tokens["free"]),
)
assert r.status_code == 200
data = r.get_json()
assert data["status"] == "ok"
assert data["plan"] == "free"
assert data["history_limit_days"] == 7
def test_free_blocked_beyond_7_days(self, client, auth_tokens, seeded_db):
"""Free user: start = today-8 must return 403 (beyond 7-day window)."""
start = days_ago(8)
r = client.get(
f"/api/v1/history?start={start}&end={TODAY.isoformat()}",
headers=auth_header(auth_tokens["free"]),
)
assert r.status_code == 403
data = r.get_json()
assert data["code"] == 403
assert (
"upgrade" in data.get("message", "").lower()
or "plan" in data.get("message", "").lower()
)
def test_free_default_request_returns_200(self, client, auth_tokens, seeded_db):
"""Free user: no dates specified — should use defaults and return 200."""
r = client.get(
"/api/v1/history",
headers=auth_header(auth_tokens["free"]),
)
assert r.status_code == 200
data = r.get_json()
assert data["status"] == "ok"
assert "history" in data
assert "pagination" in data
def test_free_upgrade_hint_in_403(self, client, auth_tokens, seeded_db):
"""403 response must contain required_plans and upgrade_url."""
start = days_ago(30)
r = client.get(
f"/api/v1/history?start={start}",
headers=auth_header(auth_tokens["free"]),
)
assert r.status_code == 403
data = r.get_json()
assert "required_plans" in data
assert "upgrade_url" in data
# ──────────────────────────────────────────────────────────────
# Premium plan — 90-day window
# ──────────────────────────────────────────────────────────────
class TestHistoryPremiumPlan:
def test_premium_can_access_within_90_days(self, client, auth_tokens, seeded_db):
"""Premium user: start = today-89 must return 200."""
start = days_ago(89)
r = client.get(
f"/api/v1/history?start={start}&end={TODAY.isoformat()}",
headers=auth_header(auth_tokens["premium"]),
)
assert r.status_code == 200
data = r.get_json()
assert data["status"] == "ok"
assert data["plan"] == "premium"
assert data["history_limit_days"] == 90
def test_premium_blocked_beyond_90_days(self, client, auth_tokens, seeded_db):
"""Premium user: start = today-91 must return 403."""
start = days_ago(91)
r = client.get(
f"/api/v1/history?start={start}&end={TODAY.isoformat()}",
headers=auth_header(auth_tokens["premium"]),
)
assert r.status_code == 403
data = r.get_json()
assert data["code"] == 403
assert "required_plans" in data
# Premium upgrade hint should suggest pro
assert "pro" in data.get("required_plans", [])
def test_premium_can_access_last_7_days(self, client, auth_tokens, seeded_db):
"""Premium user can always access the free window too."""
start = days_ago(6)
r = client.get(
f"/api/v1/history?start={start}",
headers=auth_header(auth_tokens["premium"]),
)
assert r.status_code == 200
# ──────────────────────────────────────────────────────────────
# Pro plan — unlimited
# ──────────────────────────────────────────────────────────────
class TestHistoryProPlan:
def test_pro_can_access_old_data(self, client, auth_tokens, seeded_db):
"""Pro user: start = today-100 must return 200 (unlimited)."""
start = days_ago(100)
r = client.get(
f"/api/v1/history?start={start}&end={TODAY.isoformat()}",
headers=auth_header(auth_tokens["pro"]),
)
assert r.status_code == 200
data = r.get_json()
assert data["status"] == "ok"
assert data["plan"] == "pro"
assert data["history_limit_days"] is None # unlimited
def test_pro_default_request_returns_200(self, client, auth_tokens, seeded_db):
r = client.get(
"/api/v1/history",
headers=auth_header(auth_tokens["pro"]),
)
assert r.status_code == 200
def test_pro_can_see_all_seeded_rows(self, client, auth_tokens, seeded_db):
"""Pro fetching entire seeded range (100 days) should get all inserted rows."""
start = days_ago(100)
end = TODAY.isoformat()
r = client.get(
f"/api/v1/history?start={start}&end={end}&limit=500",
headers=auth_header(auth_tokens["pro"]),
)
assert r.status_code == 200
data = r.get_json()
# All 9 seeded rows should be present
assert data["pagination"]["total"] == 9
# ──────────────────────────────────────────────────────────────
# Input validation
# ──────────────────────────────────────────────────────────────
class TestHistoryValidation:
def test_invalid_start_format(self, client, auth_tokens):
r = client.get(
"/api/v1/history?start=31-12-2025",
headers=auth_header(auth_tokens["pro"]),
)
assert r.status_code == 400
data = r.get_json()
assert data["code"] == 400
assert "start" in data["message"].lower()
def test_invalid_end_format(self, client, auth_tokens):
r = client.get(
"/api/v1/history?end=2025/12/31",
headers=auth_header(auth_tokens["pro"]),
)
assert r.status_code == 400
data = r.get_json()
assert "end" in data["message"].lower()
def test_start_after_end_returns_400(self, client, auth_tokens):
r = client.get(
f"/api/v1/history?start={TODAY.isoformat()}&end={days_ago(5)}",
headers=auth_header(auth_tokens["pro"]),
)
assert r.status_code == 400
def test_pagination_limit_respected(self, client, auth_tokens, seeded_db):
start = days_ago(100)
r = client.get(
f"/api/v1/history?start={start}&limit=3&offset=0",
headers=auth_header(auth_tokens["pro"]),
)
assert r.status_code == 200
data = r.get_json()
assert len(data["history"]) <= 3
assert data["pagination"]["limit"] == 3
def test_pagination_has_more(self, client, auth_tokens, seeded_db):
"""has_more should be True when more rows exist beyond current page."""
start = days_ago(100)
r = client.get(
f"/api/v1/history?start={start}&limit=3&offset=0",
headers=auth_header(auth_tokens["pro"]),
)
assert r.status_code == 200
data = r.get_json()
# 9 total rows seeded, limit=3 → has_more=True
assert data["pagination"]["has_more"] is True
def test_response_shape(self, client, auth_tokens, seeded_db):
"""Verify the full response envelope shape."""
r = client.get(
"/api/v1/history",
headers=auth_header(auth_tokens["pro"]),
)
assert r.status_code == 200
data = r.get_json()
assert "status" in data
assert "plan" in data
assert "history_limit_days" in data
assert "start" in data
assert "end" in data
assert "history" in data
assert "pagination" in data
pagination = data["pagination"]
assert "total" in pagination
assert "limit" in pagination
assert "offset" in pagination
assert "has_more" in pagination
def test_history_row_fields(self, client, auth_tokens, seeded_db):
"""Each history row must contain the expected ML fields."""
start = days_ago(10)
r = client.get(
f"/api/v1/history?start={start}&limit=5",
headers=auth_header(auth_tokens["pro"]),
)
assert r.status_code == 200
data = r.get_json()
if data["history"]:
row = data["history"][0]
expected_fields = {
"id",
"date",
"horse_name",
"prob_top1",
"prob_top3",
"ml_score",
"race_label",
"hippodrome",
"heure",
"is_value_bet",
}
assert expected_fields.issubset(set(row.keys()))

383
tests/test_user_tokens.py Normal file
View File

@@ -0,0 +1,383 @@
#!/usr/bin/env python3
"""
tests/test_user_tokens.py — Personal API Token + Webhook alertes
HRT-80: Tests unitaires et d'intégration
Couvre:
- POST /api/v1/user/api-token (create)
- DELETE /api/v1/user/api-token (revoke)
- POST /api/v1/user/webhook (create/upsert)
- DELETE /api/v1/user/webhook (delete)
- Authentification via X-API-Key
- dispatch_webhook() fire-and-forget
- Plan enforcement Pro uniquement
Run:
./venv/bin/pytest tests/test_user_tokens.py -v --tb=short
"""
import hashlib
import json
import os
import sqlite3
import sys
import tempfile
from unittest.mock import MagicMock, patch
import pytest
# ─── Test DB isolation ────────────────────────────────────────────────────────
_tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
_tmp_db.close()
os.environ["TURF_SAAS_DB"] = _tmp_db.name
os.environ["JWT_SECRET_KEY"] = "test-secret-hrt80"
# Add project root to path
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from app_v1 import create_app # noqa: E402
TEST_CONFIG = {
"TESTING": True,
"JWT_SECRET_KEY": "test-secret-hrt80",
}
@pytest.fixture(scope="module")
def app():
application = create_app()
application.config.update(TEST_CONFIG)
yield application
@pytest.fixture(scope="module")
def client(app):
return app.test_client()
# ─── Helpers ─────────────────────────────────────────────────────────────────
def _create_user(client, email, plan="pro"):
"""Register user (plan=free) then update plan in DB."""
resp = client.post(
"/api/v1/auth/register",
json={"email": email, "password": "Secure123"},
)
assert resp.status_code == 201, resp.get_json()
user_id = resp.get_json()["user_id"]
# Update plan directly in DB (no plan-update endpoint in JWT auth)
conn = sqlite3.connect(os.environ["TURF_SAAS_DB"])
conn.execute("UPDATE users SET plan = ? WHERE id = ?", (plan, user_id))
conn.commit()
conn.close()
# Login to get access token
login_resp = client.post(
"/api/v1/auth/login",
json={"email": email, "password": "Secure123"},
)
assert login_resp.status_code == 200, login_resp.get_json()
access_token = login_resp.get_json()["access_token"]
return access_token, user_id
def _auth_header(token):
return {"Authorization": f"Bearer {token}"}
# ─── Tests: API Token (Pro) ───────────────────────────────────────────────────
class TestApiToken:
def test_create_api_token_pro(self, client):
"""POST /api/v1/user/api-token — Pro user gets 201 + token starting with trf_"""
token, _ = _create_user(client, "pro_token@test.com", plan="pro")
resp = client.post("/api/v1/user/api-token", headers=_auth_header(token))
assert resp.status_code == 201, resp.get_json()
data = resp.get_json()
assert data["token"].startswith("trf_")
assert data["prefix"] == data["token"][:12]
assert "warning" in data
assert "created_at" in data
def test_create_api_token_stores_hash_not_raw(self, client):
"""Second POST returns 409 — only hashed token stored"""
token, _ = _create_user(client, "pro_token2@test.com", plan="pro")
# First create
r1 = client.post("/api/v1/user/api-token", headers=_auth_header(token))
assert r1.status_code == 201
raw_token = r1.get_json()["token"]
# Second create should conflict
r2 = client.post("/api/v1/user/api-token", headers=_auth_header(token))
assert r2.status_code == 409
data = r2.get_json()
assert "existing_prefix" in data
# Verify raw token is NOT stored in DB (only hash)
conn = sqlite3.connect(os.environ["TURF_SAAS_DB"])
row = conn.execute(
"SELECT token_hash FROM user_api_tokens WHERE token_prefix = ?",
(raw_token[:12],),
).fetchone()
conn.close()
assert row is not None
assert row[0] != raw_token # hash != raw
assert len(row[0]) == 64 # SHA256 hex
def test_create_api_token_free_user(self, client):
"""Free user gets 403"""
token, _ = _create_user(client, "free_token@test.com", plan="free")
resp = client.post("/api/v1/user/api-token", headers=_auth_header(token))
assert resp.status_code == 403
def test_create_api_token_premium_user(self, client):
"""Premium user gets 403 (Pro only feature)"""
token, _ = _create_user(client, "premium_token@test.com", plan="premium")
resp = client.post("/api/v1/user/api-token", headers=_auth_header(token))
assert resp.status_code == 403
def test_create_api_token_no_auth(self, client):
"""No auth → 401"""
resp = client.post("/api/v1/user/api-token")
assert resp.status_code == 401
def test_revoke_api_token(self, client):
"""DELETE /api/v1/user/api-token — Pro user revokes active token"""
token, _ = _create_user(client, "pro_revoke@test.com", plan="pro")
# Create first
client.post("/api/v1/user/api-token", headers=_auth_header(token))
# Revoke
resp = client.delete("/api/v1/user/api-token", headers=_auth_header(token))
assert resp.status_code == 200
data = resp.get_json()
assert data["revoked"] is True
assert data["count"] >= 1
def test_revoke_no_active_token(self, client):
"""DELETE with no active token → 404"""
token, _ = _create_user(client, "pro_notoken@test.com", plan="pro")
resp = client.delete("/api/v1/user/api-token", headers=_auth_header(token))
assert resp.status_code == 404
def test_revoke_non_pro(self, client):
"""DELETE for free user → 403"""
token, _ = _create_user(client, "free_revoke@test.com", plan="free")
resp = client.delete("/api/v1/user/api-token", headers=_auth_header(token))
assert resp.status_code == 403
# ─── Tests: X-API-Key Authentication ─────────────────────────────────────────
class TestApiKeyAuth:
def test_api_key_auth_on_protected_route(self, client):
"""Valid X-API-Key authenticates on protected route"""
token, _ = _create_user(client, "apikey_auth@test.com", plan="pro")
# Create API token
r = client.post("/api/v1/user/api-token", headers=_auth_header(token))
assert r.status_code == 201
raw_key = r.get_json()["token"]
# Use X-API-Key to access a protected route (try create again → 409 means authenticated)
resp = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
# 409 means we were authenticated; 401 means auth failed
assert resp.status_code == 409
def test_api_key_invalid(self, client):
"""Invalid X-API-Key → 401"""
resp = client.post(
"/api/v1/user/api-token",
headers={"X-API-Key": "trf_invalidkeyXXXXXXXXXXXXXXXXXX"},
)
assert resp.status_code == 401
def test_api_key_revoked(self, client):
"""Revoked X-API-Key → 401"""
token, _ = _create_user(client, "revoked_apikey@test.com", plan="pro")
# Create token
r = client.post("/api/v1/user/api-token", headers=_auth_header(token))
assert r.status_code == 201
raw_key = r.get_json()["token"]
# Revoke it
client.delete("/api/v1/user/api-token", headers=_auth_header(token))
# Try using revoked key
resp = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
assert resp.status_code == 401
def test_revoke_then_cannot_auth(self, client):
"""Full flow: create → use → revoke → X-API-Key rejected"""
token, _ = _create_user(client, "flow_test@test.com", plan="pro")
# Create
r = client.post("/api/v1/user/api-token", headers=_auth_header(token))
raw_key = r.get_json()["token"]
# Validate it works (409 because key exists)
r2 = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
assert r2.status_code == 409
# Revoke
client.delete("/api/v1/user/api-token", headers=_auth_header(token))
# Try again with revoked key
r3 = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
assert r3.status_code == 401
# ─── Tests: Webhook ───────────────────────────────────────────────────────────
class TestWebhook:
def test_create_webhook_pro(self, client):
"""POST /api/v1/user/webhook — Pro user with provided secret → 201"""
token, _ = _create_user(client, "webhook_pro@test.com", plan="pro")
resp = client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "https://example.com/hook", "secret": "mysecret123"},
)
assert resp.status_code == 201
data = resp.get_json()
assert data["webhook_url"] == "https://example.com/hook"
assert data["secret"] == "mysecret123"
def test_create_webhook_auto_secret(self, client):
"""POST without secret → auto-generated secret"""
token, _ = _create_user(client, "webhook_auto@test.com", plan="pro")
resp = client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "https://auto.example.com/hook"},
)
assert resp.status_code == 201
data = resp.get_json()
assert len(data["secret"]) == 64 # token_hex(32) = 64 hex chars
def test_create_webhook_non_pro_free(self, client):
"""Free user → 403"""
token, _ = _create_user(client, "webhook_free@test.com", plan="free")
resp = client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "https://example.com/hook"},
)
assert resp.status_code == 403
def test_create_webhook_non_pro_premium(self, client):
"""Premium user → 403"""
token, _ = _create_user(client, "webhook_premium@test.com", plan="premium")
resp = client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "https://example.com/hook"},
)
assert resp.status_code == 403
def test_create_webhook_url_not_https(self, client):
"""HTTP URL → 400"""
token, _ = _create_user(client, "webhook_http@test.com", plan="pro")
resp = client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "http://example.com/hook"},
)
assert resp.status_code == 400
assert "https" in resp.get_json()["error"].lower()
def test_create_webhook_missing_url(self, client):
"""Missing URL → 400"""
token, _ = _create_user(client, "webhook_nourl@test.com", plan="pro")
resp = client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={},
)
assert resp.status_code == 400
def test_webhook_upsert(self, client):
"""Second POST updates URL (upsert behavior)"""
token, _ = _create_user(client, "webhook_upsert@test.com", plan="pro")
client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "https://first.example.com/hook"},
)
resp = client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "https://second.example.com/hook"},
)
assert resp.status_code == 201
assert resp.get_json()["webhook_url"] == "https://second.example.com/hook"
def test_delete_webhook(self, client):
"""DELETE /api/v1/user/webhook → 200"""
token, _ = _create_user(client, "webhook_delete@test.com", plan="pro")
client.post(
"/api/v1/user/webhook",
headers=_auth_header(token),
json={"url": "https://delete.example.com/hook"},
)
resp = client.delete("/api/v1/user/webhook", headers=_auth_header(token))
assert resp.status_code == 200
assert resp.get_json()["deleted"] is True
def test_delete_webhook_not_configured(self, client):
"""DELETE without webhook configured → 404"""
token, _ = _create_user(client, "webhook_notset@test.com", plan="pro")
resp = client.delete("/api/v1/user/webhook", headers=_auth_header(token))
assert resp.status_code == 404
def test_delete_webhook_non_pro(self, client):
"""Free user DELETE → 403"""
token, _ = _create_user(client, "webhook_freedelete@test.com", plan="free")
resp = client.delete("/api/v1/user/webhook", headers=_auth_header(token))
assert resp.status_code == 403
# ─── Tests: dispatch_webhook ──────────────────────────────────────────────────
class TestDispatchWebhook:
def test_dispatch_no_webhook_configured(self):
"""dispatch_webhook silently returns when no webhook is configured"""
with patch("api_v1.utils_webhook.get_db") as mock_get_db:
mock_conn = MagicMock()
mock_conn.execute.return_value.fetchone.return_value = None
mock_get_db.return_value = mock_conn
from api_v1.utils_webhook import dispatch_webhook
# Should not raise, should return silently
dispatch_webhook("nonexistent_user", "new_prediction", {"data": "test"})
def test_dispatch_sends_hmac_header(self):
"""dispatch_webhook sends correct HMAC-SHA256 signature header"""
test_secret = "testsecret"
test_url = "https://hook.example.com/receive"
test_payload = {"race_id": "R123", "top1": "Cheval Blanc"}
with (
patch("api_v1.utils_webhook.get_db") as mock_get_db,
patch("api_v1.utils_webhook.requests.post") as mock_post,
):
mock_row = MagicMock()
mock_row.__getitem__ = lambda self, key: (
test_url if key == "url" else test_secret
)
mock_conn = MagicMock()
mock_conn.execute.return_value.fetchone.return_value = mock_row
mock_get_db.return_value = mock_conn
mock_response = MagicMock()
mock_response.status_code = 200
mock_post.return_value = mock_response
from api_v1.utils_webhook import dispatch_webhook, EVENT_NEW_PREDICTION
dispatch_webhook("user123", EVENT_NEW_PREDICTION, test_payload)
assert mock_post.called
call_kwargs = mock_post.call_args
headers_sent = call_kwargs.kwargs.get("headers") or call_kwargs[1].get(
"headers"
)
assert "X-Turf-Signature" in headers_sent
assert headers_sent["X-Turf-Signature"].startswith("sha256=")
assert headers_sent["X-Turf-Event"] == EVENT_NEW_PREDICTION