#!/usr/bin/env python3
"""
SaaS Auth Blueprint — /api/v1/auth/*

User management, session tokens, plans and preferences.
Sprint 4-5 — HRT-30
"""

import hashlib
import json
import logging
import os
import secrets
import sqlite3
import time
from collections import defaultdict
from datetime import datetime
from functools import wraps
from threading import Lock

from flask import Blueprint, request, jsonify, current_app

# ─── Login rate limiting ──────────────────────────────────────────────────────
# One bucket per client key (the login route keys it by IP address).
_login_attempts: dict = defaultdict(
    lambda: {"count": 0, "window_start": 0.0, "blocked_until": 0.0}
)
_login_lock = Lock()
LOGIN_RATE_MAX = 5  # max attempts per window
LOGIN_RATE_WINDOW = 300  # 5 minutes (in seconds)
LOGIN_BLOCK_DURATION = 900  # 15-minute block once the limit is exceeded

# ─── Weak-password blacklist ──────────────────────────────────────────────────
# HRT-63 — weak password validation
WEAK_PASSWORDS = {
    "password", "password1", "password123", "passw0rd",
    "12345678", "123456789", "1234567890", "123456", "12345", "1234",
    "qwerty", "qwerty123", "qwertyuiop", "azerty", "azertyuiop",
    "letmein", "letmein1", "iloveyou", "iloveyou1",
    "admin", "admin123", "admin1234", "administrator",
    "welcome", "welcome1", "welcome123",
    "monkey", "monkey1", "dragon", "dragon1", "master", "master1",
    "football", "soccer", "baseball", "basketball",
    "superman", "batman", "starwars", "starwars1",
    "princess", "princess1", "sunshine", "sunshine1",
    "shadow", "shadow1", "michael", "michael1", "jessica", "jessica1",
    "abc123", "abc1234", "abcd1234", "abcdefgh",
    "login", "login123", "pass", "pass1234",
    "test", "test1234", "test123456",
    "hello", "hello123", "hello1234",
    "changeme", "changeme1", "secret", "secret1", "secret123",
    "trustno1", "zaq1zaq1", "qazwsx", "qazwsxedc",
    "111111", "1111111", "11111111", "000000", "00000000",
    "123123", "1231234", "321321",
    "p@ssword", "p@ssw0rd", "pa$$word",
    "turf", "turf123", "cheval", "cheval123", "pmu", "pmu123",
}


def validate_password_strength(password: str):
    """
    Check a candidate password against the complexity rules.

    Returns None when the password is acceptable, otherwise a user-facing
    error message (str).

    Rules, checked in this order:
    - must be a string
    - at least 8 characters
    - lower-cased value not present in WEAK_PASSWORDS
    - at least 1 digit
    - at least 1 letter
    """
    # JSON callers can send numbers/null/objects; report that as a validation
    # error instead of crashing on len()/lower().
    if not isinstance(password, str):
        return "Mot de passe invalide."
    if len(password) < 8:
        return "Mot de passe trop court (8 caractères minimum)."
    if password.lower() in WEAK_PASSWORDS:
        return "Mot de passe trop commun. Choisissez un mot de passe plus sécurisé."
    if not any(c.isdigit() for c in password):
        return "Le mot de passe doit contenir au moins 1 chiffre."
    if not any(c.isalpha() for c in password):
        return "Le mot de passe doit contenir au moins 1 lettre."
    return None


# ─── Config ───────────────────────────────────────────────────────────────────
DB_PATH = os.environ.get("TURF_SAAS_DB", "/home/h3r7/turf_saas/turf_saas.db")
# Falls back to a random per-process value when JWT_SECRET is not set, so it
# must be persisted in the environment for production.
JWT_SECRET = os.environ.get(
    "JWT_SECRET", secrets.token_hex(32)
)  # persist in env for prod
TOKEN_TTL = int(os.environ.get("JWT_TTL_SECONDS", 30 * 24 * 3600))  # 30 days

auth_bp = Blueprint("auth_v1", __name__, url_prefix="/api/v1/auth")


