- Add WEAK_PASSWORDS set (50+ common passwords) in saas_auth.py - Add validate_password_strength() function: checks min length, blacklist, digits, letters - Replace raw len() checks in /register and /change-password with validate_password_strength() - Add TestWeakPasswordRejection class in test_security.py: parametrized weak pwd test, strong pwd 201 test, no-digit, no-letter tests Co-Authored-By: Paperclip <noreply@paperclip.ing>
462 lines
14 KiB
Python
462 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
SaaS Auth Blueprint — /api/v1/auth/*
|
|
Gestion des utilisateurs, JWT, plans, préférences.
|
|
Sprint 4-5 — HRT-30
|
|
"""
|
|
|
|
from flask import Blueprint, request, jsonify, current_app
|
|
import sqlite3
|
|
import hashlib
|
|
import secrets
|
|
import os
|
|
import time
|
|
import json
|
|
from functools import wraps
|
|
from datetime import datetime
|
|
|
|
# ─── Blacklist mots de passe faibles ─────────────────────────────────────────
|
|
# HRT-63 — Validation mots de passe faibles
|
|
# Lower-case blacklist of passwords considered too common to accept.
# Entries are grouped by theme purely for readability; membership is what matters.
WEAK_PASSWORDS = {
    # "password" variants
    "password", "password1", "password123", "passw0rd",
    "p@ssword", "p@ssw0rd", "pa$$word",
    "pass", "pass1234",
    # digit sequences and repetitions
    "1234", "12345", "123456", "12345678", "123456789", "1234567890",
    "111111", "1111111", "11111111",
    "000000", "00000000",
    "123123", "1231234", "321321",
    # keyboard walks (QWERTY and AZERTY layouts)
    "qwerty", "qwerty123", "qwertyuiop",
    "azerty", "azertyuiop",
    "zaq1zaq1", "qazwsx", "qazwsxedc",
    # alphabet runs
    "abc123", "abc1234", "abcd1234", "abcdefgh",
    # common words and phrases
    "letmein", "letmein1",
    "iloveyou", "iloveyou1",
    "welcome", "welcome1", "welcome123",
    "hello", "hello123", "hello1234",
    "changeme", "changeme1",
    "secret", "secret1", "secret123",
    "trustno1",
    # account / system words
    "admin", "admin123", "admin1234", "administrator",
    "login", "login123",
    "test", "test1234", "test123456",
    # pop culture, sports, first names
    "monkey", "monkey1",
    "dragon", "dragon1",
    "master", "master1",
    "football", "soccer", "baseball", "basketball",
    "superman", "batman",
    "starwars", "starwars1",
    "princess", "princess1",
    "sunshine", "sunshine1",
    "shadow", "shadow1",
    "michael", "michael1",
    "jessica", "jessica1",
    # words tied to this application's domain
    "turf", "turf123",
    "cheval", "cheval123",
    "pmu", "pmu123",
}
|
|
|
|
|
|
def validate_password_strength(password: str) -> "str | None":
    """Validate the complexity of a candidate password.

    Rules, checked in this order:
      - must be a string (a non-string value, e.g. a JSON number, is rejected
        here instead of raising on len()/lower())
      - at least 8 characters
      - not present in the WEAK_PASSWORDS blacklist (compared lower-cased)
      - at least one digit
      - at least one letter

    Returns:
        None when the password is acceptable, otherwise the user-facing
        error message (str) for the first rule that failed.
    """
    if not isinstance(password, str):
        return "Mot de passe invalide."
    if len(password) < 8:
        return "Mot de passe trop court (8 caractères minimum)."
    if password.lower() in WEAK_PASSWORDS:
        return "Mot de passe trop commun. Choisissez un mot de passe plus sécurisé."
    if not any(c.isdigit() for c in password):
        return "Le mot de passe doit contenir au moins 1 chiffre."
    if not any(c.isalpha() for c in password):
        return "Le mot de passe doit contenir au moins 1 lettre."
    return None
|
|
|
|
|
|
# ─── Config ───────────────────────────────────────────────────────────────────
|
|
# SQLite database file; override with the TURF_SAAS_DB environment variable.
DB_PATH = os.getenv("TURF_SAAS_DB", "/home/h3r7/turf_saas/turf_saas.db")
# When JWT_SECRET is not set, a fresh random secret is generated at every
# process start — persist it in the environment for production.
JWT_SECRET = os.getenv("JWT_SECRET", secrets.token_hex(32))
# Lifetime of issued tokens in seconds; defaults to 30 days.
TOKEN_TTL = int(os.getenv("JWT_TTL_SECONDS", 30 * 24 * 3600))
|
|
|
|
# Blueprint that mounts every route of this module under /api/v1/auth.
auth_bp = Blueprint(
    name="auth_v1",
    import_name=__name__,
    url_prefix="/api/v1/auth",
)
|
|
|
|
|
|
# ─── DB helpers ───────────────────────────────────────────────────────────────
|
|
def get_db():
    """Open a new SQLite connection to DB_PATH.

    The connection uses sqlite3.Row as row factory, so fetched rows can be
    indexed by column name. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH)
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def init_users_table():
    """Create the saas_users and saas_tokens tables if they do not exist yet.

    The connection is closed in a ``finally`` clause so that a failing
    script (locked or unwritable database, ...) does not leak an open
    SQLite handle; the exception itself still propagates to the caller.
    """
    conn = get_db()
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS saas_users (
                id TEXT PRIMARY KEY,
                email TEXT UNIQUE NOT NULL,
                firstname TEXT DEFAULT '',
                lastname TEXT DEFAULT '',
                password_hash TEXT NOT NULL,
                plan TEXT DEFAULT 'free',
                telegram_chat_id TEXT DEFAULT NULL,
                alert_value_bets INTEGER DEFAULT 1,
                alert_top1 INTEGER DEFAULT 1,
                alert_quinte_only INTEGER DEFAULT 0,
                created_at TEXT DEFAULT (datetime('now')),
                updated_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS saas_tokens (
                token TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                expires_at INTEGER NOT NULL,
                created_at TEXT DEFAULT (datetime('now'))
            );
        """)
        conn.commit()
    finally:
        conn.close()
|
|
|
|
|
|
# Best-effort schema bootstrap at import time: a failure is reported on
# stdout but does not prevent the module from being imported.
try:
    init_users_table()
except Exception as exc:
    print(f"[auth_bp] DB init warning: {exc}")
|
|
|
|
|
|
# ─── Token helpers ────────────────────────────────────────────────────────────
|
|
def generate_token(user_id: str) -> str:
    """Create, persist and return a new opaque session token for ``user_id``.

    The token is a random URL-safe string stored in saas_tokens together
    with its expiry timestamp (now + TOKEN_TTL seconds).

    The connection is closed even if the INSERT fails, so an error does not
    leak an open SQLite handle; the exception still propagates.
    """
    token = secrets.token_urlsafe(48)
    expires = int(time.time()) + TOKEN_TTL
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO saas_tokens (token, user_id, expires_at) VALUES (?,?,?)",
            (token, user_id, expires),
        )
        conn.commit()
    finally:
        conn.close()
    return token
|
|
|
|
|
|
def validate_token(token: str):
    """Resolve a session token to its user.

    Returns:
        A dict holding ``user_id`` plus every saas_users column of the
        token's owner when the token exists and has not expired; None for an
        empty, unknown or expired token.

    The connection is closed even if the query raises, so a failure does not
    leak an open SQLite handle.
    """
    if not token:
        return None
    conn = get_db()
    try:
        now = int(time.time())
        row = conn.execute(
            "SELECT t.user_id, u.* FROM saas_tokens t JOIN saas_users u ON t.user_id=u.id "
            "WHERE t.token=? AND t.expires_at>?",
            (token, now),
        ).fetchone()
    finally:
        conn.close()
    return dict(row) if row else None
|
|
|
|
|
|
def hash_password(password: str) -> str:
    """Return the hex-encoded SHA-256 digest of the UTF-8 encoded password.

    NOTE(review): this is a single, unsalted SHA-256 pass, which is weak for
    password storage. The login and change-password routes match on equality
    of this value in SQL, so moving to a salted KDF needs a coordinated
    change of those routes and a migration of stored hashes.
    """
    digest = hashlib.sha256()
    digest.update(password.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
|
|
def require_auth(f):
    """Decorator that rejects requests lacking a valid ``Bearer`` token.

    On success the resolved user dict is stored on ``request.current_user``
    before the wrapped view runs; otherwise a 401 JSON error is returned and
    the view is not called.
    """

    @wraps(f)
    def decorated(*args, **kwargs):
        header = request.headers.get("Authorization", "")
        token = None
        if header.startswith("Bearer "):
            token = header.removeprefix("Bearer ").strip()
        user = validate_token(token)
        if user:
            request.current_user = user
            return f(*args, **kwargs)
        return jsonify({"error": "Non authentifié"}), 401

    return decorated
|
|
|
|
|
|
def user_to_dict(user) -> dict:
    """Build the public JSON representation of a user.

    Accepts either a sqlite3.Row or a mapping offering ``.get``. Only the
    listed fields are exposed (the password hash is left out) and the three
    alert flags are converted to booleans.
    """
    record = dict(user) if isinstance(user, sqlite3.Row) else user
    read = record.get
    return dict(
        id=read("id"),
        email=read("email"),
        firstname=read("firstname", ""),
        lastname=read("lastname", ""),
        plan=read("plan", "free"),
        telegram_chat_id=read("telegram_chat_id"),
        alert_value_bets=bool(read("alert_value_bets", 1)),
        alert_top1=bool(read("alert_top1", 1)),
        alert_quinte_only=bool(read("alert_quinte_only", 0)),
        created_at=read("created_at"),
    )
|
|
|
|
|
|
# ─── Routes ───────────────────────────────────────────────────────────────────
|
|
|
|
|
|
@auth_bp.route("/register", methods=["POST"])
def register():
    """POST /api/v1/auth/register — create an account and open a session.

    Expects a JSON object with ``email`` and ``password`` and optionally
    ``firstname``, ``lastname`` and ``plan`` (an unknown plan falls back to
    "free").

    Responses:
        201 with ``{"token", "user"}`` on success.
        400 for a malformed payload, an invalid email or a weak password.
        409 when the email is already registered.
    """
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array or scalar has no .get(); answer 400 instead of crashing.
        return jsonify({"error": "Corps JSON invalide."}), 400

    email = data.get("email") or ""
    password = data.get("password") or ""
    firstname = data.get("firstname") or ""
    lastname = data.get("lastname") or ""
    plan = data.get("plan", "free")

    # Non-string values (numbers, lists, ...) would raise on .strip()/len().
    if not isinstance(email, str):
        return jsonify({"error": "Adresse email invalide."}), 400
    email = email.strip().lower()
    if not email or "@" not in email:
        return jsonify({"error": "Adresse email invalide."}), 400

    if not isinstance(password, str):
        return jsonify({"error": "Mot de passe invalide."}), 400
    pwd_error = validate_password_strength(password)
    if pwd_error:
        return jsonify({"error": pwd_error}), 400

    if not isinstance(firstname, str) or not isinstance(lastname, str):
        return jsonify({"error": "Prénom ou nom invalide."}), 400
    firstname = firstname.strip()
    lastname = lastname.strip()

    # NOTE(review): the plan is taken from the client as-is; no payment or
    # entitlement check is visible in this module — confirm this is intended.
    if plan not in ("free", "premium", "pro"):
        plan = "free"

    uid = secrets.token_hex(16)
    pw_hash = hash_password(password)
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO saas_users (id, email, firstname, lastname, password_hash, plan) VALUES (?,?,?,?,?,?)",
            (uid, email, firstname, lastname, pw_hash, plan),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        # The UNIQUE constraint on email rejected the insert.
        return jsonify({"error": "Cette adresse email est déjà utilisée."}), 409
    finally:
        # Close on every path, including unexpected database errors.
        conn.close()

    token = generate_token(uid)
    # Re-read the user through the token to return the stored row.
    user_row = validate_token(token)
    return jsonify({"token": token, "user": user_to_dict(user_row)}), 201
|
|
|
|
|
|
@auth_bp.route("/login", methods=["POST"])
def login():
    """POST /api/v1/auth/login — exchange credentials for a session token.

    Expects a JSON object with ``email`` and ``password``.

    Responses:
        200 with ``{"token", "user"}`` on success.
        400 when the payload is malformed or a credential is missing.
        401 when no user matches the email / password hash pair.
    """
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array or scalar has no .get(); answer 400 instead of crashing.
        return jsonify({"error": "Corps JSON invalide."}), 400

    email = data.get("email") or ""
    password = data.get("password") or ""
    # Non-string credentials would raise on .strip() / .encode().
    if not isinstance(email, str) or not isinstance(password, str):
        return jsonify({"error": "Email et mot de passe requis."}), 400
    email = email.strip().lower()

    if not email or not password:
        return jsonify({"error": "Email et mot de passe requis."}), 400

    pw_hash = hash_password(password)
    conn = get_db()
    try:
        user = conn.execute(
            "SELECT * FROM saas_users WHERE email=? AND password_hash=?",
            (email, pw_hash),
        ).fetchone()
    finally:
        # Close even if the query fails so the handle is not leaked.
        conn.close()

    if not user:
        return jsonify({"error": "Identifiants incorrects."}), 401

    token = generate_token(user["id"])
    return jsonify({"token": token, "user": user_to_dict(user)}), 200
|
|
|
|
|
|
@auth_bp.route("/me", methods=["GET"])
@require_auth
def me():
    """GET /api/v1/auth/me — return the authenticated user's public profile."""
    profile = user_to_dict(request.current_user)
    return jsonify({"user": profile}), 200
|
|
|
|
|
|
@auth_bp.route("/update-profile", methods=["POST"])
@require_auth
def update_profile():
    """POST /api/v1/auth/update-profile — update firstname, lastname and/or email.

    Only the keys present in the JSON object are updated; a null firstname or
    lastname is stored as an empty string, as at registration.

    Responses:
        200 ``{"ok": true}`` on success or when there is nothing to update.
        400 for a malformed payload, a non-string name or an invalid email.
        409 when the new email is already used by another account.
    """
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array or scalar cannot be inspected by key; answer 400.
        return jsonify({"error": "Corps JSON invalide."}), 400

    uid = request.current_user["id"]
    fields = {}
    for name in ("firstname", "lastname"):
        if name in data:
            value = data[name] or ""
            # Calling .strip() on a number/list would raise and yield a 500.
            if not isinstance(value, str):
                return jsonify({"error": "Prénom ou nom invalide."}), 400
            fields[name] = value.strip()
    if "email" in data:
        email = data["email"]
        if not isinstance(email, str):
            return jsonify({"error": "Email invalide."}), 400
        email = email.strip().lower()
        if "@" not in email:
            return jsonify({"error": "Email invalide."}), 400
        fields["email"] = email

    if not fields:
        return jsonify({"ok": True}), 200

    # Column names come from the fixed whitelist above, never from the client,
    # so interpolating them into the statement is safe; values stay bound.
    set_clause = ", ".join(f"{k}=?" for k in fields)
    values = list(fields.values()) + [datetime.utcnow().isoformat(), uid]
    conn = get_db()
    try:
        conn.execute(
            f"UPDATE saas_users SET {set_clause}, updated_at=? WHERE id=?", values
        )
        conn.commit()
    except sqlite3.IntegrityError:
        # The UNIQUE constraint on email rejected the update.
        return jsonify({"error": "Cet email est déjà utilisé."}), 409
    finally:
        # Close on every path, including unexpected database errors.
        conn.close()
    return jsonify({"ok": True}), 200
|
|
|
|
|
|
@auth_bp.route("/change-password", methods=["POST"])
@require_auth
def change_password():
    """POST /api/v1/auth/change-password — replace the user's password.

    Expects a JSON object with ``current_password`` and ``new_password``.
    The new password is validated first, then the current one is verified.

    Responses:
        200 ``{"ok": true}`` on success.
        400 for a malformed payload or a new password that is too weak.
        401 when the current password does not match.
    """
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array or scalar has no .get(); answer 400 instead of crashing.
        return jsonify({"error": "Corps JSON invalide."}), 400

    uid = request.current_user["id"]
    cur_pwd = data.get("current_password") or ""
    new_pwd = data.get("new_password") or ""

    # A non-string value would raise inside the strength check / hashing.
    if not isinstance(new_pwd, str):
        return jsonify({"error": "Mot de passe invalide."}), 400
    pwd_error = validate_password_strength(new_pwd)
    if pwd_error:
        return jsonify({"error": pwd_error}), 400

    # A non-string current password can never match a stored hash.
    if not isinstance(cur_pwd, str):
        return jsonify({"error": "Mot de passe actuel incorrect."}), 401

    conn = get_db()
    try:
        user = conn.execute(
            "SELECT * FROM saas_users WHERE id=? AND password_hash=?",
            (uid, hash_password(cur_pwd)),
        ).fetchone()
        if not user:
            return jsonify({"error": "Mot de passe actuel incorrect."}), 401

        # NOTE(review): tokens already issued to this user stay valid after
        # the change — confirm whether other sessions should be revoked.
        conn.execute(
            "UPDATE saas_users SET password_hash=?, updated_at=? WHERE id=?",
            (hash_password(new_pwd), datetime.utcnow().isoformat(), uid),
        )
        conn.commit()
    finally:
        # Close on every path, including unexpected database errors.
        conn.close()
    return jsonify({"ok": True}), 200
|
|
|
|
|
|
@auth_bp.route("/update-plan", methods=["POST"])
@require_auth
def update_plan():
    """POST /api/v1/auth/update-plan — set the authenticated user's plan.

    Expects a JSON object with ``plan`` in ("free", "premium", "pro"); a
    missing key defaults to "free".

    Responses:
        200 ``{"ok": true, "plan": <plan>}`` on success.
        400 for a malformed payload or an unknown plan.
    """
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array or scalar has no .get(); answer 400 instead of crashing.
        return jsonify({"error": "Corps JSON invalide."}), 400

    plan = data.get("plan", "free")
    if plan not in ("free", "premium", "pro"):
        return jsonify({"error": "Plan invalide."}), 400

    # NOTE(review): no payment or entitlement check is visible in this
    # module before the plan is changed — confirm this is intended.
    uid = request.current_user["id"]
    conn = get_db()
    try:
        conn.execute(
            "UPDATE saas_users SET plan=?, updated_at=? WHERE id=?",
            (plan, datetime.utcnow().isoformat(), uid),
        )
        conn.commit()
    finally:
        # Close even if the update fails so the handle is not leaked.
        conn.close()
    return jsonify({"ok": True, "plan": plan}), 200
|
|
|
|
|
|
@auth_bp.route("/update-preferences", methods=["POST"])
@require_auth
def update_preferences():
    """POST /api/v1/auth/update-preferences — update alert settings.

    Recognised keys of the JSON object (only those present are updated):
      - ``telegram_chat_id``: string or integer; a falsy value clears it.
      - ``alert_value_bets``, ``alert_top1``, ``alert_quinte_only``:
        stored as 1 when truthy, 0 otherwise.

    Responses:
        200 ``{"ok": true}`` on success or when there is nothing to update.
        400 for a malformed payload or an unusable ``telegram_chat_id``.
    """
    data = request.get_json(silent=True) or {}
    if not isinstance(data, dict):
        # A JSON array or scalar cannot be inspected by key; answer 400.
        return jsonify({"error": "Corps JSON invalide."}), 400

    uid = request.current_user["id"]
    fields = {}
    if "telegram_chat_id" in data:
        chat_id = data["telegram_chat_id"] or None
        if chat_id is not None:
            # Lists/objects cannot be bound as an SQL parameter, and a boolean
            # is not a meaningful chat id; reject them instead of failing later.
            if isinstance(chat_id, bool) or not isinstance(chat_id, (str, int)):
                return jsonify({"error": "telegram_chat_id invalide."}), 400
            # The column is TEXT: store the textual form explicitly.
            chat_id = str(chat_id)
        fields["telegram_chat_id"] = chat_id
    for flag in ("alert_value_bets", "alert_top1", "alert_quinte_only"):
        if flag in data:
            fields[flag] = 1 if data[flag] else 0

    if fields:
        # Column names come from the fixed whitelist above, never from the
        # client, so interpolating them is safe; values stay bound.
        set_clause = ", ".join(f"{k}=?" for k in fields)
        values = list(fields.values()) + [datetime.utcnow().isoformat(), uid]
        conn = get_db()
        try:
            conn.execute(
                f"UPDATE saas_users SET {set_clause}, updated_at=? WHERE id=?", values
            )
            conn.commit()
        finally:
            # Close even if the update fails so the handle is not leaked.
            conn.close()

    return jsonify({"ok": True}), 200
|
|
|
|
|
|
@auth_bp.route("/logout", methods=["POST"])
@require_auth
def logout():
    """POST /api/v1/auth/logout — revoke the token used for this request.

    Only the presented token is deleted; other tokens of the same user are
    left untouched. Always answers 200 ``{"ok": true}`` once authenticated.
    """
    auth = request.headers.get("Authorization", "")
    # require_auth only lets the request through when the header carries a
    # valid "Bearer " token, so the prefix is present here.
    token = auth.removeprefix("Bearer ").strip()
    conn = get_db()
    try:
        conn.execute("DELETE FROM saas_tokens WHERE token=?", (token,))
        conn.commit()
    finally:
        # Close even if the delete fails so the handle is not leaked.
        conn.close()
    return jsonify({"ok": True}), 200
|
|
|
|
|
|
@auth_bp.route("/delete-account", methods=["DELETE"])
@require_auth
def delete_account():
    """DELETE /api/v1/auth/delete-account — remove the user and all their tokens.

    Both deletions are committed together: if either statement fails, the
    connection is closed without committing, so neither is persisted.
    Answers 200 ``{"ok": true}`` on success.
    """
    uid = request.current_user["id"]
    conn = get_db()
    try:
        conn.execute("DELETE FROM saas_tokens WHERE user_id=?", (uid,))
        conn.execute("DELETE FROM saas_users WHERE id=?", (uid,))
        conn.commit()
    finally:
        # Close even if a delete fails so the handle is not leaked.
        conn.close()
    return jsonify({"ok": True}), 200
|