Frontend pages: - landing.html: marketing page — hero, pricing (Free/9.90e/24.90e), features, FAQ, footer, mobile-first responsive, LCP < 2.5s friendly - login.html: JWT auth login with JS validation, error handling, redirect-after-login - register.html: registration with plan selection preview sidebar, password strength meter - dashboard_saas.html: role-based dashboard (Free/Premium/Pro) with locked sections, race prediction cards, detailed table, stats row - onboarding.html: 3-step wizard — plan confirm + Telegram alerts config + first prediction preview - account.html: tabbed account management — profile, security (change-password, delete), plan upgrade, notification preferences Backend: - saas_auth.py: Flask Blueprint /api/v1/auth/* — register, login, token auth, profile/password/plan/preferences update, logout, delete-account - saas_api_v1.py: Flask Blueprint /api/v1/* — stats/summary, predictions/today (plan-gated), value-bets (Premium+), CSV export (Pro) Server: - portal_server.py: register blueprints, serve all new SaaS routes at /login /register /dashboard /onboarding /account Co-Authored-By: Paperclip <noreply@paperclip.ing>
290 lines
9.7 KiB
Python
290 lines
9.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
SaaS Auth Blueprint — /api/v1/auth/*
|
|
User management, JWT, plans, preferences.
|
|
Sprint 4-5 — HRT-30
|
|
"""
|
|
|
|
from flask import Blueprint, request, jsonify
|
|
import sqlite3
|
|
import hashlib
|
|
import secrets
|
|
import os
|
|
import time
|
|
from functools import wraps
|
|
from datetime import datetime
|
|
|
|
# Path of the SQLite database file; overridable through TURF_SAAS_DB.
DB_PATH = os.getenv("TURF_SAAS_DB", "/home/h3r7/turf_saas/turf_saas.db")

# Secret read from JWT_SECRET. When the variable is unset a fresh random value
# is generated at import time, so it differs between processes and restarts.
JWT_SECRET = os.getenv("JWT_SECRET", secrets.token_hex(32))

# Lifetime of an issued token in seconds (JWT_TTL_SECONDS, default 30 days).
TOKEN_TTL = int(os.getenv("JWT_TTL_SECONDS", 30 * 24 * 3600))

# Blueprint grouping every route of this module under /api/v1/auth.
auth_bp = Blueprint("auth_v1", __name__, url_prefix="/api/v1/auth")
|
|
|
|
|
|
def get_db():
    """Open a new SQLite connection to the database at ``DB_PATH``.

    Returns:
        sqlite3.Connection: a fresh connection whose rows are
        ``sqlite3.Row`` objects. The caller is responsible for closing it.
    """
    connection = sqlite3.connect(DB_PATH)
    # sqlite3.Row gives access by column name and converts via dict(row).
    connection.row_factory = sqlite3.Row
    return connection
|
|
|
|
|
|
def init_users_table():
    """Create the ``saas_users`` and ``saas_tokens`` tables if they are missing.

    Both statements use ``CREATE TABLE IF NOT EXISTS``, so calling this
    function against an already initialised database changes nothing.

    Raises:
        sqlite3.Error: if the database cannot be opened or the schema script
            fails. The connection is closed before the error propagates.
    """
    conn = get_db()
    try:
        conn.executescript("""
            CREATE TABLE IF NOT EXISTS saas_users (
                id TEXT PRIMARY KEY,
                email TEXT UNIQUE NOT NULL,
                firstname TEXT DEFAULT '',
                lastname TEXT DEFAULT '',
                password_hash TEXT NOT NULL,
                plan TEXT DEFAULT 'free',
                telegram_chat_id TEXT DEFAULT NULL,
                alert_value_bets INTEGER DEFAULT 1,
                alert_top1 INTEGER DEFAULT 1,
                alert_quinte_only INTEGER DEFAULT 0,
                created_at TEXT DEFAULT (datetime('now')),
                updated_at TEXT DEFAULT (datetime('now'))
            );
            CREATE TABLE IF NOT EXISTS saas_tokens (
                token TEXT PRIMARY KEY,
                user_id TEXT NOT NULL,
                expires_at INTEGER NOT NULL,
                created_at TEXT DEFAULT (datetime('now'))
            );
        """)
        conn.commit()
    finally:
        # Always release the connection, even when the schema script fails.
        conn.close()
|
|
|
|
|
|
# Best-effort schema creation at import time: a failure (for example an
# unreachable database path) is reported on stdout but must not prevent the
# module, and therefore the blueprint, from being imported.
try:
    init_users_table()
except Exception as e:
    print(f"[auth_bp] DB init warning: {e}")
|
|
|
|
|
|
def generate_token(user_id):
    """Create, persist and return a new bearer token for ``user_id``.

    The token is a random URL-safe string produced by
    ``secrets.token_urlsafe(48)``. It is stored in ``saas_tokens`` together
    with an expiry timestamp of "now + TOKEN_TTL" seconds (Unix epoch).

    Args:
        user_id: value stored in the ``user_id`` column of ``saas_tokens``.

    Returns:
        str: the newly issued token.

    Raises:
        sqlite3.Error: if the insert fails. The connection is closed before
            the error propagates.
    """
    token = secrets.token_urlsafe(48)
    expires_at = int(time.time()) + TOKEN_TTL
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO saas_tokens (token, user_id, expires_at) VALUES (?,?,?)",
            (token, user_id, expires_at),
        )
        conn.commit()
    finally:
        # Release the connection even when the insert or the commit raises.
        conn.close()
    return token
|
|
|
|
|
|
def validate_token(token):
    """Resolve a bearer token to the user it belongs to.

    Args:
        token: the raw token string; any falsy value is rejected without
            touching the database.

    Returns:
        dict | None: the joined row as a dict (``user_id`` plus every
        ``saas_users`` column, which includes ``password_hash``) when the
        token exists and its ``expires_at`` is strictly greater than the
        current Unix time; ``None`` otherwise.

    Raises:
        sqlite3.Error: if the lookup fails. The connection is closed before
            the error propagates.
    """
    if not token:
        return None
    conn = get_db()
    try:
        now = int(time.time())
        row = conn.execute(
            "SELECT t.user_id, u.* FROM saas_tokens t JOIN saas_users u ON t.user_id=u.id "
            "WHERE t.token=? AND t.expires_at>?",
            (token, now),
        ).fetchone()
    finally:
        # Release the connection even when the query raises.
        conn.close()
    return dict(row) if row else None
|
|
|
|
|
|
def hash_password(password):
    """Return the hexadecimal SHA-256 digest of ``password`` (UTF-8 encoded).

    NOTE(review): this is a single unsalted SHA-256 pass. Moving to a salted,
    deliberately slow scheme would need a coordinated migration, because the
    callers in this file compare the stored digest by equality.

    Args:
        password: the clear-text password as a ``str``.

    Returns:
        str: 64 lowercase hexadecimal characters.
    """
    digest = hashlib.sha256()
    digest.update(password.encode("utf-8"))
    return digest.hexdigest()
|
|
|
|
|
|
def require_auth(f):
    """Decorator that only lets requests with a valid bearer token through.

    The token is read from an ``Authorization: Bearer <token>`` header and
    resolved with ``validate_token``. On success the user dict is attached to
    the request as ``request.current_user`` and the wrapped view is called;
    otherwise a JSON error is returned with HTTP status 401.
    """
    @wraps(f)
    def decorated(*args, **kwargs):
        header = request.headers.get("Authorization", "")
        if header.startswith("Bearer "):
            # Everything after the "Bearer " prefix, minus surrounding spaces.
            token = header[len("Bearer "):].strip()
        else:
            token = None
        user = validate_token(token)
        if user:
            request.current_user = user
            return f(*args, **kwargs)
        return jsonify({"error": "Non authentifié"}), 401

    return decorated
|
|
|
|
|
|
def user_to_dict(user):
    """Build the public, JSON-friendly representation of a user record.

    Only the fields listed below are copied, so anything else present on the
    record (for example ``password_hash``) is left out of the result.

    Args:
        user: a ``sqlite3.Row`` or a dict-like object exposing ``.get``.

    Returns:
        dict: ``id``, ``email``, ``firstname``, ``lastname``, ``plan``,
        ``telegram_chat_id``, the three alert flags as booleans, and
        ``created_at``. Missing keys fall back to the defaults used here.
    """
    record = dict(user) if isinstance(user, sqlite3.Row) else user

    public = {"id": record.get("id"), "email": record.get("email")}
    # Text fields and their fallback when the key is absent.
    for key, fallback in (("firstname", ""), ("lastname", ""), ("plan", "free")):
        public[key] = record.get(key, fallback)
    public["telegram_chat_id"] = record.get("telegram_chat_id")
    # Alert flags are stored as integers and exposed as booleans.
    for flag, fallback in (
        ("alert_value_bets", 1),
        ("alert_top1", 1),
        ("alert_quinte_only", 0),
    ):
        public[flag] = bool(record.get(flag, fallback))
    public["created_at"] = record.get("created_at")
    return public
|
|
|
|
|
|
@auth_bp.route("/register", methods=["POST"])
def register():
    """Create a user account and return a fresh token (POST /register).

    Expected JSON body: ``email``, ``password`` and optionally ``firstname``,
    ``lastname`` and ``plan``. An unknown ``plan`` silently falls back to
    ``"free"``.

    NOTE(review): the requested plan is stored as sent by the client; no
    payment check is visible in this function.

    Returns:
        201 with ``{"token", "user"}`` on success;
        400 when the email, password or names are missing or malformed;
        409 when the email is already registered.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # A JSON array or scalar body has no fields; treat it as empty so the
        # validation below answers 400 instead of crashing on ``.get``.
        data = {}

    email = data.get("email") or ""
    password = data.get("password") or ""
    firstname = data.get("firstname") or ""
    lastname = data.get("lastname") or ""
    plan = data.get("plan", "free")

    if not isinstance(email, str):
        return jsonify({"error": "Adresse email invalide."}), 400
    email = email.strip().lower()
    if not email or "@" not in email:
        return jsonify({"error": "Adresse email invalide."}), 400
    if not isinstance(password, str):
        return jsonify({"error": "Mot de passe invalide."}), 400
    if len(password) < 8:
        return jsonify({"error": "Mot de passe trop court (8 caractères minimum)."}), 400
    if not isinstance(firstname, str) or not isinstance(lastname, str):
        return jsonify({"error": "Prénom ou nom invalide."}), 400
    firstname = firstname.strip()
    lastname = lastname.strip()
    if plan not in ("free", "premium", "pro"):
        plan = "free"

    uid = secrets.token_hex(16)
    pw_hash = hash_password(password)
    conn = get_db()
    try:
        conn.execute(
            "INSERT INTO saas_users (id, email, firstname, lastname, password_hash, plan) VALUES (?,?,?,?,?,?)",
            (uid, email, firstname, lastname, pw_hash, plan),
        )
        conn.commit()
    except sqlite3.IntegrityError:
        # The UNIQUE constraint on email rejects duplicate registrations.
        return jsonify({"error": "Cette adresse email est déjà utilisée."}), 409
    finally:
        # Runs on success, on the 409 path and on unexpected errors alike.
        conn.close()

    token = generate_token(uid)
    # Re-read the user through the token so DB defaults (created_at, alert
    # flags) are part of the response.
    user_row = validate_token(token)
    return jsonify({"token": token, "user": user_to_dict(user_row)}), 201
|
|
|
|
|
|
@auth_bp.route("/login", methods=["POST"])
def login():
    """Authenticate with email and password and issue a token (POST /login).

    Expected JSON body: ``email`` and ``password``.

    Returns:
        200 with ``{"token", "user"}`` on success;
        400 when either field is missing, empty or not a string;
        401 when no user matches the email / password hash pair.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Non-object JSON bodies carry no credentials.
        data = {}

    email = data.get("email") or ""
    password = data.get("password") or ""
    # Non-string values used to crash on .strip()/.encode(); reject them the
    # same way as missing credentials.
    if not isinstance(email, str) or not isinstance(password, str):
        return jsonify({"error": "Email et mot de passe requis."}), 400
    email = email.strip().lower()
    if not email or not password:
        return jsonify({"error": "Email et mot de passe requis."}), 400

    pw_hash = hash_password(password)
    conn = get_db()
    try:
        user = conn.execute(
            "SELECT * FROM saas_users WHERE email=? AND password_hash=?",
            (email, pw_hash),
        ).fetchone()
    finally:
        # Release the connection even when the query raises.
        conn.close()

    if not user:
        return jsonify({"error": "Identifiants incorrects."}), 401
    token = generate_token(user["id"])
    return jsonify({"token": token, "user": user_to_dict(user)}), 200
|
|
|
|
|
|
@auth_bp.route("/me", methods=["GET"])
@require_auth
def me():
    """Return the public profile of the authenticated user (GET /me)."""
    profile = user_to_dict(request.current_user)
    return jsonify({"user": profile}), 200
|
|
|
|
|
|
@auth_bp.route("/update-profile", methods=["POST"])
@require_auth
def update_profile():
    """Update the caller's first name, last name and/or email.

    Only the keys present in the JSON body are changed. ``updated_at`` is
    refreshed whenever at least one field is written.

    Returns:
        200 with ``{"ok": True}`` on success or when nothing was sent;
        400 when a supplied value is not a string or the email lacks "@";
        409 when the new email already belongs to another account.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Non-object JSON bodies contain no updatable field.
        data = {}
    uid = request.current_user["id"]

    fields = {}
    for key in ("firstname", "lastname"):
        if key not in data:
            continue
        value = data[key]
        if value is None:
            # An explicit null clears the field instead of crashing on .strip().
            value = ""
        if not isinstance(value, str):
            return jsonify({"error": "Prénom ou nom invalide."}), 400
        fields[key] = value.strip()
    if "email" in data:
        email = data["email"]
        if not isinstance(email, str):
            return jsonify({"error": "Email invalide."}), 400
        email = email.strip().lower()
        if "@" not in email:
            return jsonify({"error": "Email invalide."}), 400
        fields["email"] = email

    if not fields:
        return jsonify({"ok": True}), 200

    # Column names come from the fixed whitelist above, never from the
    # request, so interpolating them into the statement is safe.
    set_clause = ", ".join(f"{k}=?" for k in fields)
    values = list(fields.values()) + [datetime.utcnow().isoformat(), uid]
    conn = get_db()
    try:
        conn.execute(f"UPDATE saas_users SET {set_clause}, updated_at=? WHERE id=?", values)
        conn.commit()
    except sqlite3.IntegrityError:
        # The UNIQUE constraint on email rejects an address already in use.
        return jsonify({"error": "Cet email est déjà utilisé."}), 409
    finally:
        # Runs on success, on the 409 path and on unexpected errors alike.
        conn.close()
    return jsonify({"ok": True}), 200
|
|
|
|
|
|
@auth_bp.route("/change-password", methods=["POST"])
@require_auth
def change_password():
    """Replace the caller's password after checking the current one.

    Expected JSON body: ``current_password`` and ``new_password``.

    NOTE(review): tokens already issued for this user are left untouched by
    this function.

    Returns:
        200 with ``{"ok": True}`` on success;
        400 when the new password is not a string or is shorter than 8
        characters;
        401 when the current password does not match.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Non-object JSON bodies carry no passwords.
        data = {}
    uid = request.current_user["id"]
    cur_pwd = data.get("current_password") or ""
    new_pwd = data.get("new_password") or ""

    if not isinstance(new_pwd, str):
        return jsonify({"error": "Nouveau mot de passe invalide."}), 400
    if len(new_pwd) < 8:
        return jsonify({"error": "Nouveau mot de passe trop court."}), 400
    if not isinstance(cur_pwd, str):
        # A non-string value can never match the stored hash.
        return jsonify({"error": "Mot de passe actuel incorrect."}), 401

    conn = get_db()
    try:
        user = conn.execute(
            "SELECT * FROM saas_users WHERE id=? AND password_hash=?",
            (uid, hash_password(cur_pwd)),
        ).fetchone()
        if not user:
            return jsonify({"error": "Mot de passe actuel incorrect."}), 401
        conn.execute(
            "UPDATE saas_users SET password_hash=?, updated_at=? WHERE id=?",
            (hash_password(new_pwd), datetime.utcnow().isoformat(), uid),
        )
        conn.commit()
    finally:
        # Single exit point for the connection, including the 401 path.
        conn.close()
    return jsonify({"ok": True}), 200
|
|
|
|
|
|
@auth_bp.route("/update-plan", methods=["POST"])
@require_auth
def update_plan():
    """Set the caller's subscription plan (POST /update-plan).

    Expected JSON body: ``plan``, one of ``"free"``, ``"premium"`` or
    ``"pro"``. A missing ``plan`` key defaults to ``"free"``.

    NOTE(review): the plan is written exactly as requested by the
    authenticated user; no payment or entitlement check is visible in this
    function — confirm that it is enforced elsewhere.

    Returns:
        200 with ``{"ok": True, "plan": <plan>}`` on success;
        400 when the plan is not one of the accepted values.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Non-object JSON bodies used to crash on ``.get``; treat as empty.
        data = {}
    plan = data.get("plan", "free")
    if plan not in ("free", "premium", "pro"):
        return jsonify({"error": "Plan invalide."}), 400

    uid = request.current_user["id"]
    conn = get_db()
    try:
        conn.execute(
            "UPDATE saas_users SET plan=?, updated_at=? WHERE id=?",
            (plan, datetime.utcnow().isoformat(), uid),
        )
        conn.commit()
    finally:
        # Release the connection even when the update or the commit raises.
        conn.close()
    return jsonify({"ok": True, "plan": plan}), 200
|
|
|
|
|
|
@auth_bp.route("/update-preferences", methods=["POST"])
@require_auth
def update_preferences():
    """Update the caller's Telegram chat id and alert flags.

    Only the keys present in the JSON body are changed. A falsy
    ``telegram_chat_id`` is stored as NULL; each alert flag is stored as 1
    or 0 according to the truthiness of the supplied value.

    Returns:
        200 with ``{"ok": True}`` on success or when nothing was sent;
        400 when ``telegram_chat_id`` is a JSON array or object.
    """
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        # Non-object JSON bodies contain no preference to update.
        data = {}
    uid = request.current_user["id"]

    fields = {}
    if "telegram_chat_id" in data:
        chat_id = data["telegram_chat_id"]
        if isinstance(chat_id, (list, dict)):
            # sqlite3 cannot bind containers as parameters; reject them here
            # instead of failing inside execute().
            return jsonify({"error": "Identifiant Telegram invalide."}), 400
        fields["telegram_chat_id"] = chat_id or None
    for flag in ("alert_value_bets", "alert_top1", "alert_quinte_only"):
        if flag in data:
            fields[flag] = 1 if data[flag] else 0

    if fields:
        # Column names come from the fixed whitelist above, never from the
        # request, so interpolating them into the statement is safe.
        set_clause = ", ".join(f"{k}=?" for k in fields)
        values = list(fields.values()) + [datetime.utcnow().isoformat(), uid]
        conn = get_db()
        try:
            conn.execute(f"UPDATE saas_users SET {set_clause}, updated_at=? WHERE id=?", values)
            conn.commit()
        finally:
            # Release the connection even when the update raises.
            conn.close()
    return jsonify({"ok": True}), 200
|
|
|
|
|
|
@auth_bp.route("/logout", methods=["POST"])
@require_auth
def logout():
    """Revoke the token used for this request (POST /logout).

    The token is read again from the ``Authorization: Bearer <token>`` header
    and its row is deleted from ``saas_tokens``. Other tokens of the same
    user are not affected.

    Returns:
        200 with ``{"ok": True}``.
    """
    auth = request.headers.get("Authorization", "")
    token = auth[7:].strip() if auth.startswith("Bearer ") else ""
    conn = get_db()
    try:
        conn.execute("DELETE FROM saas_tokens WHERE token=?", (token,))
        conn.commit()
    finally:
        # Release the connection even when the delete or the commit raises.
        conn.close()
    return jsonify({"ok": True}), 200
|
|
|
|
|
|
@auth_bp.route("/delete-account", methods=["DELETE"])
@require_auth
def delete_account():
    """Delete the caller's account and all of its tokens.

    Both deletions are committed together, so a failure before the commit
    leaves the user and the tokens in place.

    NOTE(review): a valid token is the only confirmation required; no
    password re-check is visible in this function.

    Returns:
        200 with ``{"ok": True}``.
    """
    uid = request.current_user["id"]
    conn = get_db()
    try:
        # Tokens first, then the user row, in one transaction.
        conn.execute("DELETE FROM saas_tokens WHERE user_id=?", (uid,))
        conn.execute("DELETE FROM saas_users WHERE id=?", (uid,))
        conn.commit()
    finally:
        # Closing without a commit discards any partial work.
        conn.close()
    return jsonify({"ok": True}), 200
|