# ─── DB helpers ───────────────────────────────────────────────────────────────
def get_db():
    """Open a new SQLite connection to DB_PATH with rows exposed as sqlite3.Row.

    The caller owns the connection and is responsible for closing it.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def init_users_table():
    """Ensure the saas_users and saas_tokens tables exist."""
    conn = get_db()
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS saas_users (
                id TEXT PRIMARY KEY,
                email TEXT UNIQUE NOT NULL,
                firstname TEXT DEFAULT '',
                lastname TEXT DEFAULT '',
                password_hash TEXT NOT NULL,
                plan TEXT DEFAULT 'free',
                telegram_chat_id TEXT DEFAULT NULL,
                alert_value_bets INTEGER DEFAULT 1,
                alert_top1 INTEGER DEFAULT 1,
                alert_quinte_only INTEGER DEFAULT 0,
                created_at TEXT DEFAULT (datetime('now')),
                updated_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS saas_tokens (
                token TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                expires_at INTEGER NOT NULL,
                created_at TEXT DEFAULT (datetime('now'))
            );
        """)
        conn.commit()
    finally:
        # Close even when the script fails so the handle is not leaked.
        conn.close()


# Best-effort schema creation at import time: a failure (e.g. unreachable
# DB_PATH) is reported but must not prevent the module from being imported.
try:
    init_users_table()
except Exception as e:
    logging.getLogger("turf_saas.auth").warning("[auth_bp] DB init warning: %s", e)
# ─── Token helpers ────────────────────────────────────────────────────────────


def generate_token(user_id: str) -> str:
    """Create, store and return a new opaque session token for ``user_id``.

    The token is recorded in saas_tokens with an expiry TOKEN_TTL seconds
    from now.
    """
    token = secrets.token_urlsafe(48)
    expires = int(time.time()) + TOKEN_TTL
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO saas_tokens (token, user_id, expires_at) VALUES (?,?,?)",
            (token, user_id, expires),
        )
        conn.commit()
    finally:
        conn.close()
    return token


def validate_token(token: str):
    """Return the owning user's row as a dict for a non-expired token, else None."""
    if not token:
        return None
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT t.user_id, u.* FROM saas_tokens t JOIN saas_users u ON t.user_id=u.id "
            "WHERE t.token=? AND t.expires_at>?",
            (token, int(time.time())),
        ).fetchone()
    finally:
        conn.close()
    return dict(row) if row else None


def hash_password(password: str) -> str:
    """Return the hex SHA-256 digest of ``password`` (UTF-8 encoded).

    NOTE(review): this is an unsalted, fast hash. Stored hashes and the
    equality-based lookups in login/change_password depend on this exact
    format, so it is kept as is; migrating to a salted KDF (e.g.
    hashlib.scrypt) needs a coordinated data migration.
    """
    return hashlib.sha256(password.encode("utf-8")).hexdigest()


def validate_api_key(raw_key: str):
    """
    Validate a personal API token (X-API-Key header).

    Returns the user dict or None. Updates last_used_at on success.
    Any database error is logged and reported as None (not authenticated).
    HRT-80
    """
    if not raw_key:
        return None
    key_hash = hashlib.sha256(raw_key.encode()).hexdigest()
    conn = get_db()
    try:
        row = conn.execute(
            "SELECT t.user_id, u.* FROM user_api_tokens t "
            "JOIN saas_users u ON t.user_id = u.id "
            "WHERE t.token_hash = ? AND t.revoked = 0",
            (key_hash,),
        ).fetchone()
        if row:
            conn.execute(
                "UPDATE user_api_tokens SET last_used_at = datetime('now') "
                "WHERE token_hash = ?",
                (key_hash,),
            )
            conn.commit()
        return dict(row) if row else None
    except Exception as e:
        logging.getLogger("turf_saas.auth").warning("validate_api_key error: %s", e)
        return None
    finally:
        conn.close()


def require_auth(f):
    """Decorator: require a valid Bearer session token or X-API-Key.

    On success the user dict is stored on ``request.current_user`` before the
    wrapped view runs; otherwise a 401 JSON response is returned.
    """

    @wraps(f)
    def decorated(*args, **kwargs):
        # 1. Try Bearer session token (existing flow — unchanged)
        auth = request.headers.get("Authorization", "")
        token = (
            auth.removeprefix("Bearer ").strip() if auth.startswith("Bearer ") else None
        )
        user = validate_token(token) if token else None

        # 2. Fallback: X-API-Key personal token (HRT-80)
        if not user:
            api_key = request.headers.get("X-API-Key", "").strip()
            if api_key:
                user = validate_api_key(api_key)

        if not user:
            return jsonify({"error": "Non authentifié"}), 401
        request.current_user = user
        return f(*args, **kwargs)

    return decorated


