Files
turf_saas/tests/security/test_security.py
DevOps Engineer 7f5573f076 feat(security): add IP-based rate limiting on /api/v1/auth/login — fix brute force HRT-62
- saas_auth.py: in-memory sliding-window rate limiter (5 attempts/5min, 15min block)
  using collections.defaultdict + threading.Lock, stdlib only, no new deps
- portal_server.py: register rate_limit_middleware + access_log_middleware
  (was missing, leaving the global 100 req/min limit unapplied on portal routes)
- tests/security/test_security.py: add TestLoginRateLimit class with
  test_login_brute_force_blocked_after_5_attempts and test_login_429_has_retry_after_header

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-27 14:50:08 +02:00

370 lines
13 KiB
Python

"""
Tests de sécurité — SaaS Turf Prédictions IA
Sprint 8 — QA, Beta Fermee, Go/No-Go
Ticket: HRT-34
Couverture :
- Test injection SQL sur tous les inputs
- Test authentification : JWT expiration, refresh, logout
- Test autorisation : plan free ne peut pas accéder routes premium
- (OWASP ZAP est exécuté séparément via script shell)
"""
import pytest
import requests
import time
import base64
import json
import os
# Root URL of the application under test; the APP_URL environment variable
# overrides the local default.
BASE_URL = os.getenv("APP_URL", "http://localhost:8792")
# === Payloads injection SQL ===
# Strings fed to the endpoints under test by the SQL-injection test cases.
SQL_INJECTION_PAYLOADS = [
    # Quote-breaking tautologies
    "' OR '1'='1",
    "' OR 1=1--",
    # Stacked statement
    "'; DROP TABLE users;--",
    # UNION-based probe
    "' UNION SELECT null,null,null--",
    "1'; SELECT * FROM users--",
    # Comment truncation
    "admin'--",
    "' OR 'x'='x",
    # Unquoted numeric tautology
    "1 OR 1=1",
    # URL-encoded quotes
    "%27 OR %271%27=%271",
]
# === Payloads XSS ===
# Cross-site-scripting probe strings (script tag, event handlers, javascript:
# URL scheme, attribute break-out).
XSS_PAYLOADS = [
    "<script>alert(1)</script>",
    "<img src=x onerror=alert(1)>",
    "javascript:alert(1)",
    "<svg onload=alert(1)>",
    '"><script>alert(document.cookie)</script>',
]
# === Helpers ===
def get_token(email: str, password: str) -> str | None:
    """Log in with the given credentials and return the JWT, or ``None``.

    This is a best-effort helper: callers treat ``None`` as "no usable test
    account" and skip their test. ``None`` is returned when the server cannot
    be reached, answers with anything other than HTTP 200, or sends a body
    that is not a JSON object containing ``access_token`` or ``token``.

    Args:
        email: Account e-mail sent in the login request body.
        password: Account password sent in the login request body.

    Returns:
        The value of ``access_token`` (falling back to ``token``) from the
        login response, or ``None`` if no token could be obtained.
    """
    try:
        resp = requests.post(
            f"{BASE_URL}/api/auth/login",
            json={"email": email, "password": password},
            timeout=5,
        )
        if resp.status_code != 200:
            return None
        data = resp.json()
    except (requests.RequestException, ValueError):
        # Only transport failures and undecodable JSON are expected here;
        # anything else is a bug in the test code and should surface instead
        # of being reported as a skipped test.
        return None
    if not isinstance(data, dict):
        return None
    return data.get("access_token") or data.get("token")
