Files
turf_saas/middleware.py
DevOps Engineer 5a23692ad1 feat: Sprint 2-3 — Auth JWT + Multi-tenant (HRT-28)
- auth_db.py: create users, subscriptions, refresh_tokens tables in turf_saas.db
- auth.py: register/login/refresh/logout endpoints, JWT middleware, plan_required decorator, free daily-limit check
- middleware.py: in-memory rate limiter (100 req/min/IP), timestamped access logs
- saas_api.py: Flask app factory wiring JWT, CORS, blueprints, /api/v1/predictions plan-gating
- tests/test_auth.py: 27 pytest tests, 83% coverage (target >=80%)
- API_AUTH.md: full endpoint documentation

Co-Authored-By: Paperclip <noreply@paperclip.ing>
2026-04-25 17:35:45 +02:00

91 lines
3.3 KiB
Python

#!/usr/bin/env python3
"""
Middleware — per-IP rate limiting and timestamped access logging
Sprint 2-3: HRT-28
"""
import logging
import time
from collections import defaultdict
from datetime import datetime, timezone
from functools import wraps
from threading import Lock
from flask import request, jsonify, g
# Module logger; used for rate-limit warnings.
logger = logging.getLogger("turf_saas.middleware")

# ──────────────────────────────────────────────────────────────
# In-memory rate limiter (100 req/min per IP)
# For production: replace with Redis-backed counter
# ──────────────────────────────────────────────────────────────
RATE_WINDOW = 60  # length of one counting window, in seconds
RATE_LIMIT = 100  # requests allowed per window for a single key


def _fresh_bucket():
    """Return the initial per-key state: no hits, window never started."""
    return {"count": 0, "window_start": 0.0}


# Guards every read/write of _rate_store.
_rate_lock = Lock()
# Maps a client key to its bucket; missing keys get a fresh bucket on access.
_rate_store: dict = defaultdict(_fresh_bucket)
def rate_limit_middleware(app):
    """Register a fixed-window, per-IP rate limiter as a before_request hook.

    Every request is counted against a bucket keyed by ``request.remote_addr``
    (or ``"unknown"`` when that is empty) in the module-level ``_rate_store``.
    A bucket admits ``RATE_LIMIT`` requests per ``RATE_WINDOW`` seconds; any
    further request in the same window is short-circuited with an HTTP 429
    JSON response carrying ``X-RateLimit-*`` and ``Retry-After`` headers.

    For admitted requests the remaining quota is stored on ``g.rl_remaining``.
    This function does not add headers to admitted responses itself; the
    ``log_access`` after_request hook in this module reads that value.

    Args:
        app: Flask application on which the hook is registered.

    Returns:
        None.
    """
    # Local import: only this block needs it.
    from math import ceil

    # Earliest time at which expired buckets are swept again. Sweeping at most
    # once per window keeps the store from growing by one entry per distinct
    # client forever, without scanning the whole dict on every request.
    next_sweep = 0.0

    @app.before_request
    def check_rate_limit():
        nonlocal next_sweep
        ip = request.remote_addr or "unknown"
        now = time.time()

        with _rate_lock:
            if now >= next_sweep:
                # An expired bucket would be reset to zero on its next hit
                # anyway, so dropping it does not change any client's quota.
                expired = [
                    addr
                    for addr, entry in _rate_store.items()
                    if now - entry["window_start"] >= RATE_WINDOW
                ]
                for addr in expired:
                    del _rate_store[addr]
                next_sweep = now + RATE_WINDOW

            bucket = _rate_store[ip]
            if now - bucket["window_start"] >= RATE_WINDOW:
                # Window elapsed (or brand-new bucket): start a fresh one.
                bucket["count"] = 0
                bucket["window_start"] = now
            bucket["count"] += 1
            count = bucket["count"]
            window_end = bucket["window_start"] + RATE_WINDOW

        if count > RATE_LIMIT:
            logger.warning("Rate limit exceeded for IP %s", ip)
            resp = jsonify({"error": "Trop de requêtes. Limite: 100/min par IP."})
            resp.status_code = 429
            resp.headers["X-RateLimit-Limit"] = str(RATE_LIMIT)
            resp.headers["X-RateLimit-Remaining"] = "0"
            # Advertise the time left in the current window, rounded up so a
            # compliant client never retries before the window has reset.
            resp.headers["Retry-After"] = str(max(1, ceil(window_end - now)))
            return resp

        # Stash the remaining quota for the after_request hook that writes the
        # X-RateLimit-* headers on normal responses.
        g.rl_remaining = max(0, RATE_LIMIT - count)
        return None
# ──────────────────────────────────────────────────────────────
# Access logs (timestamped)
# ──────────────────────────────────────────────────────────────
# Logger for per-request access lines; a different logger name from the
# module-level "turf_saas.middleware" logger.
access_log = logging.getLogger("turf_saas.access")
def access_log_middleware(app):
    """Install an after_request hook that logs each request and adds quota headers.

    The hook writes one INFO record to the ``turf_saas.access`` logger with the
    UTC timestamp, client address, ``g.current_user_id`` (``"-"`` if unset),
    HTTP method, path, status code and response content length (0 if unknown).
    When ``g.rl_remaining`` has been set for the request, it also sets the
    ``X-RateLimit-Limit`` and ``X-RateLimit-Remaining`` response headers.

    Args:
        app: Flask application on which the hook is registered.

    Returns:
        None.
    """

    @app.after_request
    def log_access(response):
        client = request.remote_addr or "unknown"
        stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
        who = getattr(g, "current_user_id", "-")
        body_size = response.content_length or 0

        access_log.info(
            '%s %s %s "%s %s" %s %s',
            stamp,
            client,
            who,
            request.method,
            request.path,
            response.status_code,
            body_size,
        )

        # No quota recorded for this request: leave the headers untouched.
        quota_left = getattr(g, "rl_remaining", None)
        if quota_left is None:
            return response

        response.headers["X-RateLimit-Limit"] = str(RATE_LIMIT)
        response.headers["X-RateLimit-Remaining"] = str(quota_left)
        return response