Files
turf_saas/saas_auth.py
DevOps Engineer f300e44c74 feat(HRT-80): API Token personnel + Webhook alertes (Pro)
- Nouveaux fichiers: api_tokens_db.py, api_v1/routes/user_tokens.py, api_v1/utils_webhook.py
- Migration DB idempotente: tables user_api_tokens + user_webhooks
- Endpoints POST/DELETE /api/v1/user/api-token (Pro only)
- Endpoints POST/DELETE /api/v1/user/webhook (Pro only, HTTPS requis)
- HMAC-SHA256 fire-and-forget dispatch webhook
- auth.py: validate_api_key() + X-API-Key fallback dans jwt_required_middleware
- saas_auth.py: import logging au niveau module, validate_api_key(), X-API-Key fallback
- api_v1/__init__.py: enregistrement user_tokens_bp
- 24 tests pytest — tous passent

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-29 17:25:30 +02:00

546 lines
17 KiB
Python

#!/usr/bin/env python3
"""
SaaS Auth Blueprint — /api/v1/auth/*
User accounts, session tokens (opaque random strings stored in the database, not signed JWTs), plans and preferences.
Sprint 4-5 — HRT-30
"""
from flask import Blueprint, request, jsonify, current_app
import sqlite3
import hashlib
import logging
import secrets
import os
import time
import json
from functools import wraps
from datetime import datetime
from collections import defaultdict
from threading import Lock
# ─── Rate limiting login ───────────────────────────────────────────────────────
def _new_login_bucket() -> dict:
    """Build the zeroed attempt record handed to a key seen for the first time."""
    return {"count": 0, "window_start": 0.0, "blocked_until": 0.0}


# Per-key login attempt records (login() keys them by client IP address).
_login_attempts: dict = defaultdict(_new_login_bucket)
# Held by login() while it reads and updates _login_attempts.
_login_lock = Lock()

# Maximum number of attempts per counting window.
LOGIN_RATE_MAX = 5
# Length of the counting window in seconds (5 minutes).
LOGIN_RATE_WINDOW = 5 * 60
# Block duration in seconds once the maximum is exceeded (15 minutes).
LOGIN_BLOCK_DURATION = 15 * 60
# ─── Blacklist mots de passe faibles ─────────────────────────────────────────
# HRT-63 — Validation mots de passe faibles
# Lower-case passwords that are rejected outright. validate_password_strength()
# compares the lower-cased candidate against this set, after its length check,
# so entries shorter than 8 characters are already rejected for length.
WEAK_PASSWORDS = {
    # "password" and look-alikes
    "password", "password1", "password123", "passw0rd",
    "p@ssword", "p@ssw0rd", "pa$$word",
    "pass", "pass1234",
    # digit runs and repetitions
    "1234", "12345", "123456", "12345678", "123456789", "1234567890",
    "111111", "1111111", "11111111", "000000", "00000000",
    "123123", "1231234", "321321",
    # keyboard walks (QWERTY and AZERTY layouts)
    "qwerty", "qwerty123", "qwertyuiop", "azerty", "azertyuiop",
    "zaq1zaq1", "qazwsx", "qazwsxedc",
    # stock phrases and placeholders
    "letmein", "letmein1", "iloveyou", "iloveyou1",
    "welcome", "welcome1", "welcome123",
    "changeme", "changeme1", "secret", "secret1", "secret123", "trustno1",
    "hello", "hello123", "hello1234",
    "login", "login123", "test", "test1234", "test123456",
    "abc123", "abc1234", "abcd1234", "abcdefgh",
    # administrator accounts
    "admin", "admin123", "admin1234", "administrator",
    # popular words, sports, characters and first names
    "monkey", "monkey1", "dragon", "dragon1", "master", "master1",
    "football", "soccer", "baseball", "basketball",
    "superman", "batman", "starwars", "starwars1",
    "princess", "princess1", "sunshine", "sunshine1", "shadow", "shadow1",
    "michael", "michael1", "jessica", "jessica1",
    # words tied to this product's domain
    "turf", "turf123", "cheval", "cheval123", "pmu", "pmu123",
}
def validate_password_strength(password: str):
    """
    Check a candidate password against the minimum complexity rules.

    Rules, checked in this order:
      - must be a string (a JSON body can carry a number, a list, ...)
      - at least 8 characters
      - not in the WEAK_PASSWORDS blacklist (compared lower-cased)
      - at least 1 digit
      - at least 1 letter

    Returns:
        None when the password is acceptable, otherwise a user-facing
        error message (str).
    """
    # Callers pass values straight from request JSON; without this guard a
    # number or list would raise on len() / .lower() instead of being rejected.
    if not isinstance(password, str):
        return "Mot de passe invalide."
    if len(password) < 8:
        return "Mot de passe trop court (8 caractères minimum)."
    if password.lower() in WEAK_PASSWORDS:
        return "Mot de passe trop commun. Choisissez un mot de passe plus sécurisé."
    if not any(c.isdigit() for c in password):
        return "Le mot de passe doit contenir au moins 1 chiffre."
    if not any(c.isalpha() for c in password):
        return "Le mot de passe doit contenir au moins 1 lettre."
    return None