# === Tests injection SQL ===
class TestSQLInjection:
    """SQL-injection probes against the login, race-search and register endpoints."""

    @pytest.mark.parametrize("payload", SQL_INJECTION_PAYLOADS)
    def test_injection_login_email(self, payload):
        """A SQL payload used as the login e-mail must not log in or leak data."""
        credentials = {"email": payload, "password": "anything"}
        resp = requests.post(
            f"{BASE_URL}/api/auth/login", json=credentials, timeout=5
        )
        assert resp.status_code != 200, (
            f"Injection SQL acceptée dans email: payload={payload!r}, status={resp.status_code}"
        )

        # The response body must not mention schema or credential keywords.
        body = resp.text.lower()
        sensitive = (
            "sqlite_master",
            "table_name",
            "column_name",
            "password",
            "hash",
        )
        leaked = next((word for word in sensitive if word in body), None)
        assert leaked is None, (
            f"Données sensibles exposées dans la réponse: keyword={leaked!r}"
        )

    @pytest.mark.parametrize("payload", SQL_INJECTION_PAYLOADS)
    def test_injection_search_query(self, payload):
        """A SQL payload in the search parameters must not cause a 500 or leak SQL details."""
        query = {"q": payload, "date": payload}
        resp = requests.get(f"{BASE_URL}/api/races", params=query, timeout=5)

        # 200 (empty result) and 400/422 (validation) are fine; only 500 is a failure.
        assert resp.status_code != 500, (
            f"Erreur serveur sur injection SQL dans recherche: payload={payload!r}"
        )

        body = resp.text.lower()
        sql_markers = ("sqlite_master", "syntax error", "table_name")
        leaked = next((word for word in sql_markers if word in body), None)
        assert leaked is None, f"Fuite SQL dans réponse: keyword={leaked!r}"

    @pytest.mark.parametrize("payload", SQL_INJECTION_PAYLOADS)
    def test_injection_register_fields(self, payload):
        """A SQL payload in the registration fields must not cause a server error."""
        form = {
            "email": "test@test.com",
            "password": payload,
            "name": payload,
        }
        resp = requests.post(f"{BASE_URL}/api/auth/register", json=form, timeout=5)
        assert resp.status_code != 500, (
            f"Erreur serveur sur injection dans register: payload={payload!r}"
        )
# === Tests authentification JWT ===
class TestJWTAuthentication:
    """JWT checks: expired, malformed and missing tokens, plus refresh and logout."""

    def test_jwt_expiration_token_invalide(self):
        """A token whose ``exp`` claim is in the past must be rejected."""
        # Hand-built token, assembled from its three dot-separated segments:
        #   header : {"alg": "HS256", "typ": "JWT"}
        #   payload: {"sub": "test", "exp": 1000000000}  (expired in 2001)
        #   third segment: placeholder text, not a real signature
        segments = [
            "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9",
            "eyJzdWIiOiJ0ZXN0IiwiZXhwIjoxMDAwMDAwMDAwfQ",
            "invalid_signature_here",
        ]
        expired_token = ".".join(segments)
        auth = {"Authorization": f"Bearer {expired_token}"}
        resp = requests.get(
            f"{BASE_URL}/api/v1/predictions/today", headers=auth, timeout=5
        )
        assert resp.status_code in (401, 403, 422), (
            f"Token expiré accepté: status={resp.status_code}"
        )

    def test_jwt_token_malformé(self):
        """Malformed bearer values must be rejected."""
        url = f"{BASE_URL}/api/v1/predictions/today"
        for bad_token in ("not.a.jwt", "Bearer", "null", "undefined", ""):
            auth = {"Authorization": f"Bearer {bad_token}"}
            resp = requests.get(url, headers=auth, timeout=5)
            assert resp.status_code in (401, 403, 422, 400), (
                f"Token malformé accepté: token={bad_token!r}, status={resp.status_code}"
            )

    def test_jwt_sans_token(self):
        """A protected route requested without any token must answer 401 or 403."""
        resp = requests.get(f"{BASE_URL}/api/v1/export/csv", timeout=5)
        assert resp.status_code in (401, 403), (
            f"Route protégée accessible sans token: status={resp.status_code}"
        )

    def test_jwt_refresh(self):
        """The refresh endpoint must hand back a token for a logged-in user."""
        # A valid token is a prerequisite; skip when the test account is unavailable.
        token = get_token("loadtest+0@h3r7.tech", "TestLoad_2026!")
        if token is None:
            pytest.skip("Utilisateur de test non créé — nécessite HRT-31")

        auth = {"Authorization": f"Bearer {token}"}
        resp = requests.post(f"{BASE_URL}/api/auth/refresh", headers=auth, timeout=5)
        if resp.status_code == 404:
            pytest.skip("Route /api/auth/refresh non implémentée")
        assert resp.status_code == 200, (
            f"Refresh token échoué: status={resp.status_code}"
        )

        data = resp.json()
        assert any(key in data for key in ("access_token", "token")), (
            "Aucun token retourné par /api/auth/refresh"
        )

    def test_jwt_logout(self):
        """After logout, the same token must no longer open a protected route."""
        token = get_token("loadtest+0@h3r7.tech", "TestLoad_2026!")
        if token is None:
            pytest.skip("Utilisateur de test non créé — nécessite HRT-31")
        auth = {"Authorization": f"Bearer {token}"}

        # Step 1: log out.
        logout_resp = requests.post(
            f"{BASE_URL}/api/auth/logout", headers=auth, timeout=5
        )
        if logout_resp.status_code == 404:
            pytest.skip("Route /api/auth/logout non implémentée")
        assert logout_resp.status_code in (200, 204), (
            f"Logout échoué: status={logout_resp.status_code}"
        )

        # Step 2: replay the token and expect it to be refused.
        replay_resp = requests.get(
            f"{BASE_URL}/api/export/csv", headers=auth, timeout=5
        )
        assert replay_resp.status_code in (401, 403), (
            f"Token encore valide après logout: status={replay_resp.status_code}"
        )
