#!/usr/bin/env python3
"""
tests/test_user_tokens.py — Personal API token + alert webhook
HRT-80: unit and integration tests

Covers:
  - POST   /api/v1/user/api-token   (create)
  - DELETE /api/v1/user/api-token   (revoke)
  - POST   /api/v1/user/webhook     (create/upsert)
  - DELETE /api/v1/user/webhook     (delete)
  - Authentication via X-API-Key
  - dispatch_webhook() fire-and-forget
  - Plan enforcement (Pro only)

Run:
  ./venv/bin/pytest tests/test_user_tokens.py -v --tb=short
"""

import contextlib
import hashlib
import json
import os
import sqlite3
import sys
import tempfile
from unittest.mock import MagicMock, patch

import pytest

# ─── Test DB isolation ────────────────────────────────────────────────────────

# Shared literals, kept in one place so the environment, the app config and
# the register/login helpers cannot drift apart.
_JWT_SECRET = "test-secret-hrt80"
_TEST_PASSWORD = "Secure123"

# Dedicated SQLite file for this module. The environment is populated before
# the project imports below (hence the E402 suppressions).
# NOTE(review): presumably the project reads TURF_SAAS_DB / JWT_SECRET_KEY at
# import or app-creation time — confirm against app_v1.
_tmp_db = tempfile.NamedTemporaryFile(suffix=".db", delete=False)
_tmp_db.close()
os.environ["TURF_SAAS_DB"] = _tmp_db.name
os.environ["JWT_SECRET_KEY"] = _JWT_SECRET

# Add project root to path. abspath() guards against a relative __file__,
# for which dirname(dirname(...)) would collapse to an empty string.
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))

from app_v1 import create_app  # noqa: E402
from api_tokens_db import migrate_api_tokens_tables  # noqa: E402

TEST_CONFIG = {
    "TESTING": True,
    "JWT_SECRET_KEY": _JWT_SECRET,
}


@pytest.fixture(scope="module")
def app():
    """Build the application once per module against this module's temp DB."""
    # Enforce this module's temp DB at fixture runtime: the environment may
    # have been changed since this module was imported.
    os.environ["TURF_SAAS_DB"] = _tmp_db.name
    os.environ["JWT_SECRET_KEY"] = _JWT_SECRET
    migrate_api_tokens_tables()  # ensure tables exist in THIS module's temp DB
    application = create_app()
    application.config.update(TEST_CONFIG)
    yield application


@pytest.fixture(scope="module")
def client(app):
    """Return a test client bound to the module-scoped application."""
    return app.test_client()


# ─── Helpers ─────────────────────────────────────────────────────────────────


def _db():
    """Open the test database; the connection is closed when the block exits.

    ``contextlib.closing`` is used because a bare ``sqlite3.Connection`` used
    as a context manager only manages the transaction, it does not close.
    """
    return contextlib.closing(sqlite3.connect(os.environ["TURF_SAAS_DB"]))


def _create_user(client, email, plan="pro"):
    """Register a user, force its plan in the DB, then log in.

    Args:
        client: test client used to call the auth endpoints.
        email: e-mail address to register; must be unique per test.
        plan: value written to ``users.plan`` for the new user.

    Returns:
        Tuple ``(access_token, user_id)``.
    """
    resp = client.post(
        "/api/v1/auth/register",
        json={"email": email, "password": _TEST_PASSWORD},
    )
    assert resp.status_code == 201, resp.get_json()
    user_id = resp.get_json()["user_id"]

    # Update plan directly in DB (no plan-update endpoint in JWT auth)
    with _db() as conn:
        conn.execute("UPDATE users SET plan = ? WHERE id = ?", (plan, user_id))
        conn.commit()

    # Login to get access token
    login_resp = client.post(
        "/api/v1/auth/login",
        json={"email": email, "password": _TEST_PASSWORD},
    )
    assert login_resp.status_code == 200, login_resp.get_json()
    access_token = login_resp.get_json()["access_token"]
    return access_token, user_id


def _auth_header(token):
    """Return the ``Authorization: Bearer`` header dict for *token*."""
    return {"Authorization": f"Bearer {token}"}


# ─── Tests: API Token (Pro) ───────────────────────────────────────────────────