# ─── Config ───────────────────────────────────────────────────────────────────
# SQLite database file; the TURF_SAAS_DB environment variable overrides the default.
DB_PATH = os.getenv("TURF_SAAS_DB", default="/home/h3r7/turf_saas/turf_saas.db")

# Taken from the JWT_SECRET environment variable when set; otherwise a random
# value is generated at import time, so it changes on every process start.
# Persist it in the environment for production.
JWT_SECRET = os.getenv("JWT_SECRET", default=secrets.token_hex(32))

# Session token lifetime in seconds: JWT_TTL_SECONDS when set, else 30 days.
TOKEN_TTL = int(os.getenv("JWT_TTL_SECONDS", default=30 * 24 * 3600))
# Blueprint carrying every route of this module under the /api/v1/auth prefix.
auth_bp = Blueprint(
    "auth_v1",
    __name__,
    url_prefix="/api/v1/auth",
)
# ─── DB helpers ───────────────────────────────────────────────────────────────
def get_db():
    """Open a new SQLite connection to DB_PATH that yields sqlite3.Row rows.

    The connection is not closed here; the caller is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH)
    # sqlite3.Row allows access by column name and conversion with dict(row).
    connection.row_factory = sqlite3.Row
    return connection
def init_users_table():
    """Create the saas_users and saas_tokens tables when they do not exist yet.

    Both statements use CREATE TABLE IF NOT EXISTS, so calling this again on an
    initialised database changes nothing. The connection is closed even when
    the script fails; the exception is left to propagate to the caller.
    """
    conn = get_db()
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS saas_users (
                id TEXT PRIMARY KEY,
                email TEXT UNIQUE NOT NULL,
                firstname TEXT DEFAULT '',
                lastname TEXT DEFAULT '',
                password_hash TEXT NOT NULL,
                plan TEXT DEFAULT 'free',
                telegram_chat_id TEXT DEFAULT NULL,
                alert_value_bets INTEGER DEFAULT 1,
                alert_top1 INTEGER DEFAULT 1,
                alert_quinte_only INTEGER DEFAULT 0,
                created_at TEXT DEFAULT (datetime('now')),
                updated_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS saas_tokens (
                token TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                expires_at INTEGER NOT NULL,
                created_at TEXT DEFAULT (datetime('now'))
            );
        """)
        conn.commit()
    finally:
        # Previously skipped when executescript() raised, leaking the connection.
        conn.close()
# Create the tables when the module is imported. Best effort: a failure is
# reported through the same logger validate_api_key() uses rather than print(),
# and does not prevent the module from being imported.
try:
    init_users_table()
except Exception as exc:
    logging.getLogger("turf_saas.auth").warning("[auth_bp] DB init warning: %s", exc)
# ─── Token helpers ────────────────────────────────────────────────────────────
def generate_token(user_id: str) -> str:
    """Create, store and return a new session token for *user_id*.

    The token is a random URL-safe string. It is inserted into saas_tokens with
    an expiry of "now + TOKEN_TTL" expressed in epoch seconds.

    Args:
        user_id: value stored in the saas_tokens.user_id column.

    Returns:
        The newly generated token.
    """
    token = secrets.token_urlsafe(48)
    expires_at = int(time.time()) + TOKEN_TTL
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO saas_tokens (token, user_id, expires_at) VALUES (?,?,?)",
            (token, user_id, expires_at),
        )
        conn.commit()
    finally:
        # Close on every path so a failed INSERT does not leak the connection.
        conn.close()
    return token
