#!/usr/bin/env python3
"""
user_tokens.py — Personal API tokens + Webhook configuration (Pro plan)
HRT-80

Endpoints:
  POST   /api/v1/user/api-token
  DELETE /api/v1/user/api-token
  POST   /api/v1/user/webhook
  DELETE /api/v1/user/webhook
"""

import hashlib
import logging
import secrets
from urllib.parse import urlsplit

from flask import Blueprint, g, jsonify, request

from api_tokens_db import get_db, migrate_api_tokens_tables
from auth import jwt_required_middleware, plan_required

logger = logging.getLogger("turf_saas.user_tokens")

user_tokens_bp = Blueprint("user_tokens", __name__, url_prefix="/api/v1/user")

# Run the table migration at import time. A failure is only logged so that the
# module can still be imported when the database is unavailable.
try:
    migrate_api_tokens_tables()
except Exception as _e:
    logger.warning("api_tokens_db migration skipped (test env?): %s", _e)


def _hash_token(raw: str) -> str:
    """Return the hex-encoded SHA-256 digest of *raw* (UTF-8 encoded)."""
    return hashlib.sha256(raw.encode()).hexdigest()


def _current_user_id() -> str:
    """Return the authenticated user's id (from ``g.current_user``) as a string."""
    return str(g.current_user["id"])


def _has_host(url: str) -> bool:
    """Return True when *url* parses and carries a non-empty host component."""
    try:
        return bool(urlsplit(url).hostname)
    except ValueError:
        # urlsplit rejects some malformed authorities (e.g. an unbalanced
        # IPv6 bracket); treat those as "no usable host".
        return False


@user_tokens_bp.route("/api-token", methods=["POST"])
@jwt_required_middleware
@plan_required("pro")
def create_api_token():
    """Create a personal API token for the current user.

    Returns:
        201 with the raw token, its prefix and creation timestamp. Only the
            SHA-256 hash and the 12-character prefix are stored, so the raw
            token is available in this response only.
        409 when the user already has a non-revoked token.
        500 on any database error (the transaction is rolled back).
    """
    user_id = _current_user_id()
    conn = get_db()
    try:
        # NOTE(review): this check-then-insert is not atomic; two concurrent
        # requests could both pass it unless the schema enforces uniqueness
        # of active tokens per user — confirm against the table definition.
        existing = conn.execute(
            "SELECT id, token_prefix, created_at FROM user_api_tokens "
            "WHERE user_id = ? AND revoked = 0",
            (user_id,),
        ).fetchone()
        if existing:
            return jsonify(
                {
                    "error": "Un token actif existe déjà. Révoquez-le avant d'en créer un nouveau.",
                    "existing_prefix": existing["token_prefix"],
                    "created_at": existing["created_at"],
                }
            ), 409

        raw_token = "trf_" + secrets.token_urlsafe(40)
        token_hash = _hash_token(raw_token)
        token_prefix = raw_token[:12]

        conn.execute(
            "INSERT INTO user_api_tokens (user_id, token_hash, token_prefix) VALUES (?, ?, ?)",
            (user_id, token_hash, token_prefix),
        )
        # Read the timestamp back BEFORE committing: if this query fails, the
        # rollback below also discards the insert, so we never persist a
        # token whose raw value was not returned to the user.
        row = conn.execute(
            "SELECT created_at FROM user_api_tokens WHERE token_hash = ?",
            (token_hash,),
        ).fetchone()
        conn.commit()
        created_at = row["created_at"] if row else None
    except Exception as e:
        conn.rollback()
        logger.exception("create_api_token error for user %s: %s", user_id, e)
        return jsonify({"error": "Erreur interne"}), 500
    finally:
        conn.close()

    logger.info("API token created for user %s (prefix=%s)", user_id, token_prefix)
    return jsonify(
        {
            "token": raw_token,
            "prefix": token_prefix,
            "created_at": created_at,
            "warning": "Conservez ce token en lieu sûr. Il ne sera plus affiché.",
        }
    ), 201