# === Tests autorisation par plan ===
class TestPlanAuthorisation:
    """Plan-based authorisation: a free account must not get premium access."""

    # Routes expected to be refused (or limited) for anonymous and free users.
    PREMIUM_ROUTES = [
        "/api/races?all=true",
        "/api/export/csv",
        "/api/predictions/all",
        "/api/premium/historical",
    ]
    # Routes expected to respond without a server error for anyone.
    FREE_ROUTES = [
        "/api",
        "/api/races",
        "/api/scoring",
        "/dashboard",
    ]

    def test_routes_premium_inaccessibles_sans_auth(self):
        """Premium routes must answer 401, 403 or 404 when no token is sent."""
        for route in self.PREMIUM_ROUTES:
            resp = requests.get(f"{BASE_URL}{route}", timeout=5)
            assert resp.status_code in (401, 403, 404), (
                f"Route premium accessible sans auth: {route}, status={resp.status_code}"
            )

    def test_routes_libres_accessibles(self):
        """Free routes must not produce a server error (any status below 500 passes)."""
        for route in self.FREE_ROUTES:
            resp = requests.get(f"{BASE_URL}{route}", timeout=5)
            assert resp.status_code < 500, (
                f"Route libre retourne erreur serveur: {route}, status={resp.status_code}"
            )

    def test_plan_free_bloque_routes_premium(self):
        """A free-plan token must be refused on premium routes, or get limited data.

        Skipped when the test account cannot log in.
        """
        token = get_token("loadtest+0@h3r7.tech", "TestLoad_2026!")
        if token is None:
            pytest.skip("Utilisateur de test non créé — nécessite HRT-31")
        auth = {"Authorization": f"Bearer {token}"}

        for route in self.PREMIUM_ROUTES:
            resp = requests.get(f"{BASE_URL}{route}", headers=auth, timeout=5)

            if resp.status_code != 200:
                # Expected outcome: forbidden, payment required or unauthorised.
                assert resp.status_code in (403, 402, 401), (
                    f"Status inattendu sur route premium pour plan free: {route}, status={resp.status_code}"
                )
                continue

            # A 200 is tolerated only if the payload is not the full data set.
            is_json = resp.headers.get("content-type", "").startswith(
                "application/json"
            )
            data = resp.json() if is_json else {}
            # NOTE(review): the original comment said the free plan is capped at
            # 3 items (top 3) while the check allows up to 10 — confirm which
            # limit is intended. A 200 whose body is not a JSON list is not
            # checked at all.
            if isinstance(data, list):
                assert len(data) <= 10, (
                    f"Plan free retourne trop de données sur {route}: {len(data)} éléments"
                )

    def test_injection_dans_bearer_token(self):
        """A base64-encoded SQL payload used as bearer token must not cause a 500."""
        for payload in SQL_INJECTION_PAYLOADS[:3]:
            encoded = base64.b64encode(payload.encode()).decode()
            resp = requests.get(
                f"{BASE_URL}/api/races",
                headers={"Authorization": f"Bearer {encoded}"},
                timeout=5,
            )
            assert resp.status_code != 500, (
                f"Erreur serveur sur injection dans Authorization: payload={payload!r}"
            )
