Merge HRT-63 + HRT-54: password blacklist/complexity + billing JWT fix — validated CTO

- HRT-63: WEAK_PASSWORDS blacklist (50+ entries) + validate_password_strength()
- HRT-54: billing JWT token fix, table name corrections

Co-Authored-By: Paperclip <noreply@paperclip.ing>
This commit is contained in:
CTO H3R7Tech
2026-04-27 15:22:03 +02:00
8 changed files with 276 additions and 33 deletions

View File

@@ -24,9 +24,9 @@ import os
from datetime import datetime, timedelta, timezone
import stripe
from flask import Blueprint, g, jsonify, request
from flask import Blueprint, jsonify, request
from auth import jwt_required_middleware
from saas_auth import require_auth as jwt_required_middleware
from billing_db import get_db, migrate_billing_tables
logger = logging.getLogger("turf_saas.billing")
@@ -73,18 +73,18 @@ def _sget(obj, key, default=None):
return default
def _get_active_subscription(db, user_id: int):
def _get_active_subscription(db, user_id):
"""Return the most recent active subscription row for a user."""
return db.execute(
"""SELECT * FROM subscriptions
"""SELECT * FROM saas_subscriptions
WHERE user_id = ?
ORDER BY start_date DESC
LIMIT 1""",
(user_id,),
(str(user_id),),
).fetchone()
def _upsert_subscription(db, user_id: int, **fields):
def _upsert_subscription(db, user_id, **fields):
"""
Update existing subscription or insert a new one.
fields: plan, stripe_customer_id, stripe_subscription_id,
@@ -95,19 +95,19 @@ def _upsert_subscription(db, user_id: int, **fields):
# Build SET clause dynamically from provided fields
set_parts = ", ".join(f"{k} = ?" for k in fields)
values = list(fields.values()) + [existing["id"]]
db.execute(f"UPDATE subscriptions SET {set_parts} WHERE id = ?", values)
db.execute(f"UPDATE saas_subscriptions SET {set_parts} WHERE id = ?", values)
else:
cols = ", ".join(["user_id"] + list(fields.keys()))
placeholders = ", ".join(["?"] * (1 + len(fields)))
values = [user_id] + list(fields.values())
values = [str(user_id)] + list(fields.values())
db.execute(
f"INSERT INTO subscriptions ({cols}) VALUES ({placeholders})", values
f"INSERT INTO saas_subscriptions ({cols}) VALUES ({placeholders})", values
)
def _update_user_plan(db, user_id: int, plan: str):
"""Sync users.plan field to match active subscription."""
db.execute("UPDATE users SET plan = ? WHERE id = ?", (plan, user_id))
def _update_user_plan(db, user_id, plan: str):
"""Sync saas_users.plan field to match active subscription."""
db.execute("UPDATE saas_users SET plan = ? WHERE id = ?", (plan, str(user_id)))
def _get_or_create_stripe_customer(user, db) -> str:
@@ -198,7 +198,7 @@ def create_checkout():
if not price_id:
return jsonify({"error": f"Prix Stripe non configuré pour le plan {plan}"}), 503
user = g.current_user
user = request.current_user
if user["plan"] == plan:
return jsonify({"error": f"Vous êtes déjà sur le plan {plan}"}), 400
@@ -263,7 +263,7 @@ def create_portal():
if not stripe.api_key:
return jsonify({"error": "Stripe non configuré"}), 503
user = g.current_user
user = request.current_user
db = get_db()
try:
sub = _get_active_subscription(db, user["id"])
@@ -309,7 +309,7 @@ def billing_status():
200:
description: Subscription status
"""
user = g.current_user
user = request.current_user
db = get_db()
try:
sub = _get_active_subscription(db, user["id"])
@@ -428,7 +428,7 @@ def stripe_webhook():
def _resolve_user_from_customer(db, customer_id: str):
"""Look up user_id via subscriptions.stripe_customer_id."""
row = db.execute(
"SELECT user_id FROM subscriptions WHERE stripe_customer_id = ? LIMIT 1",
"SELECT user_id FROM saas_subscriptions WHERE stripe_customer_id = ? LIMIT 1",
(customer_id,),
).fetchone()
if row:
@@ -465,7 +465,7 @@ def _handle_checkout_completed(db, event):
user_id = _sget(metadata, "user_id")
if user_id:
user_id = int(user_id)
user_id = str(user_id)
else:
user_id = _resolve_user_from_customer(db, customer_id)
@@ -531,7 +531,7 @@ def _handle_subscription_updated(db, event):
meta = _sget(sub_obj, "metadata") or {}
meta_uid = _sget(meta, "user_id")
if meta_uid:
user_id = int(meta_uid)
user_id = str(meta_uid)
if not user_id:
logger.error(
@@ -565,7 +565,7 @@ def _handle_subscription_deleted(db, event):
meta = _sget(sub_obj, "metadata") or {}
meta_uid = _sget(meta, "user_id")
if meta_uid:
user_id = int(meta_uid)
user_id = str(meta_uid)
if not user_id:
logger.error(

View File

@@ -76,14 +76,30 @@ def migrate_billing_tables():
id INTEGER PRIMARY KEY AUTOINCREMENT,
stripe_event_id TEXT NOT NULL UNIQUE,
event_type TEXT NOT NULL,
user_id INTEGER REFERENCES users(id),
user_id TEXT,
payload TEXT,
processed_at DATETIME NOT NULL DEFAULT (datetime('now'))
);
CREATE INDEX IF NOT EXISTS idx_billing_events_user ON billing_events(user_id);
CREATE INDEX IF NOT EXISTS idx_billing_events_type ON billing_events(event_type);
CREATE INDEX IF NOT EXISTS idx_subscriptions_stripe ON subscriptions(stripe_subscription_id);
CREATE TABLE IF NOT EXISTS saas_subscriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
user_id TEXT NOT NULL,
plan TEXT NOT NULL DEFAULT 'free',
start_date DATETIME DEFAULT (datetime('now')),
end_date DATETIME,
stripe_customer_id TEXT,
stripe_subscription_id TEXT,
status TEXT NOT NULL DEFAULT 'active',
grace_period_end DATETIME,
current_period_end DATETIME
);
CREATE INDEX IF NOT EXISTS idx_billing_events_user ON billing_events(user_id);
CREATE INDEX IF NOT EXISTS idx_billing_events_type ON billing_events(event_type);
CREATE INDEX IF NOT EXISTS idx_saas_subs_user ON saas_subscriptions(user_id);
CREATE INDEX IF NOT EXISTS idx_saas_subs_customer ON saas_subscriptions(stripe_customer_id);
CREATE INDEX IF NOT EXISTS idx_saas_subs_stripe ON saas_subscriptions(stripe_subscription_id);
CREATE INDEX IF NOT EXISTS idx_subscriptions_stripe ON subscriptions(stripe_subscription_id);
CREATE INDEX IF NOT EXISTS idx_subscriptions_customer ON subscriptions(stripe_customer_id);
""")

