Test isolation fixes: - auth_db.get_db(): read TURF_SAAS_DB dynamically (not frozen at import) - api_v1/utils.get_db(): read TURF_SAAS_DB dynamically (not frozen at import) - api_tokens_db.get_db(): read TURF_SAAS_DB dynamically (not frozen at import) - tests/test_history.py: enforce _tmp_db.name + call init_auth_tables() in fixtures - tests/test_user_tokens.py: enforce _tmp_db.name + call migrate_api_tokens_tables() in app fixture Auth compatibility fixes: - api_v1/routes/history.py: use auth.jwt_required_middleware (flask_jwt_extended) with saas_auth fallback for portal_server context - api_v1/routes/ml_feedback.py: same auth import strategy - api_v1/routes/user.py: same auth import strategy Dependencies: - requirements.txt: add optuna>=4.0.0 (used in ML ensemble tests and training) Co-Authored-By: Paperclip <noreply@paperclip.ing>
389 lines
16 KiB
Python
389 lines
16 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
tests/test_user_tokens.py — Personal API Token + Webhook alertes
|
|
HRT-80: Tests unitaires et d'intégration
|
|
|
|
Couvre:
|
|
- POST /api/v1/user/api-token (create)
|
|
- DELETE /api/v1/user/api-token (revoke)
|
|
- POST /api/v1/user/webhook (create/upsert)
|
|
- DELETE /api/v1/user/webhook (delete)
|
|
- Authentification via X-API-Key
|
|
- dispatch_webhook() fire-and-forget
|
|
- Plan enforcement Pro uniquement
|
|
|
|
Run:
|
|
./venv/bin/pytest tests/test_user_tokens.py -v --tb=short
|
|
"""
|
|
|
|
import hashlib
|
|
import json
|
|
import os
|
|
import sqlite3
|
|
import sys
|
|
import tempfile
|
|
from unittest.mock import MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
# ─── Test DB isolation ────────────────────────────────────────────────────────
|
|
# Reserve an on-disk SQLite file for this module. ``delete=False`` keeps the
# file in place once the handle is closed, so it can be reopened by path.
with tempfile.NamedTemporaryFile(suffix=".db", delete=False) as _tmp_db:
    pass  # leaving the ``with`` block closes the handle

# Set before the project imports that follow in this module.
os.environ.update(
    {
        "TURF_SAAS_DB": _tmp_db.name,
        "JWT_SECRET_KEY": "test-secret-hrt80",
    }
)

# Put the directory two levels above this file at the front of the import path.
sys.path.insert(
    0,
    os.path.dirname(os.path.dirname(__file__)),
)
|
|
|
|
from app_v1 import create_app # noqa: E402
|
|
from api_tokens_db import migrate_api_tokens_tables # noqa: E402
|
|
|
|
# Configuration overrides merged into the application's config by the ``app`` fixture.
TEST_CONFIG = dict(
    TESTING=True,
    JWT_SECRET_KEY="test-secret-hrt80",
)
|
|
|
|
|
|
@pytest.fixture(scope="module")
def app():
    """Yield the application under test, bound to this module's temp database.

    The environment variables are assigned again at fixture run time, then the
    API-token tables are migrated and the application is created and configured
    with ``TEST_CONFIG``.
    """
    # Enforce this module's temp DB at fixture runtime.
    os.environ.update(
        {
            "TURF_SAAS_DB": _tmp_db.name,
            "JWT_SECRET_KEY": "test-secret-hrt80",
        }
    )

    # Ensure the tables exist in THIS module's temp DB.
    migrate_api_tokens_tables()

    test_app = create_app()
    test_app.config.update(TEST_CONFIG)
    yield test_app
|
|
|
|
|
|
@pytest.fixture(scope="module")
def client(app):
    """Return a test client obtained from the module-scoped ``app`` fixture."""
    test_client = app.test_client()
    return test_client
|
|
|
|
|
|
# ─── Helpers ─────────────────────────────────────────────────────────────────
|
|
|
|
|
|
def _create_user(client, email, plan="pro"):
    """Register a user, force its plan in the database, then log it in.

    Args:
        client: Test client used to call the register and login endpoints.
        email: Email address of the account to create.
        plan: Value written to ``users.plan`` for the new account.

    Returns:
        An ``(access_token, user_id)`` tuple: the token comes from the login
        response, the id from the register response.

    Raises:
        AssertionError: If registration does not return 201, the plan update
            does not touch exactly one row, or login does not return 200.
    """
    credentials = {"email": email, "password": "Secure123"}

    resp = client.post("/api/v1/auth/register", json=credentials)
    assert resp.status_code == 201, resp.get_json()
    user_id = resp.get_json()["user_id"]

    # Update plan directly in DB (no plan-update endpoint in JWT auth).
    conn = sqlite3.connect(os.environ["TURF_SAAS_DB"])
    try:
        cursor = conn.execute(
            "UPDATE users SET plan = ? WHERE id = ?", (plan, user_id)
        )
        conn.commit()
    finally:
        # Always release the handle, even when the UPDATE or commit fails.
        conn.close()

    # A zero-row update means the registered user is not in the database that
    # TURF_SAAS_DB points to; fail here rather than in a later plan check.
    assert cursor.rowcount == 1, (
        f"plan update matched {cursor.rowcount} rows for user_id={user_id!r}"
    )

    # Login to get access token.
    login_resp = client.post("/api/v1/auth/login", json=credentials)
    assert login_resp.status_code == 200, login_resp.get_json()
    access_token = login_resp.get_json()["access_token"]
    return access_token, user_id
|
|
|
|
|
|
def _auth_header(token):
    """Build the ``Authorization`` header carrying *token* as a bearer credential."""
    bearer_value = "Bearer {}".format(token)
    return {"Authorization": bearer_value}
|
|
|
|
|
|
# ─── Tests: API Token (Pro) ───────────────────────────────────────────────────
|
|
|
|
|
|
class TestApiToken:
    """Creation and revocation of the personal API token via /api/v1/user/api-token."""

    def test_create_api_token_pro(self, client):
        """POST /api/v1/user/api-token — Pro user gets 201 + token starting with trf_"""
        token, _ = _create_user(client, "pro_token@test.com", plan="pro")
        resp = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert resp.status_code == 201, resp.get_json()
        data = resp.get_json()
        assert data["token"].startswith("trf_")
        # The advertised prefix is the first 12 characters of the raw token.
        assert data["prefix"] == data["token"][:12]
        assert "warning" in data
        assert "created_at" in data

    def test_create_api_token_stores_hash_not_raw(self, client):
        """Second POST returns 409 — only hashed token stored"""
        token, _ = _create_user(client, "pro_token2@test.com", plan="pro")
        # First create
        r1 = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert r1.status_code == 201
        raw_token = r1.get_json()["token"]
        # Second create should conflict
        r2 = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert r2.status_code == 409
        data = r2.get_json()
        assert "existing_prefix" in data
        # Verify raw token is NOT stored in DB (only hash)
        conn = sqlite3.connect(os.environ["TURF_SAAS_DB"])
        try:
            row = conn.execute(
                "SELECT token_hash FROM user_api_tokens WHERE token_prefix = ?",
                (raw_token[:12],),
            ).fetchone()
        finally:
            # Release the handle even if the query itself raises.
            conn.close()
        assert row is not None
        assert row[0] != raw_token  # hash != raw
        assert len(row[0]) == 64  # SHA256 hex

    def test_create_api_token_free_user(self, client):
        """Free user gets 403"""
        token, _ = _create_user(client, "free_token@test.com", plan="free")
        resp = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert resp.status_code == 403

    def test_create_api_token_premium_user(self, client):
        """Premium user gets 403 (Pro only feature)"""
        token, _ = _create_user(client, "premium_token@test.com", plan="premium")
        resp = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert resp.status_code == 403

    def test_create_api_token_no_auth(self, client):
        """No auth → 401"""
        resp = client.post("/api/v1/user/api-token")
        assert resp.status_code == 401

    def test_revoke_api_token(self, client):
        """DELETE /api/v1/user/api-token — Pro user revokes active token"""
        token, _ = _create_user(client, "pro_revoke@test.com", plan="pro")
        # Create first; check it so a failed setup is not reported as a
        # revoke failure further down.
        created = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert created.status_code == 201, created.get_json()
        # Revoke
        resp = client.delete("/api/v1/user/api-token", headers=_auth_header(token))
        assert resp.status_code == 200
        data = resp.get_json()
        assert data["revoked"] is True
        assert data["count"] >= 1

    def test_revoke_no_active_token(self, client):
        """DELETE with no active token → 404"""
        token, _ = _create_user(client, "pro_notoken@test.com", plan="pro")
        resp = client.delete("/api/v1/user/api-token", headers=_auth_header(token))
        assert resp.status_code == 404

    def test_revoke_non_pro(self, client):
        """DELETE for free user → 403"""
        token, _ = _create_user(client, "free_revoke@test.com", plan="free")
        resp = client.delete("/api/v1/user/api-token", headers=_auth_header(token))
        assert resp.status_code == 403
|
|
|
|
|
|
# ─── Tests: X-API-Key Authentication ─────────────────────────────────────────
|
|
|
|
|
|
class TestApiKeyAuth:
    """Authentication with the ``X-API-Key`` header.

    The tests probe authentication by POSTing to /api/v1/user/api-token with
    the key: 409 (token already exists) shows the key was accepted, 401 shows
    it was rejected.
    """

    def test_api_key_auth_on_protected_route(self, client):
        """Valid X-API-Key authenticates on protected route"""
        token, _ = _create_user(client, "apikey_auth@test.com", plan="pro")
        # Create API token
        r = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert r.status_code == 201
        raw_key = r.get_json()["token"]
        # Use X-API-Key to access a protected route (try create again → 409 means authenticated)
        resp = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
        # 409 means we were authenticated; 401 means auth failed
        assert resp.status_code == 409

    def test_api_key_invalid(self, client):
        """Invalid X-API-Key → 401"""
        resp = client.post(
            "/api/v1/user/api-token",
            headers={"X-API-Key": "trf_invalidkeyXXXXXXXXXXXXXXXXXX"},
        )
        assert resp.status_code == 401

    def test_api_key_revoked(self, client):
        """Revoked X-API-Key → 401"""
        token, _ = _create_user(client, "revoked_apikey@test.com", plan="pro")
        # Create token
        r = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert r.status_code == 201
        raw_key = r.get_json()["token"]
        # Revoke it, and confirm the revoke itself succeeded so that a failure
        # below points at key validation rather than at the setup.
        revoked = client.delete("/api/v1/user/api-token", headers=_auth_header(token))
        assert revoked.status_code == 200, revoked.get_json()
        # Try using revoked key
        resp = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
        assert resp.status_code == 401

    def test_revoke_then_cannot_auth(self, client):
        """Full flow: create → use → revoke → X-API-Key rejected"""
        token, _ = _create_user(client, "flow_test@test.com", plan="pro")
        # Create
        r = client.post("/api/v1/user/api-token", headers=_auth_header(token))
        assert r.status_code == 201, r.get_json()
        raw_key = r.get_json()["token"]
        # Validate it works (409 because key exists)
        r2 = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
        assert r2.status_code == 409
        # Revoke
        revoked = client.delete("/api/v1/user/api-token", headers=_auth_header(token))
        assert revoked.status_code == 200, revoked.get_json()
        # Try again with revoked key
        r3 = client.post("/api/v1/user/api-token", headers={"X-API-Key": raw_key})
        assert r3.status_code == 401
|
|
|
|
|
|
# ─── Tests: Webhook ───────────────────────────────────────────────────────────
|
|
|
|
|
|
class TestWebhook:
    """Create, upsert and delete the user webhook via /api/v1/user/webhook."""

    def test_create_webhook_pro(self, client):
        """POST /api/v1/user/webhook — Pro user with provided secret → 201"""
        token, _ = _create_user(client, "webhook_pro@test.com", plan="pro")
        resp = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "https://example.com/hook", "secret": "mysecret123"},
        )
        assert resp.status_code == 201
        data = resp.get_json()
        assert data["webhook_url"] == "https://example.com/hook"
        assert data["secret"] == "mysecret123"

    def test_create_webhook_auto_secret(self, client):
        """POST without secret → auto-generated secret"""
        token, _ = _create_user(client, "webhook_auto@test.com", plan="pro")
        resp = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "https://auto.example.com/hook"},
        )
        assert resp.status_code == 201
        data = resp.get_json()
        assert len(data["secret"]) == 64  # token_hex(32) = 64 hex chars

    def test_create_webhook_non_pro_free(self, client):
        """Free user → 403"""
        token, _ = _create_user(client, "webhook_free@test.com", plan="free")
        resp = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "https://example.com/hook"},
        )
        assert resp.status_code == 403

    def test_create_webhook_non_pro_premium(self, client):
        """Premium user → 403"""
        token, _ = _create_user(client, "webhook_premium@test.com", plan="premium")
        resp = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "https://example.com/hook"},
        )
        assert resp.status_code == 403

    def test_create_webhook_url_not_https(self, client):
        """HTTP URL → 400"""
        token, _ = _create_user(client, "webhook_http@test.com", plan="pro")
        resp = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "http://example.com/hook"},
        )
        assert resp.status_code == 400
        # The error message is expected to mention https.
        assert "https" in resp.get_json()["error"].lower()

    def test_create_webhook_missing_url(self, client):
        """Missing URL → 400"""
        token, _ = _create_user(client, "webhook_nourl@test.com", plan="pro")
        resp = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={},
        )
        assert resp.status_code == 400

    def test_webhook_upsert(self, client):
        """Second POST updates URL (upsert behavior)"""
        token, _ = _create_user(client, "webhook_upsert@test.com", plan="pro")
        first = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "https://first.example.com/hook"},
        )
        # Without this check the test would still pass if the first POST
        # failed, and the second POST would be a plain create, not an upsert.
        assert first.status_code == 201, first.get_json()
        resp = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "https://second.example.com/hook"},
        )
        assert resp.status_code == 201
        assert resp.get_json()["webhook_url"] == "https://second.example.com/hook"

    def test_delete_webhook(self, client):
        """DELETE /api/v1/user/webhook → 200"""
        token, _ = _create_user(client, "webhook_delete@test.com", plan="pro")
        created = client.post(
            "/api/v1/user/webhook",
            headers=_auth_header(token),
            json={"url": "https://delete.example.com/hook"},
        )
        # Check the setup so a failed create is not reported as a delete failure.
        assert created.status_code == 201, created.get_json()
        resp = client.delete("/api/v1/user/webhook", headers=_auth_header(token))
        assert resp.status_code == 200
        assert resp.get_json()["deleted"] is True

    def test_delete_webhook_not_configured(self, client):
        """DELETE without webhook configured → 404"""
        token, _ = _create_user(client, "webhook_notset@test.com", plan="pro")
        resp = client.delete("/api/v1/user/webhook", headers=_auth_header(token))
        assert resp.status_code == 404

    def test_delete_webhook_non_pro(self, client):
        """Free user DELETE → 403"""
        token, _ = _create_user(client, "webhook_freedelete@test.com", plan="free")
        resp = client.delete("/api/v1/user/webhook", headers=_auth_header(token))
        assert resp.status_code == 403
|
|
|
|
|
|
# ─── Tests: dispatch_webhook ──────────────────────────────────────────────────
|
|
|
|
|
|
class TestDispatchWebhook:
|
|
def test_dispatch_no_webhook_configured(self):
    """dispatch_webhook returns without raising when the lookup yields no row."""
    with patch("api_v1.utils_webhook.get_db") as get_db_stub:
        # The patched get_db hands back a connection whose
        # execute(...).fetchone() result is None.
        fake_conn = MagicMock()
        get_db_stub.return_value = fake_conn
        fake_conn.execute.return_value.fetchone.return_value = None

        from api_v1.utils_webhook import dispatch_webhook

        # The test passes as long as this call completes without raising.
        dispatch_webhook("nonexistent_user", "new_prediction", {"data": "test"})
|
|
|
|
def test_dispatch_sends_hmac_header(self):
|
|
"""dispatch_webhook sends correct HMAC-SHA256 signature header"""
|
|
test_secret = "testsecret"
|
|
test_url = "https://hook.example.com/receive"
|
|
test_payload = {"race_id": "R123", "top1": "Cheval Blanc"}
|
|
|
|
with (
|
|
patch("api_v1.utils_webhook.get_db") as mock_get_db,
|
|
patch("api_v1.utils_webhook.requests.post") as mock_post,
|
|
):
|
|
mock_row = MagicMock()
|
|
mock_row.__getitem__ = lambda self, key: (
|
|
test_url if key == "url" else test_secret
|
|
)
|
|
mock_conn = MagicMock()
|
|
mock_conn.execute.return_value.fetchone.return_value = mock_row
|
|
mock_get_db.return_value = mock_conn
|
|
|
|
mock_response = MagicMock()
|
|
mock_response.status_code = 200
|
|
mock_post.return_value = mock_response
|
|
|
|
from api_v1.utils_webhook import dispatch_webhook, EVENT_NEW_PREDICTION
|
|
|
|
dispatch_webhook("user123", EVENT_NEW_PREDICTION, test_payload)
|
|
|
|
assert mock_post.called
|
|
call_kwargs = mock_post.call_args
|
|
headers_sent = call_kwargs.kwargs.get("headers") or call_kwargs[1].get(
|
|
"headers"
|
|
)
|
|
assert "X-Turf-Signature" in headers_sent
|
|
assert headers_sent["X-Turf-Signature"].startswith("sha256=")
|
|
assert headers_sent["X-Turf-Event"] == EVENT_NEW_PREDICTION
|