class TestApiToken:
    """Creation and revocation of the personal API token."""

    def test_create_api_token_pro(self, client):
        """POST /api/v1/user/api-token — Pro user gets 201 + token starting with trf_"""
        token, _ = _create_user(client, "pro_token@test.com", plan="pro")
        resp = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert resp.status_code == 201, resp.get_json()
        data = resp.get_json()
        assert data["token"].startswith("trf_")
        assert data["prefix"] == data["token"][:12]
        assert "warning" in data
        assert "created_at" in data

    def test_create_api_token_stores_hash_not_raw(self, client):
        """Second POST returns 409 — only hashed token stored"""
        token, _ = _create_user(client, "pro_token2@test.com", plan="pro")

        # First create
        r1 = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert r1.status_code == 201
        raw_token = r1.get_json()["token"]

        # Second create should conflict
        r2 = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert r2.status_code == 409
        data = r2.get_json()
        assert "existing_prefix" in data

        # Verify raw token is NOT stored in DB (only hash)
        with _db() as conn:
            row = conn.execute(
                "SELECT token_hash FROM user_api_tokens WHERE token_prefix = ?",
                (raw_token[:12],),
            ).fetchone()
        assert row is not None
        assert row[0] != raw_token  # hash != raw
        assert len(row[0]) == 64  # SHA256 hex

    def test_create_api_token_free_user(self, client):
        """Free user gets 403"""
        token, _ = _create_user(client, "free_token@test.com", plan="free")
        resp = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert resp.status_code == 403

    def test_create_api_token_premium_user(self, client):
        """Premium user gets 403 (Pro only feature)"""
        token, _ = _create_user(client, "premium_token@test.com", plan="premium")
        resp = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert resp.status_code == 403

    def test_create_api_token_no_auth(self, client):
        """No auth → 401"""
        resp = client.post("/api/v1/user/api-token")
        assert resp.status_code == 401

    def test_revoke_api_token(self, client):
        """DELETE /api/v1/user/api-token — Pro user revokes active token"""
        token, _ = _create_user(client, "pro_revoke@test.com", plan="pro")

        # Create first; check it so a broken setup is reported as such
        created = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert created.status_code == 201, created.get_json()

        # Revoke
        resp = client.delete("/api/v1/user/api-token", headers=_auth_header(token))
        assert resp.status_code == 200
        data = resp.get_json()
        assert data["revoked"] is True
        assert data["count"] >= 1

    def test_revoke_no_active_token(self, client):
        """DELETE with no active token → 404"""
        token, _ = _create_user(client, "pro_notoken@test.com", plan="pro")
        resp = client.delete("/api/v1/user/api-token", headers=_auth_header(token))
        assert resp.status_code == 404

    def test_revoke_non_pro(self, client):
        """DELETE for free user → 403"""
        token, _ = _create_user(client, "free_revoke@test.com", plan="free")
        resp = client.delete("/api/v1/user/api-token", headers=_auth_header(token))
        assert resp.status_code == 403


# ─── Tests: X-API-Key Authentication ─────────────────────────────────────────


class TestApiKeyAuth:
    """Authentication with the raw token sent in the X-API-Key header."""

    def test_api_key_auth_on_protected_route(self, client):
        """Valid X-API-Key authenticates on protected route"""
        token, _ = _create_user(client, "apikey_auth@test.com", plan="pro")

        # Create API token
        r = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert r.status_code == 201
        raw_key = r.get_json()["token"]

        # Use X-API-Key to access a protected route
        # (try create again → 409 means authenticated)
        resp = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
        # 409 means we were authenticated; 401 means auth failed
        assert resp.status_code == 409

    def test_api_key_invalid(self, client):
        """Invalid X-API-Key → 401"""
        resp = client.post(
            "/api/v1/user/api-token",
            headers={"X-API-Key": "trf_invalidkeyXXXXXXXXXXXXXXXXXX"},
        )
        assert resp.status_code == 401

    def test_api_key_revoked(self, client):
        """Revoked X-API-Key → 401"""
        token, _ = _create_user(client, "revoked_apikey@test.com", plan="pro")

        # Create token
        r = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert r.status_code == 201
        raw_key = r.get_json()["token"]

        # Revoke it; the final 401 is only meaningful if this succeeded
        revoked = client.delete(
            "/api/v1/user/api-token", headers=_auth_header(token)
        )
        assert revoked.status_code == 200, revoked.get_json()

        # Try using revoked key
        resp = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
        assert resp.status_code == 401

    def test_revoke_then_cannot_auth(self, client):
        """Full flow: create → use → revoke → X-API-Key rejected"""
        token, _ = _create_user(client, "flow_test@test.com", plan="pro")

        # Create
        r = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert r.status_code == 201, r.get_json()
        raw_key = r.get_json()["token"]

        # Validate it works (409 because key exists)
        r2 = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
        assert r2.status_code == 409

        # Revoke
        revoked = client.delete(
            "/api/v1/user/api-token", headers=_auth_header(token)
        )
        assert revoked.status_code == 200, revoked.get_json()

        # Try again with revoked key
        r3 = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
        assert r3.status_code == 401


# ─── Tests: Webhook ───────────────────────────────────────────────────────────