# === Tests rate limiting login ===
class TestLoginRateLimit:
    """Rate-limiting checks on /api/v1/auth/login.

    NOTE(review): each test uses a timestamped e-mail to isolate itself. If the
    limiter is keyed on client IP rather than e-mail, that does not isolate the
    tests from each other or from a previous run — confirm against the limiter.
    """

    # Login endpoint under test, built from the same APP_URL setting as the
    # rest of the suite.
    TARGET_URL = (
        os.environ.get("APP_URL", "http://localhost:8792") + "/api/v1/auth/login"
    )

    def test_login_brute_force_blocked_after_5_attempts(self):
        """After 5 failed logins, the 6th attempt must be answered with 429."""
        email = f"ratelimit_test_{int(time.time())}@h3r7.tech"
        credentials = {"email": email, "password": "wrong_password"}

        # The first five attempts are ordinary failed logins.
        for attempt in range(1, 6):
            resp = requests.post(self.TARGET_URL, json=credentials, timeout=5)
            assert resp.status_code in (400, 401), (
                f"Tentative {attempt}: status inattendu {resp.status_code}"
            )

        # The sixth attempt must be blocked.
        resp = requests.post(self.TARGET_URL, json=credentials, timeout=5)
        assert resp.status_code == 429, (
            f"Rate limit non appliqué après 5 tentatives: got {resp.status_code}"
        )
        assert "Retry-After" in resp.headers, "Header Retry-After manquant sur 429"

    def test_login_429_has_retry_after_header(self):
        """A 429 response must carry a positive ``Retry-After`` header.

        Skipped, rather than silently passed, when the server never answers
        429, so a missing limiter is visible in the test report.
        """
        email = f"ratelimit_test2_{int(time.time())}@h3r7.tech"
        credentials = {"email": email, "password": "x"}

        # Six warm-up attempts whose responses are deliberately ignored.
        for _ in range(6):
            requests.post(self.TARGET_URL, json=credentials, timeout=5)

        resp = requests.post(self.TARGET_URL, json=credentials, timeout=5)
        if resp.status_code != 429:
            pytest.skip(
                f"Pas de réponse 429 après 7 tentatives (status={resp.status_code})"
            )
        assert "Retry-After" in resp.headers
        # Parsed as an integer number of seconds.
        assert int(resp.headers["Retry-After"]) > 0
if __name__ == "__main__":
    # Script entry point: run this file through pytest and write an HTML report.
    import subprocess
    import sys

    completed = subprocess.run(
        [
            # Use the running interpreter so pytest comes from the same
            # environment instead of whatever "python" resolves to on PATH.
            sys.executable,
            "-m",
            "pytest",
            __file__,
            "-v",
            "--tb=short",
            "--html=tests/reports/security_report.html",
            "--self-contained-html",
        ],
        check=False,
    )
    # Propagate pytest's exit status so callers (CI, shell scripts) see failures.
    sys.exit(completed.returncode)