def user_to_dict(user) -> dict:
    """Project a user row (sqlite3.Row or dict) onto the public JSON shape.

    Only whitelisted fields are exposed; password_hash is never included.
    """
    if isinstance(user, sqlite3.Row):
        user = dict(user)
    return {
        "id": user.get("id"),
        "email": user.get("email"),
        "firstname": user.get("firstname", ""),
        "lastname": user.get("lastname", ""),
        "plan": user.get("plan", "free"),
        "telegram_chat_id": user.get("telegram_chat_id"),
        "alert_value_bets": bool(user.get("alert_value_bets", 1)),
        "alert_top1": bool(user.get("alert_top1", 1)),
        "alert_quinte_only": bool(user.get("alert_quinte_only", 0)),
        "created_at": user.get("created_at"),
    }


# ─── Request helpers ──────────────────────────────────────────────────────────


def _json_object() -> dict:
    """Return the request JSON body as a dict; {} when absent or not an object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _clean_text(value):
    """Normalise an optional JSON text value.

    Returns the stripped string, "" for any falsy value (missing/null/empty),
    or None when the value is a non-string that cannot be used as text.
    """
    if not value:
        return ""
    if isinstance(value, str):
        return value.strip()
    return None


def _password_field(value) -> str:
    """Return ``value`` unchanged when it is a string, otherwise ""."""
    return value if isinstance(value, str) else ""


# ─── Login rate limiting ──────────────────────────────────────────────────────

# Next time (epoch seconds) at which idle rate-limit buckets are purged.
_login_prune_state = {"next_prune": 0.0}


def _prune_login_attempts(now: float) -> None:
    """Drop idle rate-limit buckets. The caller must hold ``_login_lock``.

    A bucket that is neither blocked nor inside its counting window would be
    reset on its next use anyway, so removing it does not change limiting
    behaviour; it only keeps the dict from growing without bound.
    """
    stale = [
        key
        for key, bucket in _login_attempts.items()
        if now >= bucket["blocked_until"]
        and now - bucket["window_start"] >= LOGIN_RATE_WINDOW
    ]
    for key in stale:
        del _login_attempts[key]


def _register_login_attempt(ip: str, now: float):
    """Record one login attempt for ``ip``.

    Returns the number of seconds the client must wait (int, possibly 0) when
    the attempt is rejected, or None when the attempt is allowed.
    """
    with _login_lock:
        if now >= _login_prune_state["next_prune"]:
            _prune_login_attempts(now)
            _login_prune_state["next_prune"] = now + LOGIN_RATE_WINDOW

        bucket = _login_attempts[ip]

        # Still inside an active block: reject without counting.
        if now < bucket["blocked_until"]:
            return int(bucket["blocked_until"] - now)

        # Start a fresh counting window once the previous one has elapsed.
        if now - bucket["window_start"] >= LOGIN_RATE_WINDOW:
            bucket["count"] = 0
            bucket["window_start"] = now

        bucket["count"] += 1
        if bucket["count"] > LOGIN_RATE_MAX:
            bucket["blocked_until"] = now + LOGIN_BLOCK_DURATION
            return LOGIN_BLOCK_DURATION
        return None


# ─── Routes ───────────────────────────────────────────────────────────────────


@auth_bp.route("/register", methods=["POST"])
def register():
    """POST /api/v1/auth/register

    Creates a user and returns ``{"token", "user"}`` with status 201.
    Returns 400 on invalid input and 409 when the email is already taken.
    """
    data = _json_object()
    email = (_clean_text(data.get("email")) or "").lower()
    password = _password_field(data.get("password"))
    firstname = _clean_text(data.get("firstname"))
    lastname = _clean_text(data.get("lastname"))
    # NOTE(review): the plan is taken from the request body, so a client can
    # self-assign "premium"/"pro" — confirm billing is enforced elsewhere.
    plan = data.get("plan", "free")

    if not email or "@" not in email:
        return jsonify({"error": "Adresse email invalide."}), 400
    if firstname is None or lastname is None:
        return jsonify({"error": "Prénom ou nom invalide."}), 400
    pwd_error = validate_password_strength(password)
    if pwd_error:
        return jsonify({"error": pwd_error}), 400
    if plan not in ("free", "premium", "pro"):
        plan = "free"

    uid = secrets.token_hex(16)
    pw_hash = hash_password(password)
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO saas_users (id, email, firstname, lastname, password_hash, plan) VALUES (?,?,?,?,?,?)",
            (uid, email, firstname, lastname, pw_hash, plan),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        # UNIQUE constraint on email.
        return jsonify({"error": "Cette adresse email est déjà utilisée."}), 409
    finally:
        conn.close()

    token = generate_token(uid)
    # Re-read through the token so the response reflects the stored row
    # (including DB-side defaults such as created_at).
    user_row = validate_token(token)
    return jsonify({"token": token, "user": user_to_dict(user_row)}), 201


@auth_bp.route("/login", methods=["POST"])
def login():
    """POST /api/v1/auth/login

    Returns ``{"token", "user"}`` on success, 400 on missing credentials,
    401 on bad credentials and 429 (with Retry-After) when rate limited.
    """
    data = _json_object()
    email = (_clean_text(data.get("email")) or "").lower()
    password = _password_field(data.get("password"))

    if not email or not password:
        return jsonify({"error": "Email et mot de passe requis."}), 400

    # ── Per-IP rate limit ────────────────────────────────────────
    # Every attempt with credentials counts, whether or not it succeeds.
    ip = request.remote_addr or "unknown"
    retry_after = _register_login_attempt(ip, time.time())
    if retry_after is not None:
        resp = jsonify({"error": "Trop de tentatives. Réessayez plus tard."})
        resp.status_code = 429
        resp.headers["Retry-After"] = str(retry_after)
        return resp
    # ─────────────────────────────────────────────────────────────

    pw_hash = hash_password(password)
    conn = get_db()
    try:
        user = conn.execute(
            "SELECT * FROM saas_users WHERE email=? AND password_hash=?",
            (email, pw_hash),
        ).fetchone()
    finally:
        conn.close()

    if not user:
        return jsonify({"error": "Identifiants incorrects."}), 401

    token = generate_token(user["id"])
    return jsonify({"token": token, "user": user_to_dict(user)}), 200


@auth_bp.route("/me", methods=["GET"])
@require_auth
def me():
    """GET /api/v1/auth/me — return the authenticated user's public profile."""
    return jsonify({"user": user_to_dict(request.current_user)}), 200