def validate_token(token: str):
    """Look up the user that owns a non-expired session token.

    Args:
        token: session token as returned by generate_token().

    Returns:
        A dict holding saas_tokens.user_id plus every saas_users column
        (password_hash included), or None when the token is empty, unknown
        or expired.
    """
    if not token:
        return None
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT t.user_id, u.* FROM saas_tokens t JOIN saas_users u ON t.user_id=u.id "
            "WHERE t.token=? AND t.expires_at>?",
            (token, int(time.time())),
        ).fetchone()
    finally:
        # Close on every path so a failed query does not leak the connection.
        conn.close()
    return dict(row) if row else None
def hash_password(password: str) -> str:
    """Return the hex SHA-256 digest of *password* encoded as UTF-8.

    NOTE(review): this is a single unsalted SHA-256 pass. login() and
    change_password() find users by comparing this digest inside the SQL query,
    so the output has to stay deterministic; switching to a salted KDF means
    changing those call sites and migrating stored hashes at the same time.
    """
    digest = hashlib.sha256()
    digest.update(password.encode("utf-8"))
    return digest.hexdigest()
def validate_api_key(raw_key: str):
    """
    Resolve a personal API token (X-API-Key header) to its user. HRT-80.

    Only the SHA-256 hex digest of the key is compared against
    user_api_tokens.token_hash, and revoked tokens are ignored. When a token
    matches, its last_used_at column is set to the current time.

    Returns:
        A dict with user_api_tokens.user_id plus every saas_users column, or
        None when the key is empty, unknown or revoked, or when any database
        error occurs (the error is logged as a warning).
    """
    if not raw_key:
        return None

    digest = hashlib.sha256(raw_key.encode()).hexdigest()
    db = get_db()
    try:
        match = db.execute(
            "SELECT t.user_id, u.* FROM user_api_tokens t "
            "JOIN saas_users u ON t.user_id = u.id "
            "WHERE t.token_hash = ? AND t.revoked = 0",
            (digest,),
        ).fetchone()
        if match is None:
            return None
        # Record the use before handing the user back.
        db.execute(
            "UPDATE user_api_tokens SET last_used_at = datetime('now') "
            "WHERE token_hash = ?",
            (digest,),
        )
        db.commit()
        return dict(match)
    except Exception as e:
        # Best effort: any failure here is treated as "not authenticated".
        logging.getLogger("turf_saas.auth").warning("validate_api_key error: %s", e)
        return None
    finally:
        db.close()
def require_auth(f):
    """Decorator: run the view only for an authenticated caller.

    Credentials are tried in this order:
      1. ``Authorization: Bearer <token>`` checked with validate_token();
      2. ``X-API-Key: <key>`` checked with validate_api_key() (HRT-80), only
         when step 1 produced no user.

    On success the user dict is stored on ``request.current_user`` and the
    view is called; otherwise a 401 JSON error is returned.
    """

    @wraps(f)
    def decorated(*args, **kwargs):
        user = None

        # 1. Bearer session token.
        header = request.headers.get("Authorization", "")
        if header.startswith("Bearer "):
            bearer = header[len("Bearer "):].strip()
            if bearer:
                user = validate_token(bearer)

        # 2. Fallback: personal API key.
        if not user:
            api_key = request.headers.get("X-API-Key", "").strip()
            if api_key:
                user = validate_api_key(api_key)

        if not user:
            return jsonify({"error": "Non authentifié"}), 401

        request.current_user = user
        return f(*args, **kwargs)

    return decorated
def user_to_dict(user) -> dict:
    """Project a user record onto the fields exposed by the API.

    Args:
        user: a sqlite3.Row or a mapping with a ``get`` method.

    Returns:
        A dict with id, email, firstname, lastname, plan, telegram_chat_id,
        the three alert flags converted to bool, and created_at. Any other
        key of *user* (e.g. password_hash) is left out. Missing keys fall back
        to "" for names, "free" for plan, True/True/False for the alert flags
        and None elsewhere.
    """
    record = dict(user) if isinstance(user, sqlite3.Row) else user
    fetch = record.get

    profile = {
        "id": fetch("id"),
        "email": fetch("email"),
        "firstname": fetch("firstname", ""),
        "lastname": fetch("lastname", ""),
        "plan": fetch("plan", "free"),
        "telegram_chat_id": fetch("telegram_chat_id"),
    }
    # Alert flags are stored as integers; expose them as booleans.
    for flag, default in (
        ("alert_value_bets", 1),
        ("alert_top1", 1),
        ("alert_quinte_only", 0),
    ):
        profile[flag] = bool(fetch(flag, default))
    profile["created_at"] = fetch("created_at")
    return profile
