#!/usr/bin/env python3
"""
SaaS Auth Blueprint — /api/v1/auth/*

User management, bearer tokens, plans and preferences.
Sprint 4-5 — HRT-30

Tokens issued by this module are opaque random strings stored in the
``saas_tokens`` table together with an expiry timestamp; they are looked up
in SQLite on every authenticated request.
"""

import hashlib
import hmac
import json
import logging
import os
import secrets
import sqlite3
import time
from collections import defaultdict
from contextlib import closing
from datetime import datetime, timezone
from functools import wraps
from threading import Lock

from flask import Blueprint, request, jsonify, current_app

logger = logging.getLogger(__name__)

# ─── Login rate limiting ──────────────────────────────────────────────────────
# One bucket per client IP, kept in process memory (not shared across workers).
_login_attempts: dict = defaultdict(
    lambda: {"count": 0, "window_start": 0.0, "blocked_until": 0.0}
)
_login_lock = Lock()
LOGIN_RATE_MAX = 5  # max attempts per window
LOGIN_RATE_WINDOW = 300  # 5 minutes (in seconds)
LOGIN_BLOCK_DURATION = 900  # 15-minute block once the limit is exceeded
# Above this many buckets, expired ones are dropped so the dict cannot grow
# without bound when many distinct IPs hit the endpoint.
_LOGIN_BUCKETS_SOFT_CAP = 10_000

# ─── Config ───────────────────────────────────────────────────────────────────
DB_PATH = os.environ.get("TURF_SAAS_DB", "/home/h3r7/turf_saas/turf_saas.db")
JWT_SECRET = os.environ.get(
    "JWT_SECRET", secrets.token_hex(32)
)  # persist in env for prod
TOKEN_TTL = int(os.environ.get("JWT_TTL_SECONDS", 30 * 24 * 3600))  # 30 days

_ALLOWED_PLANS = ("free", "premium", "pro")

# Password hashes are stored as "pbkdf2_sha256$<iterations>$<salt hex>$<hash hex>".
# Rows created before this format hold a bare SHA-256 hex digest; those are
# still accepted by verify_password() and upgraded on the next successful login.
_PBKDF2_SCHEME = "pbkdf2_sha256"
_PBKDF2_ITERATIONS = 600_000
_PBKDF2_SALT_BYTES = 16
# Well-formed hash that no password is expected to match. It is verified when
# the e-mail is unknown so that both login failure paths do the same work.
_DUMMY_PASSWORD_HASH = (
    f"{_PBKDF2_SCHEME}${_PBKDF2_ITERATIONS}${'00' * _PBKDF2_SALT_BYTES}${'00' * 32}"
)

auth_bp = Blueprint("auth_v1", __name__, url_prefix="/api/v1/auth")


# ─── DB helpers ───────────────────────────────────────────────────────────────
def get_db():
    """Open a new SQLite connection to DB_PATH with name-addressable rows.

    The caller owns the connection and must close it.
    """
    conn = sqlite3.connect(DB_PATH)
    conn.row_factory = sqlite3.Row
    return conn