View File

@@ -15,7 +15,7 @@ import sqlite3
import re
import os
DB_PATH = "/home/h3r7/turf_scraper/turf.db"
DB_PATH = "/home/h3r7/turf_saas/turf_saas.db"
HEADERS = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36',
'Accept-Language': 'fr-FR,fr;q=0.9,en;q=0.8',

View File

@@ -38,7 +38,7 @@ from pathlib import Path
# ─────────────────────────────────────────────────────────
# CONFIG
# ─────────────────────────────────────────────────────────
DB_PATH = "/home/h3r7/turf_scraper/turf.db"
DB_PATH = "/home/h3r7/turf_saas/turf_saas.db"
OUTPUT_DIR = Path("/home/h3r7/turf_scraper")
API_BASE = "https://online.turfinfo.api.pmu.fr/rest/client/7"

View File

@@ -9,7 +9,7 @@ from flask import Blueprint, request, jsonify
import sqlite3
import os
from datetime import datetime
from .saas_auth import require_auth
from saas_auth import require_auth
DB_PATH = os.environ.get("TURF_SAAS_DB", "/home/h3r7/turf_saas/turf_saas.db")
@@ -255,3 +255,28 @@ def export_csv():
"Content-Disposition": f"attachment; filename=turf_ia_{date_param}.csv"
},
)
# ─── Billing Blueprint (Stripe) + JWT init — HRT-49 ─────────────────────────
# Registers /api/v1/billing/* routes via nested Blueprint (Flask 2.0+)
# Also initializes JWTManager on the Flask app (required for jwt_required_middleware)
try:
from flask_jwt_extended import JWTManager
from api_v1.routes.billing import billing_bp
# Initialize JWTManager on the Flask app when api_v1_bp is registered
@api_v1_bp.record_once
def _init_jwt(state):
app = state.app
if not app.config.get('JWT_SECRET_KEY'):
import os
app.config['JWT_SECRET_KEY'] = os.environ.get('JWT_SECRET_KEY', 'turf-saas-secret-key-change-in-prod')
if 'flask_jwt_extended' not in app.extensions:
JWTManager(app)
# Register billing blueprint with url_prefix='/billing'
# (parent api_v1_bp has '/api/v1', so result is /api/v1/billing/*)
api_v1_bp.register_blueprint(billing_bp, url_prefix='/billing')
print('[saas_api_v1] Billing blueprint (Stripe) + JWT registered ✅')
except Exception as _billing_err:
print(f'[saas_api_v1] Warning: billing blueprint not loaded: {_billing_err}')