# ─── Routes ───────────────────────────────────────────────────────────────────
@auth_bp.route("/register", methods=["POST"])
def register():
    """POST /api/v1/auth/register — create an account and open a session.

    Expects a JSON object with ``email`` and ``password`` and, optionally,
    ``firstname``, ``lastname`` and ``plan`` (unknown plans fall back to "free").

    Returns:
        201 with ``{"token", "user"}`` on success,
        400 when a field is malformed or the password is too weak,
        409 when the email address is already registered.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array or scalar body has no .get(); treat it as an empty object.
        data = {}

    email = data.get("email") or ""
    password = data.get("password") or ""
    firstname = data.get("firstname") or ""
    lastname = data.get("lastname") or ""
    # NOTE(review): the plan comes straight from the request, so a client can
    # register as "premium" or "pro" — confirm billing is enforced elsewhere.
    plan = data.get("plan", "free")

    # JSON values can be numbers, lists, ...: answer 400 instead of crashing
    # on .strip() / len() further down.
    if not isinstance(email, str):
        return jsonify({"error": "Adresse email invalide."}), 400
    if not isinstance(password, str):
        return jsonify({"error": "Mot de passe invalide."}), 400
    if not isinstance(firstname, str) or not isinstance(lastname, str):
        return jsonify({"error": "Prénom ou nom invalide."}), 400

    email = email.strip().lower()
    firstname = firstname.strip()
    lastname = lastname.strip()

    if not email or "@" not in email:
        return jsonify({"error": "Adresse email invalide."}), 400
    pwd_error = validate_password_strength(password)
    if pwd_error:
        return jsonify({"error": pwd_error}), 400
    if plan not in ("free", "premium", "pro"):
        plan = "free"

    uid = secrets.token_hex(16)
    pw_hash = hash_password(password)
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO saas_users (id, email, firstname, lastname, password_hash, plan) VALUES (?,?,?,?,?,?)",
            (uid, email, firstname, lastname, pw_hash, plan),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        # saas_users.email is UNIQUE, so this is reported as a duplicate address.
        return jsonify({"error": "Cette adresse email est déjà utilisée."}), 409
    finally:
        # Close on every path, including errors other than IntegrityError.
        conn.close()

    token = generate_token(uid)
    # Read the user back through the token to return the stored row.
    user_row = validate_token(token)
    return jsonify({"token": token, "user": user_to_dict(user_row)}), 201
def _login_retry_after(ip: str, now: float):
    """Count one login attempt for *ip*; return seconds to wait if blocked, else None."""
    with _login_lock:
        bucket = _login_attempts[ip]
        if now < bucket["blocked_until"]:
            # Still blocked: the attempt is not counted. Never advertise 0
            # seconds while the block is still in force.
            return max(1, int(bucket["blocked_until"] - now))
        # Not blocked: start a fresh window once the previous one has elapsed.
        if now - bucket["window_start"] >= LOGIN_RATE_WINDOW:
            bucket["count"] = 0
            bucket["window_start"] = now
        bucket["count"] += 1
        if bucket["count"] > LOGIN_RATE_MAX:
            bucket["blocked_until"] = now + LOGIN_BLOCK_DURATION
            return LOGIN_BLOCK_DURATION
        return None


@auth_bp.route("/login", methods=["POST"])
def login():
    """POST /api/v1/auth/login — exchange email and password for a session token.

    Every request that carries both fields counts towards the per-IP rate
    limit, whether or not the credentials turn out to be valid.

    Returns:
        200 with ``{"token", "user"}`` on success,
        400 when email or password is missing or not a string,
        401 when the credentials do not match,
        429 with a ``Retry-After`` header when the IP is rate limited.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array or scalar body has no .get(); treat it as an empty object.
        data = {}
    email = data.get("email") or ""
    password = data.get("password") or ""
    # Non-string JSON values would otherwise crash on .strip() / .encode().
    if not isinstance(email, str) or not isinstance(password, str):
        return jsonify({"error": "Email et mot de passe requis."}), 400
    email = email.strip().lower()
    if not email or not password:
        return jsonify({"error": "Email et mot de passe requis."}), 400

    # Rate limit per client IP.
    retry_after = _login_retry_after(request.remote_addr or "unknown", time.time())
    if retry_after is not None:
        resp = jsonify({"error": "Trop de tentatives. Réessayez plus tard."})
        resp.status_code = 429
        resp.headers["Retry-After"] = str(retry_after)
        return resp

    pw_hash = hash_password(password)
    conn = get_db()
    try:
        user = conn.execute(
            "SELECT * FROM saas_users WHERE email=? AND password_hash=?", (email, pw_hash)
        ).fetchone()
    finally:
        # Close on every path so a failed query does not leak the connection.
        conn.close()
    if user is None:
        return jsonify({"error": "Identifiants incorrects."}), 401

    token = generate_token(user["id"])
    return jsonify({"token": token, "user": user_to_dict(user)}), 200