@user_tokens_bp.route("/api-token", methods=["DELETE"])
@jwt_required_middleware
@plan_required("pro")
def revoke_api_token():
    """Mark every active API token of the current user as revoked.

    Returns:
        200 with the number of tokens revoked.
        404 when the user had no active token.
        500 on any database error (the transaction is rolled back).
    """
    user_id = _current_user_id()
    conn = get_db()
    try:
        result = conn.execute(
            "UPDATE user_api_tokens SET revoked = 1 WHERE user_id = ? "
            "AND revoked = 0",
            (user_id,),
        )
        conn.commit()
        revoked_count = result.rowcount
    except Exception as e:
        conn.rollback()
        logger.exception("revoke_api_token error for user %s: %s", user_id, e)
        return jsonify({"error": "Erreur interne"}), 500
    finally:
        conn.close()

    if revoked_count == 0:
        return jsonify({"error": "Aucun token actif trouvé"}), 404

    logger.info("API token(s) revoked for user %s (%d tokens)", user_id, revoked_count)
    return jsonify({"revoked": True, "count": revoked_count}), 200


@user_tokens_bp.route("/webhook", methods=["POST"])
@jwt_required_middleware
@plan_required("pro")
def create_webhook():
    """Create or replace the current user's webhook.

    Expects a JSON object with a required ``url`` (must start with
    ``https://`` and contain a host) and an optional ``secret``. When no
    secret is supplied, a random 64-hex-character one is generated.

    Returns:
        201 with the stored URL and secret (for both creation and update).
        400 when the payload is malformed or the URL is rejected.
        500 on any database error (the transaction is rolled back).
    """
    user_id = _current_user_id()

    payload = request.get_json(silent=True) or {}
    if not isinstance(payload, dict):
        # Valid JSON that is not an object (e.g. a list) carries no fields.
        payload = {}

    raw_url = payload.get("url") or ""
    if not isinstance(raw_url, str):
        # Previously a non-string value crashed on .strip() (HTTP 500).
        return jsonify({"error": "URL du webhook invalide"}), 400
    url = raw_url.strip()
    if not url:
        return jsonify({"error": "URL du webhook manquante"}), 400
    if not url.startswith("https://"):
        return jsonify(
            {"error": "L'URL du webhook doit utiliser HTTPS (commencer par https://)"}
        ), 400
    if not _has_host(url):
        # "https://" alone (or "https:///path") passes the prefix check but
        # cannot be delivered to.
        return jsonify({"error": "URL du webhook invalide (hôte manquant)"}), 400

    raw_secret = payload.get("secret") or ""
    if not isinstance(raw_secret, str):
        return jsonify(
            {"error": "Le secret du webhook doit être une chaîne de caractères"}
        ), 400
    secret = raw_secret.strip() or secrets.token_hex(32)

    conn = get_db()
    existing = None
    try:
        existing = conn.execute(
            "SELECT id FROM user_webhooks WHERE user_id = ?", (user_id,)
        ).fetchone()
        if existing:
            conn.execute(
                "UPDATE user_webhooks SET url = ?, secret = ?, created_at = datetime('now') "
                "WHERE user_id = ?",
                (url, secret, user_id),
            )
        else:
            conn.execute(
                "INSERT INTO user_webhooks (user_id, url, secret) VALUES (?, ?, ?)",
                (user_id, url, secret),
            )
        conn.commit()
    except Exception as e:
        conn.rollback()
        logger.exception("create_webhook error for user %s: %s", user_id, e)
        return jsonify({"error": "Erreur interne"}), 500
    finally:
        conn.close()

    action = "mis à jour" if existing else "configuré"
    logger.info("Webhook %s for user %s: %s", action, user_id, url)
    return jsonify(
        {
            "webhook_url": url,
            "secret": secret,
            "message": f"Webhook {action} avec succès",
        }
    ), 201


@user_tokens_bp.route("/webhook", methods=["DELETE"])
@jwt_required_middleware
@plan_required("pro")
def delete_webhook():
    """Delete the current user's webhook configuration.

    Returns:
        200 when a webhook row was deleted.
        404 when the user had no webhook configured.
        500 on any database error (the transaction is rolled back).
    """
    user_id = _current_user_id()
    conn = get_db()
    try:
        result = conn.execute("DELETE FROM user_webhooks WHERE user_id = ?", (user_id,))
        conn.commit()
        deleted_count = result.rowcount
    except Exception as e:
        conn.rollback()
        logger.exception("delete_webhook error for user %s: %s", user_id, e)
        return jsonify({"error": "Erreur interne"}), 500
    finally:
        conn.close()

    if deleted_count == 0:
        return jsonify({"error": "Aucun webhook configuré"}), 404

    logger.info("Webhook deleted for user %s", user_id)
    return jsonify({"deleted": True}), 200