""" Tests de sécurité — SaaS Turf Prédictions IA Sprint 8 — QA, Beta Fermee, Go/No-Go Ticket: HRT-34 Couverture : - Test injection SQL sur tous les inputs - Test authentification : JWT expiration, refresh, logout - Test autorisation : plan free ne peut pas accéder routes premium - (OWASP ZAP est exécuté séparément via script shell) """ import pytest import requests import time import base64 import json import os BASE_URL = os.environ.get("APP_URL", "http://localhost:8792") # === Payloads injection SQL === SQL_INJECTION_PAYLOADS = [ "' OR '1'='1", "' OR 1=1--", "'; DROP TABLE users;--", "' UNION SELECT null,null,null--", "1'; SELECT * FROM users--", "admin'--", "' OR 'x'='x", "1 OR 1=1", "%27 OR %271%27=%271", ] # === Payloads XSS === XSS_PAYLOADS = [ "", "", "javascript:alert(1)", "", '">', ] # === Helpers === def get_token(email: str, password: str) -> str | None: """Obtenir un token JWT.""" try: resp = requests.post( f"{BASE_URL}/api/auth/login", json={"email": email, "password": password}, timeout=5, ) if resp.status_code == 200: data = resp.json() return data.get("access_token") or data.get("token") except Exception: pass return None # === Tests injection SQL === class TestSQLInjection: """Tests d'injection SQL sur les endpoints publics et authentifiés.""" @pytest.mark.parametrize("payload", SQL_INJECTION_PAYLOADS) def test_injection_login_email(self, payload): """L'injection SQL dans le champ email ne doit pas fonctionner.""" resp = requests.post( f"{BASE_URL}/api/auth/login", json={"email": payload, "password": "anything"}, timeout=5, ) assert resp.status_code not in (200,), ( f"Injection SQL acceptée dans email: payload={payload!r}, status={resp.status_code}" ) # Vérifier qu'aucune donnée sensible n'est exposée body = resp.text.lower() for keyword in [ "sqlite_master", "table_name", "column_name", "password", "hash", ]: assert keyword not in body, ( f"Données sensibles exposées dans la réponse: keyword={keyword!r}" ) @pytest.mark.parametrize("payload", SQL_INJECTION_PAYLOADS) def test_injection_search_query(self, payload): """L'injection SQL dans les paramètres de recherche ne doit pas fonctionner.""" resp = requests.get( f"{BASE_URL}/api/races", params={"q": payload, "date": payload}, timeout=5, ) # On accepte 200 (résultat vide) ou 400/422 (validation), mais pas 500 assert resp.status_code != 500, ( f"Erreur serveur sur injection SQL dans recherche: payload={payload!r}" ) body = resp.text.lower() for keyword in ["sqlite_master", "syntax error", "table_name"]: assert keyword not in body, f"Fuite SQL dans réponse: keyword={keyword!r}" @pytest.mark.parametrize("payload", SQL_INJECTION_PAYLOADS) def test_injection_register_fields(self, payload): """L'injection SQL dans les champs d'inscription ne doit pas passer.""" resp = requests.post( f"{BASE_URL}/api/auth/register", json={ "email": f"test@test.com", "password": payload, "name": payload, }, timeout=5, ) assert resp.status_code != 500, ( f"Erreur serveur sur injection dans register: payload={payload!r}" ) # === Tests authentification JWT === class TestJWTAuthentication: """Tests JWT : expiration, refresh, logout.""" def test_jwt_expiration_token_invalide(self): """Un token expiré doit être rejeté.""" # Token JWT expiré fabriqué manuellement (exp dans le passé) # Header: {"alg": "HS256", "typ": "JWT"} # Payload: {"sub": "test", "exp": 1000000000} (expiry: 2001) expired_token = ( "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9." "eyJzdWIiOiJ0ZXN0IiwiZXhwIjoxMDAwMDAwMDAwfQ." "invalid_signature_here" ) resp = requests.get( f"{BASE_URL}/api/v1/predictions/today", headers={"Authorization": f"Bearer {expired_token}"}, timeout=5, ) assert resp.status_code in (401, 403, 422), ( f"Token expiré accepté: status={resp.status_code}" ) def test_jwt_token_malformé(self): """Un token JWT malformé doit être rejeté.""" for bad_token in ["not.a.jwt", "Bearer", "null", "undefined", ""]: resp = requests.get( f"{BASE_URL}/api/v1/predictions/today", headers={"Authorization": f"Bearer {bad_token}"}, timeout=5, ) assert resp.status_code in (401, 403, 422, 400), ( f"Token malformé accepté: token={bad_token!r}, status={resp.status_code}" ) def test_jwt_sans_token(self): """Sans token, les routes protégées doivent retourner 401.""" resp = requests.get(f"{BASE_URL}/api/v1/export/csv", timeout=5) assert resp.status_code in (401, 403), ( f"Route protégée accessible sans token: status={resp.status_code}" ) def test_jwt_refresh(self): """Le mécanisme de refresh doit fonctionner.""" # Tenter d'obtenir un token valide d'abord token = get_token("loadtest+0@h3r7.tech", "TestLoad_2026!") if token is None: pytest.skip("Utilisateur de test non créé — nécessite HRT-31") resp = requests.post( f"{BASE_URL}/api/auth/refresh", headers={"Authorization": f"Bearer {token}"}, timeout=5, ) if resp.status_code == 404: pytest.skip("Route /api/auth/refresh non implémentée") assert resp.status_code == 200, ( f"Refresh token échoué: status={resp.status_code}" ) data = resp.json() assert "access_token" in data or "token" in data, ( "Aucun token retourné par /api/auth/refresh" ) def test_jwt_logout(self): """Après logout, le token doit être invalidé.""" token = get_token("loadtest+0@h3r7.tech", "TestLoad_2026!") if token is None: pytest.skip("Utilisateur de test non créé — nécessite HRT-31") # Logout resp = requests.post( f"{BASE_URL}/api/auth/logout", headers={"Authorization": f"Bearer {token}"}, timeout=5, ) if resp.status_code == 404: pytest.skip("Route /api/auth/logout non implémentée") assert resp.status_code in (200, 204), ( f"Logout échoué: status={resp.status_code}" ) # Vérifier que le token est invalidé resp2 = requests.get( f"{BASE_URL}/api/export/csv", headers={"Authorization": f"Bearer {token}"}, timeout=5, ) assert resp2.status_code in (401, 403), ( f"Token encore valide après logout: status={resp2.status_code}" ) # === Tests autorisation par plan === class TestPlanAuthorisation: """Tests d'autorisation : free ne peut pas accéder aux routes premium.""" PREMIUM_ROUTES = [ "/api/races?all=true", "/api/export/csv", "/api/predictions/all", "/api/premium/historical", ] FREE_ROUTES = [ "/api", "/api/races", "/api/scoring", "/dashboard", ] def test_routes_premium_inaccessibles_sans_auth(self): """Les routes premium doivent être inaccessibles sans authentification.""" for route in self.PREMIUM_ROUTES: resp = requests.get(f"{BASE_URL}{route}", timeout=5) assert resp.status_code in (401, 403, 404), ( f"Route premium accessible sans auth: {route} → {resp.status_code}" ) def test_routes_libres_accessibles(self): """Les routes libres (portail, dashboard) doivent être accessibles.""" for route in self.FREE_ROUTES: resp = requests.get(f"{BASE_URL}{route}", timeout=5) # On accepte tout sauf 5xx assert resp.status_code < 500, ( f"Route libre retourne erreur serveur: {route} → {resp.status_code}" ) def test_plan_free_bloque_routes_premium(self): """Un token free ne doit pas accéder aux routes premium.""" token = get_token("loadtest+0@h3r7.tech", "TestLoad_2026!") if token is None: pytest.skip("Utilisateur de test non créé — nécessite HRT-31") for route in self.PREMIUM_ROUTES: resp = requests.get( f"{BASE_URL}{route}", headers={"Authorization": f"Bearer {token}"}, timeout=5, ) # Plan free ne devrait pas pouvoir accéder (403 ou 402 Payment Required) # Si 200 — les données doivent être limitées if resp.status_code == 200: data = ( resp.json() if resp.headers.get("content-type", "").startswith( "application/json" ) else {} ) # Vérifier que ce n'est pas un accès complet if isinstance(data, list): # Max 3 éléments pour plan free (top 3) assert len(data) <= 10, ( f"Plan free retourne trop de données sur {route}: {len(data)} éléments" ) else: assert resp.status_code in (403, 402, 401), ( f"Status inattendu sur route premium pour plan free: {route} → {resp.status_code}" ) def test_injection_dans_bearer_token(self): """Injection dans le token Bearer ne doit pas provoquer d'erreur 500.""" for payload in SQL_INJECTION_PAYLOADS[:3]: encoded = base64.b64encode(payload.encode()).decode() resp = requests.get( f"{BASE_URL}/api/races", headers={"Authorization": f"Bearer {encoded}"}, timeout=5, ) assert resp.status_code != 500, ( f"Erreur serveur sur injection dans Authorization: payload={payload!r}" ) # === Tests validation mots de passe faibles (HRT-63) === class TestWeakPasswordRejection: """Tests rejet mots de passe faibles : blacklist + complexité (HRT-63).""" REGISTER_URL = ( os.environ.get("APP_URL", "http://localhost:8792") + "/api/v1/auth/register" ) WEAK_PASSWORDS = [ "password", "12345678", "qwerty123", "letmein1", "admin123", "welcome1", "iloveyou", "abc1234", "sunshine", "111111111", ] @pytest.mark.parametrize("weak_pwd", WEAK_PASSWORDS) def test_weak_password_rejected(self, weak_pwd): """Les mots de passe faibles/blacklistés doivent retourner 400.""" import time as _time unique_email = f"test_weak_{int(_time.time() * 1000)}_{weak_pwd[:4]}@h3r7.tech" resp = requests.post( self.REGISTER_URL, json={"email": unique_email, "password": weak_pwd, "plan": "free"}, timeout=5, ) assert resp.status_code == 400, ( f"Mot de passe faible accepté: pwd={weak_pwd!r}, status={resp.status_code}" ) body = resp.json() assert "error" in body, f"Pas de champ 'error' dans la réponse: {body}" def test_strong_password_accepted(self): """Un mot de passe fort doit permettre l'inscription (retourne 201).""" import time as _time unique_email = f"test_strong_{int(_time.time() * 1000)}@h3r7.tech" resp = requests.post( self.REGISTER_URL, json={"email": unique_email, "password": "Tr0ub4d@ur!", "plan": "free"}, timeout=5, ) assert resp.status_code == 201, ( f"Mot de passe fort rejeté: status={resp.status_code}, body={resp.text}" ) data = resp.json() assert "token" in data, f"Pas de token dans la réponse: {data}" def test_no_digit_rejected(self): """Un mot de passe sans chiffre doit être rejeté.""" import time as _time unique_email = f"test_nodigit_{int(_time.time() * 1000)}@h3r7.tech" resp = requests.post( self.REGISTER_URL, json={"email": unique_email, "password": "NoDigitPassword", "plan": "free"}, timeout=5, ) assert resp.status_code == 400, ( f"Mot de passe sans chiffre accepté: status={resp.status_code}" ) def test_no_letter_rejected(self): """Un mot de passe sans lettre doit être rejeté.""" import time as _time unique_email = f"test_noletter_{int(_time.time() * 1000)}@h3r7.tech" resp = requests.post( self.REGISTER_URL, json={"email": unique_email, "password": "12345678901", "plan": "free"}, timeout=5, ) assert resp.status_code == 400, ( f"Mot de passe sans lettre accepté: status={resp.status_code}" ) # === Tests rate limiting login === class TestLoginRateLimit: """Tests rate limiting sur /api/v1/auth/login.""" TARGET_URL = ( os.environ.get("APP_URL", "http://localhost:8792") + "/api/v1/auth/login" ) def test_login_brute_force_blocked_after_5_attempts(self): """Après 5 tentatives, le 6ème appel doit retourner 429.""" # Utiliser un email unique pour isoler le test email = f"ratelimit_test_{int(time.time())}@h3r7.tech" for i in range(5): resp = requests.post( self.TARGET_URL, json={"email": email, "password": "wrong_password"}, timeout=5, ) assert resp.status_code in (400, 401), ( f"Tentative {i + 1}: status inattendu {resp.status_code}" ) # La 6ème tentative doit être bloquée resp = requests.post( self.TARGET_URL, json={"email": email, "password": "wrong_password"}, timeout=5, ) assert resp.status_code == 429, ( f"Rate limit non appliqué après 5 tentatives: got {resp.status_code}" ) assert "Retry-After" in resp.headers, "Header Retry-After manquant sur 429" def test_login_429_has_retry_after_header(self): """La réponse 429 doit inclure Retry-After.""" email = f"ratelimit_test2_{int(time.time())}@h3r7.tech" for _ in range(6): requests.post( self.TARGET_URL, json={"email": email, "password": "x"}, timeout=5 ) resp = requests.post( self.TARGET_URL, json={"email": email, "password": "x"}, timeout=5 ) if resp.status_code == 429: assert "Retry-After" in resp.headers assert int(resp.headers["Retry-After"]) > 0 if __name__ == "__main__": import subprocess subprocess.run( [ "python", "-m", "pytest", __file__, "-v", "--tb=short", "--html=tests/reports/security_report.html", "--self-contained-html", ] )