Files
turf_saas/saas_auth.py
CTO H3R7Tech 4bf458f1b8 Merge HRT-62: IP-based rate limiting on /auth/login — validated CTO
- In-memory IP rate limiter: 5 attempts / 5min window
- 15 min block on exceed, HTTP 429 + Retry-After header
- Applied rate_limit_middleware on portal_server.py
- Tests: TestLoginRateLimit added (conflict resolved: keep both test classes)

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-27 15:24:07 +02:00

505 lines
15 KiB
Python

#!/usr/bin/env python3
"""
SaaS Auth Blueprint — /api/v1/auth/*
Gestion des utilisateurs, JWT, plans, préférences.
Sprint 4-5 — HRT-30
"""
from flask import Blueprint, request, jsonify, current_app
import sqlite3
import hashlib
import secrets
import os
import time
import json
from functools import wraps
from datetime import datetime
from collections import defaultdict
from threading import Lock
# ─── Rate limiting login ───────────────────────────────────────────────────────
_login_attempts: dict = defaultdict(
lambda: {"count": 0, "window_start": 0.0, "blocked_until": 0.0}
)
_login_lock = Lock()
LOGIN_RATE_MAX = 5 # max tentatives par fenêtre
LOGIN_RATE_WINDOW = 300 # 5 minutes (en secondes)
LOGIN_BLOCK_DURATION = 900 # 15 min de blocage après dépassement
# ─── Blacklist mots de passe faibles ─────────────────────────────────────────
# HRT-63 — Validation mots de passe faibles
WEAK_PASSWORDS = {
"password",
"password1",
"password123",
"passw0rd",
"12345678",
"123456789",
"1234567890",
"123456",
"12345",
"1234",
"qwerty",
"qwerty123",
"qwertyuiop",
"azerty",
"azertyuiop",
"letmein",
"letmein1",
"iloveyou",
"iloveyou1",
"admin",
"admin123",
"admin1234",
"administrator",
"welcome",
"welcome1",
"welcome123",
"monkey",
"monkey1",
"dragon",
"dragon1",
"master",
"master1",
"football",
"soccer",
"baseball",
"basketball",
"superman",
"batman",
"starwars",
"starwars1",
"princess",
"princess1",
"sunshine",
"sunshine1",
"shadow",
"shadow1",
"michael",
"michael1",
"jessica",
"jessica1",
"abc123",
"abc1234",
"abcd1234",
"abcdefgh",
"login",
"login123",
"pass",
"pass1234",
"test",
"test1234",
"test123456",
"hello",
"hello123",
"hello1234",
"changeme",
"changeme1",
"secret",
"secret1",
"secret123",
"trustno1",
"zaq1zaq1",
"qazwsx",
"qazwsxedc",
"111111",
"1111111",
"11111111",
"000000",
"00000000",
"123123",
"1231234",
"321321",
"p@ssword",
"p@ssw0rd",
"pa$$word",
"turf",
"turf123",
"cheval",
"cheval123",
"pmu",
"pmu123",
}
def validate_password_strength(password: str):
"""
Valide la complexité d'un mot de passe.
Retourne None si OK, sinon un message d'erreur (str).
Règles :
- 8 caractères minimum
- absent de la blacklist WEAK_PASSWORDS
- au moins 1 chiffre
- au moins 1 lettre
"""
if len(password) < 8:
return "Mot de passe trop court (8 caractères minimum)."
if password.lower() in WEAK_PASSWORDS:
return "Mot de passe trop commun. Choisissez un mot de passe plus sécurisé."
if not any(c.isdigit() for c in password):
return "Le mot de passe doit contenir au moins 1 chiffre."
if not any(c.isalpha() for c in password):
return "Le mot de passe doit contenir au moins 1 lettre."
return None
# ─── Config ───────────────────────────────────────────────────────────────────
DB_PATH = os.environ.get("TURF_SAAS_DB", "/home/h3r7/turf_saas/turf_saas.db")
JWT_SECRET = os.environ.get(
"JWT_SECRET", secrets.token_hex(32)
) # persist in env for prod
TOKEN_TTL = int(os.environ.get("JWT_TTL_SECONDS", 30 * 24 * 3600)) # 30 days
auth_bp = Blueprint("auth_v1", __name__, url_prefix="/api/v1/auth")
# ─── DB helpers ───────────────────────────────────────────────────────────────
def get_db():
conn = sqlite3.connect(DB_PATH)
conn.row_factory = sqlite3.Row
return conn
def init_users_table():
"""Ensure users table exists."""
conn = get_db()
conn.executescript("""
CREATE TABLE IF NOT EXISTS saas_users (
id TEXT PRIMARY KEY,
email TEXT UNIQUE NOT NULL,
firstname TEXT DEFAULT '',
lastname TEXT DEFAULT '',
password_hash TEXT NOT NULL,
plan TEXT DEFAULT 'free',
telegram_chat_id TEXT DEFAULT NULL,
alert_value_bets INTEGER DEFAULT 1,
alert_top1 INTEGER DEFAULT 1,
alert_quinte_only INTEGER DEFAULT 0,
created_at TEXT DEFAULT (datetime('now')),
updated_at TEXT DEFAULT (datetime('now'))
);
CREATE TABLE IF NOT EXISTS saas_tokens (
token TEXT PRIMARY KEY,
user_id TEXT NOT NULL,
expires_at INTEGER NOT NULL,
created_at TEXT DEFAULT (datetime('now'))
);
""")
conn.commit()
conn.close()
try:
init_users_table()
except Exception as e:
print(f"[auth_bp] DB init warning: {e}")
# ─── Token helpers ────────────────────────────────────────────────────────────
def generate_token(user_id: str) -> str:
token = secrets.token_urlsafe(48)
expires = int(time.time()) + TOKEN_TTL
conn = get_db()
conn.execute(
"INSERT INTO saas_tokens (token, user_id, expires_at) VALUES (?,?,?)",
(token, user_id, expires),
)
conn.commit()
conn.close()
return token
def validate_token(token: str):
"""Returns user row dict or None."""
if not token:
return None
conn = get_db()
now = int(time.time())
row = conn.execute(
"SELECT t.user_id, u.* FROM saas_tokens t JOIN saas_users u ON t.user_id=u.id "
"WHERE t.token=? AND t.expires_at>?",
(token, now),
).fetchone()
conn.close()
return dict(row) if row else None
def hash_password(password: str) -> str:
return hashlib.sha256(password.encode("utf-8")).hexdigest()
def require_auth(f):
@wraps(f)
def decorated(*args, **kwargs):
auth = request.headers.get("Authorization", "")
token = (
auth.removeprefix("Bearer ").strip() if auth.startswith("Bearer ") else None
)
user = validate_token(token)
if not user:
return jsonify({"error": "Non authentifié"}), 401
request.current_user = user
return f(*args, **kwargs)
return decorated
def user_to_dict(user) -> dict:
if isinstance(user, sqlite3.Row):
user = dict(user)
return {
"id": user.get("id"),
"email": user.get("email"),
"firstname": user.get("firstname", ""),
"lastname": user.get("lastname", ""),
"plan": user.get("plan", "free"),
"telegram_chat_id": user.get("telegram_chat_id"),
"alert_value_bets": bool(user.get("alert_value_bets", 1)),
"alert_top1": bool(user.get("alert_top1", 1)),
"alert_quinte_only": bool(user.get("alert_quinte_only", 0)),
"created_at": user.get("created_at"),
}
# ─── Routes ───────────────────────────────────────────────────────────────────
@auth_bp.route("/register", methods=["POST"])
def register():
"""POST /api/v1/auth/register"""
data = request.get_json(silent=True) or {}
email = (data.get("email") or "").strip().lower()
password = data.get("password") or ""
firstname = (data.get("firstname") or "").strip()
lastname = (data.get("lastname") or "").strip()
plan = data.get("plan", "free")
if not email or "@" not in email:
return jsonify({"error": "Adresse email invalide."}), 400
pwd_error = validate_password_strength(password)
if pwd_error:
return jsonify({"error": pwd_error}), 400
if plan not in ("free", "premium", "pro"):
plan = "free"
uid = secrets.token_hex(16)
pw_hash = hash_password(password)
conn = get_db()
try:
conn.execute(
"INSERT INTO saas_users (id, email, firstname, lastname, password_hash, plan) VALUES (?,?,?,?,?,?)",
(uid, email, firstname, lastname, pw_hash, plan),
)
conn.commit()
except sqlite3.IntegrityError:
conn.close()
return jsonify({"error": "Cette adresse email est déjà utilisée."}), 409
conn.close()
token = generate_token(uid)
user_row = validate_token(token)
return jsonify({"token": token, "user": user_to_dict(user_row)}), 201
@auth_bp.route("/login", methods=["POST"])
def login():
"""POST /api/v1/auth/login"""
data = request.get_json(silent=True) or {}
email = (data.get("email") or "").strip().lower()
password = data.get("password") or ""
if not email or not password:
return jsonify({"error": "Email et mot de passe requis."}), 400
# ── Rate limit par IP ────────────────────────────────────────
ip = request.remote_addr or "unknown"
now = time.time()
with _login_lock:
bucket = _login_attempts[ip]
# Lever le blocage si la durée est écoulée
if now >= bucket["blocked_until"]:
if now - bucket["window_start"] >= LOGIN_RATE_WINDOW:
bucket["count"] = 0
bucket["window_start"] = now
bucket["count"] += 1
count = bucket["count"]
if count > LOGIN_RATE_MAX:
bucket["blocked_until"] = now + LOGIN_BLOCK_DURATION
retry_after = LOGIN_BLOCK_DURATION
blocked = True
else:
retry_after = int(LOGIN_RATE_WINDOW - (now - bucket["window_start"]))
blocked = False
else:
blocked = True
retry_after = int(bucket["blocked_until"] - now)
if blocked:
resp = jsonify({"error": "Trop de tentatives. Réessayez plus tard."})
resp.status_code = 429
resp.headers["Retry-After"] = str(retry_after)
return resp
# ─────────────────────────────────────────────────────────────
pw_hash = hash_password(password)
conn = get_db()
user = conn.execute(
"SELECT * FROM saas_users WHERE email=? AND password_hash=?", (email, pw_hash)
).fetchone()
conn.close()
if not user:
return jsonify({"error": "Identifiants incorrects."}), 401
token = generate_token(user["id"])
return jsonify({"token": token, "user": user_to_dict(user)}), 200
@auth_bp.route("/me", methods=["GET"])
@require_auth
def me():
"""GET /api/v1/auth/me"""
return jsonify({"user": user_to_dict(request.current_user)}), 200
@auth_bp.route("/update-profile", methods=["POST"])
@require_auth
def update_profile():
"""POST /api/v1/auth/update-profile"""
data = request.get_json(silent=True) or {}
uid = request.current_user["id"]
fields = {}
if "firstname" in data:
fields["firstname"] = data["firstname"].strip()
if "lastname" in data:
fields["lastname"] = data["lastname"].strip()
if "email" in data:
email = data["email"].strip().lower()
if "@" not in email:
return jsonify({"error": "Email invalide."}), 400
fields["email"] = email
if not fields:
return jsonify({"ok": True}), 200
set_clause = ", ".join(f"{k}=?" for k in fields)
values = list(fields.values()) + [datetime.utcnow().isoformat(), uid]
conn = get_db()
try:
conn.execute(
f"UPDATE saas_users SET {set_clause}, updated_at=? WHERE id=?", values
)
conn.commit()
except sqlite3.IntegrityError:
conn.close()
return jsonify({"error": "Cet email est déjà utilisé."}), 409
conn.close()
return jsonify({"ok": True}), 200
@auth_bp.route("/change-password", methods=["POST"])
@require_auth
def change_password():
"""POST /api/v1/auth/change-password"""
data = request.get_json(silent=True) or {}
uid = request.current_user["id"]
cur_pwd = data.get("current_password") or ""
new_pwd = data.get("new_password") or ""
pwd_error = validate_password_strength(new_pwd)
if pwd_error:
return jsonify({"error": pwd_error}), 400
conn = get_db()
user = conn.execute(
"SELECT * FROM saas_users WHERE id=? AND password_hash=?",
(uid, hash_password(cur_pwd)),
).fetchone()
if not user:
conn.close()
return jsonify({"error": "Mot de passe actuel incorrect."}), 401
conn.execute(
"UPDATE saas_users SET password_hash=?, updated_at=? WHERE id=?",
(hash_password(new_pwd), datetime.utcnow().isoformat(), uid),
)
conn.commit()
conn.close()
return jsonify({"ok": True}), 200
@auth_bp.route("/update-plan", methods=["POST"])
@require_auth
def update_plan():
"""POST /api/v1/auth/update-plan"""
data = request.get_json(silent=True) or {}
plan = data.get("plan", "free")
if plan not in ("free", "premium", "pro"):
return jsonify({"error": "Plan invalide."}), 400
uid = request.current_user["id"]
conn = get_db()
conn.execute(
"UPDATE saas_users SET plan=?, updated_at=? WHERE id=?",
(plan, datetime.utcnow().isoformat(), uid),
)
conn.commit()
conn.close()
return jsonify({"ok": True, "plan": plan}), 200
@auth_bp.route("/update-preferences", methods=["POST"])
@require_auth
def update_preferences():
"""POST /api/v1/auth/update-preferences"""
data = request.get_json(silent=True) or {}
uid = request.current_user["id"]
fields = {}
if "telegram_chat_id" in data:
fields["telegram_chat_id"] = data["telegram_chat_id"] or None
if "alert_value_bets" in data:
fields["alert_value_bets"] = 1 if data["alert_value_bets"] else 0
if "alert_top1" in data:
fields["alert_top1"] = 1 if data["alert_top1"] else 0
if "alert_quinte_only" in data:
fields["alert_quinte_only"] = 1 if data["alert_quinte_only"] else 0
if fields:
set_clause = ", ".join(f"{k}=?" for k in fields)
values = list(fields.values()) + [datetime.utcnow().isoformat(), uid]
conn = get_db()
conn.execute(
f"UPDATE saas_users SET {set_clause}, updated_at=? WHERE id=?", values
)
conn.commit()
conn.close()
return jsonify({"ok": True}), 200
@auth_bp.route("/logout", methods=["POST"])
@require_auth
def logout():
"""POST /api/v1/auth/logout"""
auth = request.headers.get("Authorization", "")
token = auth.removeprefix("Bearer ").strip()
conn = get_db()
conn.execute("DELETE FROM saas_tokens WHERE token=?", (token,))
conn.commit()
conn.close()
return jsonify({"ok": True}), 200
@auth_bp.route("/delete-account", methods=["DELETE"])
@require_auth
def delete_account():
"""DELETE /api/v1/auth/delete-account"""
uid = request.current_user["id"]
conn = get_db()
conn.execute("DELETE FROM saas_tokens WHERE user_id=?", (uid,))
conn.execute("DELETE FROM saas_users WHERE id=?", (uid,))
conn.commit()
conn.close()
return jsonify({"ok": True}), 200