@auth_bp.route("/me", methods=["GET"])
@require_auth
def me():
    """GET /api/v1/auth/me — return the authenticated caller's public profile."""
    profile = user_to_dict(request.current_user)
    return jsonify({"user": profile}), 200
@auth_bp.route("/update-profile", methods=["POST"])
@require_auth
def update_profile():
    """POST /api/v1/auth/update-profile — change firstname, lastname and/or email.

    Only the keys present in the JSON body are updated; a body with none of
    them is accepted and changes nothing.

    Returns:
        200 ``{"ok": true}`` on success,
        400 when a supplied value is not a string or the email lacks an "@",
        409 when the new email already belongs to another account.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array or scalar body cannot be indexed by key.
        data = {}
    uid = request.current_user["id"]

    fields = {}
    for name in ("firstname", "lastname"):
        if name in data:
            value = data[name]
            # null or numeric JSON values would crash on .strip().
            if not isinstance(value, str):
                return jsonify({"error": "Prénom ou nom invalide."}), 400
            fields[name] = value.strip()
    if "email" in data:
        email = data["email"]
        if not isinstance(email, str) or "@" not in email:
            return jsonify({"error": "Email invalide."}), 400
        fields["email"] = email.strip().lower()

    if not fields:
        return jsonify({"ok": True}), 200

    # Column names come from the fixed names above, never from the request, so
    # building the SET clause from them is safe; values stay parameterized.
    set_clause = ", ".join(f"{column}=?" for column in fields)
    values = [*fields.values(), datetime.utcnow().isoformat(), uid]
    conn = get_db()
    try:
        conn.execute(
            f"UPDATE saas_users SET {set_clause}, updated_at=? WHERE id=?", values
        )
        conn.commit()
    except sqlite3.IntegrityError:
        return jsonify({"error": "Cet email est déjà utilisé."}), 409
    finally:
        # Close on every path, including errors other than IntegrityError.
        conn.close()
    return jsonify({"ok": True}), 200
@auth_bp.route("/change-password", methods=["POST"])
@require_auth
def change_password():
    """POST /api/v1/auth/change-password — replace the caller's password.

    Expects a JSON object with ``current_password`` and ``new_password``. The
    new password is validated before the current one is checked.

    Returns:
        200 ``{"ok": true}`` on success,
        400 when a password is not a string or the new one is too weak,
        401 when the current password does not match.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array or scalar body has no .get(); treat it as an empty object.
        data = {}
    uid = request.current_user["id"]
    cur_pwd = data.get("current_password") or ""
    new_pwd = data.get("new_password") or ""
    # Non-string JSON values would otherwise crash on len() / .encode().
    if not isinstance(cur_pwd, str) or not isinstance(new_pwd, str):
        return jsonify({"error": "Mot de passe invalide."}), 400

    pwd_error = validate_password_strength(new_pwd)
    if pwd_error:
        return jsonify({"error": pwd_error}), 400

    conn = get_db()
    try:
        user = conn.execute(
            "SELECT * FROM saas_users WHERE id=? AND password_hash=?",
            (uid, hash_password(cur_pwd)),
        ).fetchone()
        if user is None:
            return jsonify({"error": "Mot de passe actuel incorrect."}), 401
        # NOTE(review): existing rows in saas_tokens are left untouched, so
        # sessions opened with the old password stay valid — confirm intended.
        conn.execute(
            "UPDATE saas_users SET password_hash=?, updated_at=? WHERE id=?",
            (hash_password(new_pwd), datetime.utcnow().isoformat(), uid),
        )
        conn.commit()
    finally:
        # Close on every path so a failed statement does not leak the connection.
        conn.close()
    return jsonify({"ok": True}), 200