View File

@@ -15,6 +15,123 @@ import json
from functools import wraps
from datetime import datetime
# ─── Blacklist mots de passe faibles ─────────────────────────────────────────
# HRT-63 — Validation mots de passe faibles
WEAK_PASSWORDS = {
"password",
"password1",
"password123",
"passw0rd",
"12345678",
"123456789",
"1234567890",
"123456",
"12345",
"1234",
"qwerty",
"qwerty123",
"qwertyuiop",
"azerty",
"azertyuiop",
"letmein",
"letmein1",
"iloveyou",
"iloveyou1",
"admin",
"admin123",
"admin1234",
"administrator",
"welcome",
"welcome1",
"welcome123",
"monkey",
"monkey1",
"dragon",
"dragon1",
"master",
"master1",
"football",
"soccer",
"baseball",
"basketball",
"superman",
"batman",
"starwars",
"starwars1",
"princess",
"princess1",
"sunshine",
"sunshine1",
"shadow",
"shadow1",
"michael",
"michael1",
"jessica",
"jessica1",
"abc123",
"abc1234",
"abcd1234",
"abcdefgh",
"login",
"login123",
"pass",
"pass1234",
"test",
"test1234",
"test123456",
"hello",
"hello123",
"hello1234",
"changeme",
"changeme1",
"secret",
"secret1",
"secret123",
"trustno1",
"zaq1zaq1",
"qazwsx",
"qazwsxedc",
"111111",
"1111111",
"11111111",
"000000",
"00000000",
"123123",
"1231234",
"321321",
"p@ssword",
"p@ssw0rd",
"pa$$word",
"turf",
"turf123",
"cheval",
"cheval123",
"pmu",
"pmu123",
}
def validate_password_strength(password: str):
"""
Valide la complexité d'un mot de passe.
Retourne None si OK, sinon un message d'erreur (str).
Règles :
- 8 caractères minimum
- absent de la blacklist WEAK_PASSWORDS
- au moins 1 chiffre
- au moins 1 lettre
"""
if len(password) < 8:
return "Mot de passe trop court (8 caractères minimum)."
if password.lower() in WEAK_PASSWORDS:
return "Mot de passe trop commun. Choisissez un mot de passe plus sécurisé."
if not any(c.isdigit() for c in password):
return "Le mot de passe doit contenir au moins 1 chiffre."
if not any(c.isalpha() for c in password):
return "Le mot de passe doit contenir au moins 1 lettre."
return None
# ─── Config ───────────────────────────────────────────────────────────────────
DB_PATH = os.environ.get("TURF_SAAS_DB", "/home/h3r7/turf_saas/turf_saas.db")
JWT_SECRET = os.environ.get(
@@ -148,10 +265,9 @@ def register():
if not email or "@" not in email:
return jsonify({"error": "Adresse email invalide."}), 400
if len(password) < 8:
return jsonify(
{"error": "Mot de passe trop court (8 caractères minimum)."}
), 400
pwd_error = validate_password_strength(password)
if pwd_error:
return jsonify({"error": pwd_error}), 400
if plan not in ("free", "premium", "pro"):
plan = "free"
@@ -249,8 +365,9 @@ def change_password():
cur_pwd = data.get("current_password") or ""
new_pwd = data.get("new_password") or ""
if len(new_pwd) < 8:
return jsonify({"error": "Nouveau mot de passe trop court."}), 400
pwd_error = validate_password_strength(new_pwd)
if pwd_error:
return jsonify({"error": pwd_error}), 400
conn = get_db()
user = conn.execute(

View File

@@ -10,7 +10,7 @@ import json
import re
from datetime import datetime
DB_PATH = "/home/h3r7/turf_scraper/turf.db"
DB_PATH = "/home/h3r7/turf_saas/turf_saas.db"
HEADERS = {'User-Agent': 'Mozilla/5.0', 'Accept': 'application/json'}
def get_cote_from_db(horse_name, date_course):

View File

@@ -303,6 +303,91 @@ class TestPlanAuthorisation:
)
# === Tests validation mots de passe faibles (HRT-63) ===
class TestWeakPasswordRejection:
"""Tests rejet mots de passe faibles : blacklist + complexité (HRT-63)."""
REGISTER_URL = (
os.environ.get("APP_URL", "http://localhost:8792") + "/api/v1/auth/register"
)
WEAK_PASSWORDS = [
"password",
"12345678",
"qwerty123",
"letmein1",
"admin123",
"welcome1",
"iloveyou",
"abc12345",
"sunshine",
"111111111",
]
@pytest.mark.parametrize("weak_pwd", WEAK_PASSWORDS)
def test_weak_password_rejected(self, weak_pwd):
"""Les mots de passe faibles/blacklistés doivent retourner 400."""
import time as _time
unique_email = f"test_weak_{int(_time.time() * 1000)}_{weak_pwd[:4]}@h3r7.tech"
resp = requests.post(
self.REGISTER_URL,
json={"email": unique_email, "password": weak_pwd, "plan": "free"},
timeout=5,
)
assert resp.status_code == 400, (
f"Mot de passe faible accepté: pwd={weak_pwd!r}, status={resp.status_code}"
)
body = resp.json()
assert "error" in body, f"Pas de champ 'error' dans la réponse: {body}"
def test_strong_password_accepted(self):
"""Un mot de passe fort doit permettre l'inscription (retourne 201)."""
import time as _time
unique_email = f"test_strong_{int(_time.time() * 1000)}@h3r7.tech"
resp = requests.post(
self.REGISTER_URL,
json={"email": unique_email, "password": "Tr0ub4d@ur!", "plan": "free"},
timeout=5,
)
assert resp.status_code == 201, (
f"Mot de passe fort rejeté: status={resp.status_code}, body={resp.text}"
)
data = resp.json()
assert "token" in data, f"Pas de token dans la réponse: {data}"
def test_no_digit_rejected(self):
"""Un mot de passe sans chiffre doit être rejeté."""
import time as _time
unique_email = f"test_nodigit_{int(_time.time() * 1000)}@h3r7.tech"
resp = requests.post(
self.REGISTER_URL,
json={"email": unique_email, "password": "NoDigitPassword", "plan": "free"},
timeout=5,
)
assert resp.status_code == 400, (
f"Mot de passe sans chiffre accepté: status={resp.status_code}"
)
def test_no_letter_rejected(self):
"""Un mot de passe sans lettre doit être rejeté."""
import time as _time
unique_email = f"test_noletter_{int(_time.time() * 1000)}@h3r7.tech"
resp = requests.post(
self.REGISTER_URL,
json={"email": unique_email, "password": "12345678901", "plan": "free"},
timeout=5,
)
assert resp.status_code == 400, (
f"Mot de passe sans lettre accepté: status={resp.status_code}"
)
if __name__ == "__main__":
import subprocess