class TestWebhook:
    """Creation, upsert and deletion of the per-user webhook."""

    def test_create_webhook_pro(self, client):
        """POST /api/v1/user/webhook — Pro user with provided secret → 201"""
        token, _ = _create_user(client, "webhook_pro@test.com", plan="pro")
        resp = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "https://example.com/hook", "secret": "mysecret123"},
        )
        assert resp.status_code == 201
        data = resp.get_json()
        assert data["webhook_url"] == "https://example.com/hook"
        assert data["secret"] == "mysecret123"

    def test_create_webhook_auto_secret(self, client):
        """POST without secret → auto-generated secret"""
        token, _ = _create_user(client, "webhook_auto@test.com", plan="pro")
        resp = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "https://auto.example.com/hook"},
        )
        assert resp.status_code == 201
        data = resp.get_json()
        assert len(data["secret"]) == 64  # token_hex(32) = 64 hex chars

    def test_create_webhook_non_pro_free(self, client):
        """Free user → 403"""
        token, _ = _create_user(client, "webhook_free@test.com", plan="free")
        resp = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "https://example.com/hook"},
        )
        assert resp.status_code == 403

    def test_create_webhook_non_pro_premium(self, client):
        """Premium user → 403"""
        token, _ = _create_user(client, "webhook_premium@test.com", plan="premium")
        resp = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "https://example.com/hook"},
        )
        assert resp.status_code == 403

    def test_create_webhook_url_not_https(self, client):
        """HTTP URL → 400"""
        token, _ = _create_user(client, "webhook_http@test.com", plan="pro")
        resp = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "http://example.com/hook"},
        )
        assert resp.status_code == 400
        assert "https" in resp.get_json()["error"].lower()

    def test_create_webhook_missing_url(self, client):
        """Missing URL → 400"""
        token, _ = _create_user(client, "webhook_nourl@test.com", plan="pro")
        resp = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={},
        )
        assert resp.status_code == 400

    def test_webhook_upsert(self, client):
        """Second POST updates URL (upsert behavior)"""
        token, _ = _create_user(client, "webhook_upsert@test.com", plan="pro")
        first = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "https://first.example.com/hook"},
        )
        # Without a successful first POST the second one would be a plain
        # create, not an upsert.
        assert first.status_code == 201, first.get_json()

        resp = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "https://second.example.com/hook"},
        )
        assert resp.status_code == 201
        assert resp.get_json()["webhook_url"] == "https://second.example.com/hook"

    def test_delete_webhook(self, client):
        """DELETE /api/v1/user/webhook → 200"""
        token, _ = _create_user(client, "webhook_delete@test.com", plan="pro")
        created = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "https://delete.example.com/hook"},
        )
        assert created.status_code == 201, created.get_json()

        resp = client.delete("/api/v1/user/webhook", headers=_auth_header(token))
        assert resp.status_code == 200
        assert resp.get_json()["deleted"] is True

    def test_delete_webhook_not_configured(self, client):
        """DELETE without webhook configured → 404"""
        token, _ = _create_user(client, "webhook_notset@test.com", plan="pro")
        resp = client.delete("/api/v1/user/webhook", headers=_auth_header(token))
        assert resp.status_code == 404

    def test_delete_webhook_non_pro(self, client):
        """Free user DELETE → 403"""
        token, _ = _create_user(client, "webhook_freedelete@test.com", plan="free")
        resp = client.delete("/api/v1/user/webhook", headers=_auth_header(token))
        assert resp.status_code == 403


# ─── Tests: dispatch_webhook ──────────────────────────────────────────────────


class TestDispatchWebhook:
    """dispatch_webhook() exercised with the DB and HTTP layers mocked out."""

    def test_dispatch_no_webhook_configured(self):
        """dispatch_webhook silently returns when no webhook is configured"""
        with patch("api_v1.utils_webhook.get_db") as mock_get_db:
            mock_conn = MagicMock()
            # No row found for the user.
            mock_conn.execute.return_value.fetchone.return_value = None
            mock_get_db.return_value = mock_conn

            from api_v1.utils_webhook import dispatch_webhook

            # Should not raise, should return silently
            dispatch_webhook("nonexistent_user", "new_prediction", {"data": "test"})

    def test_dispatch_sends_hmac_header(self):
        """dispatch_webhook sends the signature and event headers.

        Only the presence of ``X-Turf-Signature``, its ``sha256=`` prefix and
        the ``X-Turf-Event`` value are checked; the digest itself is not
        recomputed here.
        """
        test_secret = "testsecret"
        test_url = "https://hook.example.com/receive"
        test_payload = {"race_id": "R123", "top1": "Cheval Blanc"}

        with (
            patch("api_v1.utils_webhook.get_db") as mock_get_db,
            patch("api_v1.utils_webhook.requests.post") as mock_post,
        ):
            # Fake DB row: "url" yields the URL, any other key the secret.
            mock_row = MagicMock()
            mock_row.__getitem__ = lambda self, key: (
                test_url if key == "url" else test_secret
            )

            mock_conn = MagicMock()
            mock_conn.execute.return_value.fetchone.return_value = mock_row
            mock_get_db.return_value = mock_conn

            mock_response = MagicMock()
            mock_response.status_code = 200
            mock_post.return_value = mock_response

            from api_v1.utils_webhook import dispatch_webhook, EVENT_NEW_PREDICTION

            dispatch_webhook("user123", EVENT_NEW_PREDICTION, test_payload)

            assert mock_post.called
            # call_args.kwargs and call_args[1] are the same mapping, so a
            # single lookup is enough; fail explicitly when headers are absent.
            headers_sent = mock_post.call_args.kwargs.get("headers")
            assert headers_sent is not None, "requests.post called without headers"
            assert "X-Turf-Signature" in headers_sent
            assert headers_sent["X-Turf-Signature"].startswith("sha256=")
            assert headers_sent["X-Turf-Event"] == EVENT_NEW_PREDICTION