@auth_bp.route("/update-plan", methods=["POST"])
@require_auth
def update_plan():
    """POST /api/v1/auth/update-plan — set the caller's plan.

    Expects a JSON object with ``plan`` set to "free", "premium" or "pro"
    (defaults to "free" when the key is absent).

    Returns:
        200 ``{"ok": true, "plan": <plan>}`` on success,
        400 when the plan is not one of the accepted values.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array or scalar body has no .get(); treat it as an empty object.
        data = {}
    plan = data.get("plan", "free")
    if plan not in ("free", "premium", "pro"):
        return jsonify({"error": "Plan invalide."}), 400

    # NOTE(review): any authenticated user can move itself to "pro" here; no
    # payment check is visible in this module — confirm it happens upstream.
    uid = request.current_user["id"]
    conn = get_db()
    try:
        conn.execute(
            "UPDATE saas_users SET plan=?, updated_at=? WHERE id=?",
            (plan, datetime.utcnow().isoformat(), uid),
        )
        conn.commit()
    finally:
        # Close on every path so a failed UPDATE does not leak the connection.
        conn.close()
    return jsonify({"ok": True, "plan": plan}), 200
@auth_bp.route("/update-preferences", methods=["POST"])
@require_auth
def update_preferences():
    """POST /api/v1/auth/update-preferences — update alert settings.

    Recognised JSON keys (each optional):
      - ``telegram_chat_id``: string or integer; a falsy value clears it.
      - ``alert_value_bets``, ``alert_top1``, ``alert_quinte_only``: stored as
        1 when truthy, 0 otherwise.

    Returns:
        200 ``{"ok": true}`` (also when no recognised key was sent),
        400 when ``telegram_chat_id`` is neither a string, an integer nor null.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array or scalar body cannot be indexed by key.
        data = {}
    uid = request.current_user["id"]

    fields = {}
    if "telegram_chat_id" in data:
        chat_id = data["telegram_chat_id"]
        # Lists/objects cannot be bound as SQL parameters and used to surface
        # as a server error; booleans and floats are not chat ids either.
        if chat_id is not None and (
            isinstance(chat_id, bool) or not isinstance(chat_id, (str, int))
        ):
            return jsonify({"error": "telegram_chat_id invalide."}), 400
        # The column is TEXT: store the textual form, or NULL when empty/0.
        fields["telegram_chat_id"] = str(chat_id) if chat_id else None
    for flag in ("alert_value_bets", "alert_top1", "alert_quinte_only"):
        if flag in data:
            fields[flag] = 1 if data[flag] else 0

    if fields:
        # Column names come from the fixed names above, never from the request.
        set_clause = ", ".join(f"{column}=?" for column in fields)
        values = [*fields.values(), datetime.utcnow().isoformat(), uid]
        conn = get_db()
        try:
            conn.execute(
                f"UPDATE saas_users SET {set_clause}, updated_at=? WHERE id=?", values
            )
            conn.commit()
        finally:
            # Close on every path so a failed UPDATE does not leak the connection.
            conn.close()
    return jsonify({"ok": True}), 200
@auth_bp.route("/logout", methods=["POST"])
@require_auth
def logout():
    """POST /api/v1/auth/logout — revoke the Bearer session token of this request.

    When the caller authenticated with an X-API-Key header only, there is no
    session token to delete and the call simply succeeds.

    Returns:
        200 ``{"ok": true}``.
    """
    auth = request.headers.get("Authorization", "")
    token = auth.removeprefix("Bearer ").strip()
    if token:
        conn = get_db()
        try:
            conn.execute("DELETE FROM saas_tokens WHERE token=?", (token,))
            conn.commit()
        finally:
            # Close on every path so a failed DELETE does not leak the connection.
            conn.close()
    return jsonify({"ok": True}), 200
@auth_bp.route("/delete-account", methods=["DELETE"])
@require_auth
def delete_account():
    """DELETE /api/v1/auth/delete-account — remove the caller's account.

    Deletes the caller's rows from saas_tokens and then from saas_users, and
    commits both deletions together.

    Returns:
        200 ``{"ok": true}``.
    """
    uid = request.current_user["id"]
    conn = get_db()
    try:
        conn.execute("DELETE FROM saas_tokens WHERE user_id=?", (uid,))
        conn.execute("DELETE FROM saas_users WHERE id=?", (uid,))
        # NOTE(review): rows referencing this user in user_api_tokens (read by
        # validate_api_key) are not removed here — confirm they are cleaned up
        # elsewhere.
        conn.commit()
    finally:
        # Close on every path; closing without commit discards a partial delete.
        conn.close()
    return jsonify({"ok": True}), 200