@auth_bp.route("/update-profile", methods=["POST"])
@require_auth
def update_profile():
    """POST /api/v1/auth/update-profile

    Updates any of firstname, lastname, email present in the body.
    Returns 400 on invalid values and 409 when the new email is already used.
    """
    data = _json_object()
    uid = request.current_user["id"]

    fields = {}
    for name in ("firstname", "lastname"):
        if name in data:
            value = _clean_text(data[name])
            if value is None:
                return jsonify({"error": f"Champ invalide : {name}."}), 400
            fields[name] = value
    if "email" in data:
        email = (_clean_text(data["email"]) or "").lower()
        if "@" not in email:
            return jsonify({"error": "Email invalide."}), 400
        fields["email"] = email

    if not fields:
        return jsonify({"ok": True}), 200

    # Column names come from the fixed whitelist above, never from the
    # request, so interpolating them into the statement is safe.
    set_clause = ", ".join(f"{k}=?" for k in fields)
    values = list(fields.values()) + [datetime.utcnow().isoformat(), uid]

    conn = get_db()
    try:
        conn.execute(
            f"UPDATE saas_users SET {set_clause}, updated_at=? WHERE id=?", values
        )
        conn.commit()
    except sqlite3.IntegrityError:
        return jsonify({"error": "Cet email est déjà utilisé."}), 409
    finally:
        conn.close()
    return jsonify({"ok": True}), 200