def init_users_table():
    """Ensure the ``saas_users`` and ``saas_tokens`` tables exist."""
    with closing(get_db()) as conn:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS saas_users (
                id TEXT PRIMARY KEY,
                email TEXT UNIQUE NOT NULL,
                firstname TEXT DEFAULT '',
                lastname TEXT DEFAULT '',
                password_hash TEXT NOT NULL,
                plan TEXT DEFAULT 'free',
                telegram_chat_id TEXT DEFAULT NULL,
                alert_value_bets INTEGER DEFAULT 1,
                alert_top1 INTEGER DEFAULT 1,
                alert_quinte_only INTEGER DEFAULT 0,
                created_at TEXT DEFAULT (datetime('now')),
                updated_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS saas_tokens (
                token TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                expires_at INTEGER NOT NULL,
                created_at TEXT DEFAULT (datetime('now'))
            );
        """)
        conn.commit()


# Best-effort: a failure here is reported but must not prevent the module
# from being imported.
try:
    init_users_table()
except Exception as e:
    logger.warning("[auth_bp] DB init warning: %s", e)


def _utc_now_iso() -> str:
    """Return the current UTC time as a naive ISO-8601 string.

    Produces the same format as the deprecated ``datetime.utcnow().isoformat()``.
    """
    return datetime.now(timezone.utc).replace(tzinfo=None).isoformat()


def _update_user(uid: str, fields: dict) -> None:
    """Set the given columns on one user row and refresh ``updated_at``.

    ``fields`` maps column name to new value. Column names are interpolated
    into the SQL text, so callers must only pass hard-coded names, never
    request data. Values are bound as parameters.

    Raises:
        sqlite3.IntegrityError: if the update violates a constraint
            (e.g. the UNIQUE constraint on ``email``).
    """
    assignments = ", ".join(f"{column}=?" for column in fields)
    params = [*fields.values(), _utc_now_iso(), uid]
    with closing(get_db()) as conn:
        conn.execute(
            f"UPDATE saas_users SET {assignments}, updated_at=? WHERE id=?",
            params,
        )
        conn.commit()


# ─── Token helpers ────────────────────────────────────────────────────────────
def generate_token(user_id: str) -> str:
    """Create, store and return a new bearer token valid for TOKEN_TTL seconds."""
    token = secrets.token_urlsafe(48)
    expires = int(time.time()) + TOKEN_TTL
    with closing(get_db()) as conn:
        conn.execute(
            "INSERT INTO saas_tokens (token, user_id, expires_at) VALUES (?,?,?)",
            (token, user_id, expires),
        )
        conn.commit()
    return token


def validate_token(token: str):
    """Return the user row (as a dict) owning a non-expired token, or None."""
    if not token:
        return None
    now = int(time.time())
    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT t.user_id, u.* FROM saas_tokens t JOIN saas_users u ON t.user_id=u.id "
            "WHERE t.token=? AND t.expires_at>?",
            (token, now),
        ).fetchone()
    return dict(row) if row else None


# ─── Password helpers ─────────────────────────────────────────────────────────
def hash_password(password: str) -> str:
    """Return a salted PBKDF2-HMAC-SHA256 hash of ``password``.

    A fresh random salt is used on every call, so two calls with the same
    password return different strings; use verify_password() to check a
    password against a stored hash.
    """
    salt = secrets.token_bytes(_PBKDF2_SALT_BYTES)
    digest = hashlib.pbkdf2_hmac(
        "sha256", password.encode("utf-8"), salt, _PBKDF2_ITERATIONS
    )
    return f"{_PBKDF2_SCHEME}${_PBKDF2_ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password(password: str, stored_hash) -> bool:
    """Check ``password`` against a stored hash in constant time.

    Accepts both the current PBKDF2 format and the legacy bare SHA-256 hex
    digest. Returns False for malformed or non-string stored values.
    """
    if not isinstance(stored_hash, str):
        return False
    secret = password.encode("utf-8")
    if stored_hash.startswith(_PBKDF2_SCHEME + "$"):
        try:
            _, iterations, salt_hex, digest_hex = stored_hash.split("$")
            expected = bytes.fromhex(digest_hex)
            derived = hashlib.pbkdf2_hmac(
                "sha256", secret, bytes.fromhex(salt_hex), int(iterations)
            )
        except (ValueError, OverflowError):
            return False
        return hmac.compare_digest(derived, expected)
    legacy = hashlib.sha256(secret).hexdigest()
    return hmac.compare_digest(legacy.encode("ascii"), stored_hash.encode("utf-8"))


def _needs_rehash(stored_hash: str) -> bool:
    """Tell whether a stored hash is legacy or uses an outdated iteration count."""
    return not stored_hash.startswith(f"{_PBKDF2_SCHEME}${_PBKDF2_ITERATIONS}$")


# ─── Request helpers ──────────────────────────────────────────────────────────
def require_auth(f):
    """Decorator: reject the request with 401 unless it carries a valid token.

    The token is read from an ``Authorization: Bearer <token>`` header. On
    success the user dict is exposed as ``request.current_user``.
    """

    @wraps(f)
    def decorated(*args, **kwargs):
        auth = request.headers.get("Authorization", "")
        token = (
            auth.removeprefix("Bearer ").strip()
            if auth.startswith("Bearer ")
            else None
        )
        user = validate_token(token)
        if not user:
            return jsonify({"error": "Non authentifié"}), 401
        request.current_user = user
        return f(*args, **kwargs)

    return decorated


def user_to_dict(user) -> dict:
    """Project a user row onto the public fields returned by the API.

    Accepts a ``sqlite3.Row`` or a dict; the password hash is never included.
    """
    if isinstance(user, sqlite3.Row):
        user = dict(user)
    return {
        "id": user.get("id"),
        "email": user.get("email"),
        "firstname": user.get("firstname", ""),
        "lastname": user.get("lastname", ""),
        "plan": user.get("plan", "free"),
        "telegram_chat_id": user.get("telegram_chat_id"),
        "alert_value_bets": bool(user.get("alert_value_bets", 1)),
        "alert_top1": bool(user.get("alert_top1", 1)),
        "alert_quinte_only": bool(user.get("alert_quinte_only", 0)),
        "created_at": user.get("created_at"),
    }


def _json_body() -> dict:
    """Return the request's JSON object, or {} if absent, invalid or not an object."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _text(value) -> str:
    """Return ``value`` stripped if it is a string, otherwise an empty string."""
    return value.strip() if isinstance(value, str) else ""


def _register_login_attempt(ip: str, now: float):
    """Count one login attempt for ``ip``.

    Returns None when the attempt is allowed, or the number of seconds the
    client should wait when it is (or has just become) blocked.
    """
    with _login_lock:
        if len(_login_attempts) > _LOGIN_BUCKETS_SOFT_CAP:
            stale = [
                key
                for key, entry in _login_attempts.items()
                if now >= entry["blocked_until"]
                and now - entry["window_start"] >= LOGIN_RATE_WINDOW
            ]
            for key in stale:
                del _login_attempts[key]

        bucket = _login_attempts[ip]
        if now < bucket["blocked_until"]:
            # Still blocked; never advertise a zero-second wait alongside a 429.
            return max(1, int(bucket["blocked_until"] - now))

        # Not blocked: start a new window if the previous one has elapsed.
        if now - bucket["window_start"] >= LOGIN_RATE_WINDOW:
            bucket["count"] = 0
            bucket["window_start"] = now
        bucket["count"] += 1
        if bucket["count"] > LOGIN_RATE_MAX:
            bucket["blocked_until"] = now + LOGIN_BLOCK_DURATION
            return LOGIN_BLOCK_DURATION
        return None


# ─── Routes ───────────────────────────────────────────────────────────────────
@auth_bp.route("/register", methods=["POST"])
def register():
    """POST /api/v1/auth/register

    Creates a user and returns ``{"token", "user"}`` with status 201.
    Returns 400 on invalid e-mail or short password, 409 if the e-mail exists.
    """
    data = _json_body()
    email = _text(data.get("email")).lower()
    password = data.get("password")
    if not isinstance(password, str):
        password = ""
    firstname = _text(data.get("firstname"))
    lastname = _text(data.get("lastname"))
    # NOTE(review): the plan is taken from the client without any payment
    # check visible in this file — confirm that this is intended.
    plan = data.get("plan", "free")

    if not email or "@" not in email:
        return jsonify({"error": "Adresse email invalide."}), 400
    if len(password) < 8:
        return jsonify(
            {"error": "Mot de passe trop court (8 caractères minimum)."}
        ), 400
    if plan not in _ALLOWED_PLANS:
        plan = "free"

    uid = secrets.token_hex(16)
    pw_hash = hash_password(password)
    try:
        with closing(get_db()) as conn:
            conn.execute(
                "INSERT INTO saas_users (id, email, firstname, lastname, password_hash, plan) VALUES (?,?,?,?,?,?)",
                (uid, email, firstname, lastname, pw_hash, plan),
            )
            conn.commit()
    except sqlite3.IntegrityError:
        return jsonify({"error": "Cette adresse email est déjà utilisée."}), 409

    token = generate_token(uid)
    user_row = validate_token(token)
    return jsonify({"token": token, "user": user_to_dict(user_row)}), 201


@auth_bp.route("/login", methods=["POST"])
def login():
    """POST /api/v1/auth/login

    Returns ``{"token", "user"}`` on success, 400 when credentials are
    missing, 401 when they are wrong and 429 (with ``Retry-After``) when the
    client IP has exceeded the attempt limit.
    """
    data = _json_body()
    email = _text(data.get("email")).lower()
    password = data.get("password")
    if not email or not isinstance(password, str) or not password:
        return jsonify({"error": "Email et mot de passe requis."}), 400

    # ── Per-IP rate limit ────────────────────────────────────────
    ip = request.remote_addr or "unknown"
    retry_after = _register_login_attempt(ip, time.time())
    if retry_after is not None:
        resp = jsonify({"error": "Trop de tentatives. Réessayez plus tard."})
        resp.status_code = 429
        resp.headers["Retry-After"] = str(retry_after)
        return resp
    # ─────────────────────────────────────────────────────────────

    with closing(get_db()) as conn:
        user = conn.execute(
            "SELECT * FROM saas_users WHERE email=?", (email,)
        ).fetchone()
        stored = user["password_hash"] if user else _DUMMY_PASSWORD_HASH
        # Always run the verification, even for unknown e-mails.
        authenticated = verify_password(password, stored) and user is not None
        if authenticated and _needs_rehash(stored):
            # Opportunistic upgrade of legacy hashes; a failure here must not
            # fail a login whose credentials were correct.
            try:
                conn.execute(
                    "UPDATE saas_users SET password_hash=? WHERE id=?",
                    (hash_password(password), user["id"]),
                )
                conn.commit()
            except sqlite3.Error as exc:
                logger.warning("[auth_bp] password rehash failed: %s", exc)

    if not authenticated:
        return jsonify({"error": "Identifiants incorrects."}), 401

    token = generate_token(user["id"])
    return jsonify({"token": token, "user": user_to_dict(user)}), 200


@auth_bp.route("/me", methods=["GET"])
@require_auth
def me():
    """GET /api/v1/auth/me — return the authenticated user's public profile."""
    return jsonify({"user": user_to_dict(request.current_user)}), 200


@auth_bp.route("/update-profile", methods=["POST"])
@require_auth
def update_profile():
    """POST /api/v1/auth/update-profile

    Updates any of ``firstname``, ``lastname``, ``email`` present in the body.
    Returns 400 on invalid values and 409 if the e-mail is already taken.
    """
    data = _json_body()
    uid = request.current_user["id"]
    fields = {}

    for key in ("firstname", "lastname"):
        if key in data:
            if not isinstance(data[key], str):
                return jsonify({"error": "Prénom ou nom invalide."}), 400
            fields[key] = data[key].strip()
    if "email" in data:
        email = _text(data["email"]).lower()
        if "@" not in email:
            return jsonify({"error": "Email invalide."}), 400
        fields["email"] = email

    if not fields:
        return jsonify({"ok": True}), 200

    try:
        _update_user(uid, fields)
    except sqlite3.IntegrityError:
        return jsonify({"error": "Cet email est déjà utilisé."}), 409
    return jsonify({"ok": True}), 200


@auth_bp.route("/change-password", methods=["POST"])
@require_auth
def change_password():
    """POST /api/v1/auth/change-password

    Requires ``current_password`` and a ``new_password`` of at least 8
    characters. Returns 400 if the new one is too short, 401 if the current
    one is wrong.
    """
    data = _json_body()
    uid = request.current_user["id"]
    cur_pwd = data.get("current_password")
    new_pwd = data.get("new_password")
    if not isinstance(cur_pwd, str):
        cur_pwd = ""
    if not isinstance(new_pwd, str):
        new_pwd = ""

    if len(new_pwd) < 8:
        return jsonify({"error": "Nouveau mot de passe trop court."}), 400

    with closing(get_db()) as conn:
        row = conn.execute(
            "SELECT password_hash FROM saas_users WHERE id=?", (uid,)
        ).fetchone()
    if row is None or not verify_password(cur_pwd, row["password_hash"]):
        return jsonify({"error": "Mot de passe actuel incorrect."}), 401

    # NOTE(review): previously issued tokens stay valid after a password
    # change — confirm whether they should be revoked here.
    _update_user(uid, {"password_hash": hash_password(new_pwd)})
    return jsonify({"ok": True}), 200


@auth_bp.route("/update-plan", methods=["POST"])
@require_auth
def update_plan():
    """POST /api/v1/auth/update-plan

    Sets the user's plan to one of free / premium / pro; 400 otherwise.
    """
    data = _json_body()
    plan = data.get("plan", "free")
    if plan not in _ALLOWED_PLANS:
        return jsonify({"error": "Plan invalide."}), 400
    uid = request.current_user["id"]
    # NOTE(review): any authenticated user can switch to a paid plan here;
    # no payment verification is visible in this file — confirm upstream.
    _update_user(uid, {"plan": plan})
    return jsonify({"ok": True, "plan": plan}), 200


@auth_bp.route("/update-preferences", methods=["POST"])
@require_auth
def update_preferences():
    """POST /api/v1/auth/update-preferences

    Updates any of ``telegram_chat_id`` and the three ``alert_*`` flags
    present in the body. Flags are stored as 0/1 according to truthiness.
    """
    data = _json_body()
    uid = request.current_user["id"]
    fields = {}

    if "telegram_chat_id" in data:
        chat_id = data["telegram_chat_id"] or None
        # Arrays/objects cannot be bound as an SQLite parameter.
        if isinstance(chat_id, (list, dict)):
            return jsonify({"error": "Identifiant Telegram invalide."}), 400
        fields["telegram_chat_id"] = chat_id
    for flag in ("alert_value_bets", "alert_top1", "alert_quinte_only"):
        if flag in data:
            fields[flag] = 1 if data[flag] else 0

    if fields:
        _update_user(uid, fields)
    return jsonify({"ok": True}), 200


@auth_bp.route("/logout", methods=["POST"])
@require_auth
def logout():
    """POST /api/v1/auth/logout — delete the token used for this request."""
    auth = request.headers.get("Authorization", "")
    token = auth.removeprefix("Bearer ").strip()
    with closing(get_db()) as conn:
        conn.execute("DELETE FROM saas_tokens WHERE token=?", (token,))
        conn.commit()
    return jsonify({"ok": True}), 200


@auth_bp.route("/delete-account", methods=["DELETE"])
@require_auth
def delete_account():
    """DELETE /api/v1/auth/delete-account — remove the user and all their tokens."""
    uid = request.current_user["id"]
    with closing(get_db()) as conn:
        conn.execute("DELETE FROM saas_tokens WHERE user_id=?", (uid,))
        conn.execute("DELETE FROM saas_users WHERE id=?", (uid,))
        conn.commit()
    return jsonify({"ok": True}), 200