@auth_bp.route("/change-password", methods=["POST"])
@require_auth
def change_password():
    """POST /api/v1/auth/change-password

    Requires the current password; returns 400 when the new password is too
    weak and 401 when the current password does not match.
    """
    data = _json_object()
    uid = request.current_user["id"]
    cur_pwd = _password_field(data.get("current_password"))
    new_pwd = _password_field(data.get("new_password"))

    pwd_error = validate_password_strength(new_pwd)
    if pwd_error:
        return jsonify({"error": pwd_error}), 400

    conn = get_db()
    try:
        user = conn.execute(
            "SELECT * FROM saas_users WHERE id=? AND password_hash=?",
            (uid, hash_password(cur_pwd)),
        ).fetchone()
        if not user:
            return jsonify({"error": "Mot de passe actuel incorrect."}), 401

        conn.execute(
            "UPDATE saas_users SET password_hash=?, updated_at=? WHERE id=?",
            (hash_password(new_pwd), datetime.utcnow().isoformat(), uid),
        )
        conn.commit()
    finally:
        conn.close()
    return jsonify({"ok": True}), 200


@auth_bp.route("/update-plan", methods=["POST"])
@require_auth
def update_plan():
    """POST /api/v1/auth/update-plan

    Sets the authenticated user's plan to one of free/premium/pro.
    NOTE(review): any authenticated user can change their own plan here —
    confirm payment/entitlement checks happen elsewhere.
    """
    data = _json_object()
    plan = data.get("plan", "free")
    if plan not in ("free", "premium", "pro"):
        return jsonify({"error": "Plan invalide."}), 400

    uid = request.current_user["id"]
    conn = get_db()
    try:
        conn.execute(
            "UPDATE saas_users SET plan=?, updated_at=? WHERE id=?",
            (plan, datetime.utcnow().isoformat(), uid),
        )
        conn.commit()
    finally:
        conn.close()
    return jsonify({"ok": True, "plan": plan}), 200


@auth_bp.route("/update-preferences", methods=["POST"])
@require_auth
def update_preferences():
    """POST /api/v1/auth/update-preferences

    Updates telegram_chat_id and/or the alert_* flags present in the body.
    """
    data = _json_object()
    uid = request.current_user["id"]

    fields = {}
    if "telegram_chat_id" in data:
        chat_id = data["telegram_chat_id"]
        if not chat_id:
            fields["telegram_chat_id"] = None
        elif isinstance(chat_id, (str, int, float)):
            fields["telegram_chat_id"] = chat_id
        else:
            # Lists/objects cannot be bound as SQL parameters.
            return jsonify({"error": "telegram_chat_id invalide."}), 400
    for flag in ("alert_value_bets", "alert_top1", "alert_quinte_only"):
        if flag in data:
            fields[flag] = 1 if data[flag] else 0

    if fields:
        # Column names come from the fixed whitelist above, not from the
        # request, so interpolating them into the statement is safe.
        set_clause = ", ".join(f"{k}=?" for k in fields)
        values = list(fields.values()) + [datetime.utcnow().isoformat(), uid]
        conn = get_db()
        try:
            conn.execute(
                f"UPDATE saas_users SET {set_clause}, updated_at=? WHERE id=?", values
            )
            conn.commit()
        finally:
            conn.close()

    return jsonify({"ok": True}), 200


@auth_bp.route("/logout", methods=["POST"])
@require_auth
def logout():
    """POST /api/v1/auth/logout — revoke the Bearer session token in use.

    When the request was authenticated with X-API-Key there is no session
    token to revoke and the call is a no-op.
    """
    auth = request.headers.get("Authorization", "")
    token = auth.removeprefix("Bearer ").strip() if auth.startswith("Bearer ") else ""
    if token:
        conn = get_db()
        try:
            conn.execute("DELETE FROM saas_tokens WHERE token=?", (token,))
            conn.commit()
        finally:
            conn.close()
    return jsonify({"ok": True}), 200


@auth_bp.route("/delete-account", methods=["DELETE"])
@require_auth
def delete_account():
    """DELETE /api/v1/auth/delete-account

    Removes the user's session tokens and the user row.
    NOTE(review): rows in user_api_tokens are not removed here; they stop
    authenticating because validate_api_key joins on saas_users — confirm
    whether an explicit cleanup is wanted.
    """
    uid = request.current_user["id"]
    conn = get_db()
    try:
        conn.execute("DELETE FROM saas_tokens WHERE user_id=?", (uid,))
        conn.execute("DELETE FROM saas_users WHERE id=?", (uid,))
        conn.commit()
    finally:
        conn.close()
    return jsonify({"ok": True}), 200