#!/usr/bin/env python3
"""
Token Broker API — JWT token management service
Port: 8783 | DB: PostgreSQL 5434

HRT-198 — Setup infra (PostgreSQL + Flask scaffold)

Endpoints:
  GET  /health                       — Healthcheck
  POST /api/v1/tokens                — Issue new token (create)
  GET  /api/v1/tokens/:id            — Get token by ID
  POST /api/v1/tokens/verify         — Verify token
  POST /api/v1/tokens/revoke/:id     — Revoke token
  GET  /api/v1/tokens/user/:userId   — List tokens for user
  POST /api/v1/auth/token            — Exchange an API token (or refresh token) for a token pair
  POST /api/v1/auth/refresh          — Rotate a refresh token
  POST /api/v1/auth/revoke           — Revoke a refresh token
"""

import os
import sys
import uuid
import hashlib
import secrets
import logging
import logging.handlers
from datetime import datetime, timedelta, timezone
from functools import wraps

from flask import Flask, request, jsonify, g
from flask_cors import CORS

LOG_DIR = os.path.join(os.path.dirname(__file__), "logs")
os.makedirs(LOG_DIR, exist_ok=True)

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s [%(levelname)s] token-broker: %(name)s: %(message)s",
    handlers=[
        logging.StreamHandler(sys.stdout),
        logging.handlers.RotatingFileHandler(
            os.path.join(LOG_DIR, "token_broker.log"),
            maxBytes=5 * 1024 * 1024,
            backupCount=3,
        ),
    ],
)
logger = logging.getLogger("token_broker")

DB_HOST = os.environ.get("TOKEN_BROKER_DB_HOST", "127.0.0.1")
DB_PORT = int(os.environ.get("TOKEN_BROKER_DB_PORT", "5434"))
DB_NAME = os.environ.get("TOKEN_BROKER_DB_NAME", "token_broker")
DB_USER = os.environ.get("TOKEN_BROKER_DB_USER", "token_broker")
DB_PASSWORD = os.environ.get("TOKEN_BROKER_DB_PASSWORD", "")

JWT_SECRET = os.environ.get(
    "TOKEN_BROKER_JWT_SECRET", "CHANGE_ME_" + secrets.token_hex(32)
)
if "TOKEN_BROKER_JWT_SECRET" not in os.environ:
    # The fallback is regenerated on every import, so JWTs signed with it
    # cannot be verified by another process or after a restart.
    logger.warning(
        "TOKEN_BROKER_JWT_SECRET is not set; using a random per-process secret"
    )

# Lifetimes in seconds.
ACCESS_TOKEN_EXPIRY = int(os.environ.get("TOKEN_BROKER_ACCESS_EXPIRY", "900"))
REFRESH_TOKEN_EXPIRY = int(os.environ.get("TOKEN_BROKER_REFRESH_EXPIRY", "2592000"))

# Idempotent schema bootstrap executed by init_db().
# NOTE(review): gen_random_uuid() must be available in the target database
# (built in on recent PostgreSQL, otherwise via an extension) — confirm.
_SCHEMA_SQL = """
    CREATE TABLE IF NOT EXISTS api_tokens (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        user_id INTEGER NOT NULL,
        name TEXT NOT NULL DEFAULT 'default',
        token_hash TEXT NOT NULL UNIQUE,
        token_prefix TEXT NOT NULL,
        scopes TEXT[] DEFAULT '{}',
        is_active BOOLEAN NOT NULL DEFAULT TRUE,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        expires_at TIMESTAMPTZ,
        last_used_at TIMESTAMPTZ,
        metadata JSONB DEFAULT '{}'
    );
    CREATE TABLE IF NOT EXISTS refresh_tokens (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        user_id INTEGER NOT NULL,
        token_hash TEXT NOT NULL UNIQUE,
        token_prefix TEXT NOT NULL,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        expires_at TIMESTAMPTZ NOT NULL,
        revoked BOOLEAN NOT NULL DEFAULT FALSE,
        revoked_at TIMESTAMPTZ,
        replaced_by UUID
    );
    CREATE TABLE IF NOT EXISTS token_audit_log (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        user_id INTEGER,
        action TEXT NOT NULL,
        token_prefix TEXT,
        ip_address TEXT,
        user_agent TEXT,
        details JSONB DEFAULT '{}',
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    CREATE TABLE IF NOT EXISTS clients (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        client_id TEXT NOT NULL UNIQUE,
        client_secret TEXT NOT NULL,
        name TEXT NOT NULL,
        description TEXT DEFAULT '',
        redirect_uris TEXT[] DEFAULT '{}',
        scopes TEXT[] DEFAULT '{}',
        is_active BOOLEAN NOT NULL DEFAULT TRUE,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    CREATE TABLE IF NOT EXISTS providers (
        id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
        name TEXT NOT NULL UNIQUE,
        provider_type TEXT NOT NULL DEFAULT 'oauth2',
        issuer_url TEXT,
        client_id TEXT,
        client_secret TEXT,
        scopes TEXT[] DEFAULT '{}',
        config JSONB DEFAULT '{}',
        is_active BOOLEAN NOT NULL DEFAULT TRUE,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
        updated_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    CREATE TABLE IF NOT EXISTS token_usage (
        id BIGSERIAL PRIMARY KEY,
        user_id INTEGER NOT NULL,
        token_id UUID,
        action TEXT NOT NULL DEFAULT 'verify',
        endpoint TEXT,
        status TEXT NOT NULL DEFAULT 'success',
        response_time_ms INTEGER,
        ip_address TEXT,
        created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
    );
    CREATE INDEX IF NOT EXISTS idx_api_tokens_user_id ON api_tokens(user_id);
    CREATE INDEX IF NOT EXISTS idx_api_tokens_token_hash ON api_tokens(token_hash);
    CREATE INDEX IF NOT EXISTS idx_refresh_tokens_user_id ON refresh_tokens(user_id);
    CREATE INDEX IF NOT EXISTS idx_refresh_tokens_token_hash ON refresh_tokens(token_hash);
    CREATE INDEX IF NOT EXISTS idx_token_audit_log_user_id ON token_audit_log(user_id);
    CREATE INDEX IF NOT EXISTS idx_token_audit_log_created_at ON token_audit_log(created_at);
    CREATE INDEX IF NOT EXISTS idx_clients_client_id ON clients(client_id);
    CREATE INDEX IF NOT EXISTS idx_providers_name ON providers(name);
    CREATE INDEX IF NOT EXISTS idx_token_usage_user_id ON token_usage(user_id);
    CREATE INDEX IF NOT EXISTS idx_token_usage_created_at ON token_usage(created_at);
"""


def get_pg_conn():
    """Open a new autocommit PostgreSQL connection from the DB_* settings.

    Returns:
        A psycopg2 connection, or ``None`` when connecting (or importing
        psycopg2) fails; the failure is logged. The caller owns the
        connection and must close it.
    """
    try:
        # Imported lazily so a missing driver degrades to "no connection".
        import psycopg2
        import psycopg2.extras  # noqa: F401

        conn = psycopg2.connect(
            host=DB_HOST,
            port=DB_PORT,
            dbname=DB_NAME,
            user=DB_USER,
            password=DB_PASSWORD,
        )
        conn.autocommit = True
        return conn
    except Exception as e:
        logger.error("PostgreSQL connection failed: %s", e)
        return None


def init_db():
    """Create all tables and indexes if they do not exist yet.

    Returns:
        ``True`` when the schema script ran, ``False`` when no connection
        could be opened or the script failed (both cases are logged).
    """
    conn = get_pg_conn()
    if not conn:
        logger.error("Cannot initialize DB — no connection")
        return False
    try:
        with conn.cursor() as cur:
            cur.execute(_SCHEMA_SQL)
        logger.info("Database tables initialized successfully")
        return True
    except Exception as e:
        logger.error("Database initialization failed: %s", e)
        return False
    finally:
        # Release the connection on the failure path as well.
        conn.close()


def create_app():
    """Build the Flask application with config, CORS, routes and error handlers."""
    app = Flask(__name__)
    app.config["JWT_SECRET"] = JWT_SECRET
    app.config["ACCESS_TOKEN_EXPIRY"] = ACCESS_TOKEN_EXPIRY
    app.config["REFRESH_TOKEN_EXPIRY"] = REFRESH_TOKEN_EXPIRY
    CORS(app)
    register_routes(app)
    register_error_handlers(app)
    return app


def token_required(f):
    """Decorator that requires a valid ``Authorization: Bearer <jwt>`` header.

    On success the JWT claims ``user_id``, ``token_id`` and ``scopes`` are
    exposed on ``flask.g``; otherwise a 401 JSON response is returned and the
    wrapped view is not called.
    """

    @wraps(f)
    def decorated(*args, **kwargs):
        auth_header = request.headers.get("Authorization", "")
        if not auth_header.startswith("Bearer "):
            return jsonify({"error": "missing_token", "message": "Bearer token required"}), 401
        token = auth_header.split(" ", 1)[1]
        payload = verify_jwt_token(token)
        if not payload:
            return jsonify({"error": "invalid_token", "message": "Token invalid or expired"}), 401
        g.user_id = payload.get("user_id")
        g.token_id = payload.get("token_id")
        g.scopes = payload.get("scopes", [])
        return f(*args, **kwargs)

    return decorated


def generate_token_pair(user_id, scopes=None, metadata=None):
    """Issue a signed access JWT plus an opaque refresh token for *user_id*.

    Args:
        user_id: Owner of the tokens; written into the access JWT claims.
        scopes: Scopes embedded in the access JWT (defaults to ``[]``).
        metadata: Accepted for interface compatibility; currently unused.

    Returns:
        dict with ``access_token`` (HS256 JWT), ``refresh_token`` (random
        URL-safe string; only its SHA-256 hash is persisted), ``expires_in``
        (access lifetime in seconds) and ``token_type``.
    """
    import jwt as pyjwt

    now = datetime.now(timezone.utc)
    access_payload = {
        "user_id": user_id,
        "token_id": str(uuid.uuid4()),
        "scopes": scopes or [],
        "type": "access",
        "iat": now,
        "exp": now + timedelta(seconds=ACCESS_TOKEN_EXPIRY),
    }
    access_token = pyjwt.encode(access_payload, JWT_SECRET, algorithm="HS256")

    # The client receives the raw secret; the database only sees its hash.
    refresh_id = str(uuid.uuid4())
    refresh_raw = secrets.token_urlsafe(48)
    refresh_hash = hashlib.sha256(refresh_raw.encode()).hexdigest()

    # NOTE(review): store_refresh_token() only logs on failure, so a refresh
    # token that could not be persisted is still returned to the client.
    store_refresh_token(user_id, refresh_id, refresh_hash)
    log_audit(user_id, "token_issued", access_payload["token_id"][:8])

    return {
        "access_token": access_token,
        "refresh_token": refresh_raw,
        "expires_in": ACCESS_TOKEN_EXPIRY,
        "token_type": "Bearer",
    }


def verify_jwt_token(token):
    """Decode and validate an HS256 JWT signed with ``JWT_SECRET``.

    Returns:
        The decoded claims dict, or ``None`` when the token is invalid,
        expired, or (for ``type == "refresh"``) has no live, unrevoked row in
        ``refresh_tokens``.
    """
    import jwt as pyjwt

    try:
        payload = pyjwt.decode(token, JWT_SECRET, algorithms=["HS256"])
    except Exception:
        return None

    if payload.get("type") != "refresh":
        return payload

    # NOTE(review): refresh_tokens.token_hash is written from the hash of the
    # raw refresh secret, not of a JWT, so this lookup presumably never
    # matches — confirm the intended identifier before relying on it.
    token_hash = hashlib.sha256(token.encode()).hexdigest()
    conn = get_pg_conn()
    if not conn:
        # Fail closed: revocation state cannot be checked without the database.
        return None
    try:
        with conn.cursor() as cur:
            cur.execute(
                "SELECT revoked FROM refresh_tokens WHERE token_hash = %s AND expires_at > NOW()",
                (token_hash,),
            )
            row = cur.fetchone()
    except Exception:
        return None
    finally:
        conn.close()

    if not row or row[0]:
        return None
    return payload


def store_refresh_token(user_id, refresh_id, token_hash):
    """Persist a refresh token record that expires after ``REFRESH_TOKEN_EXPIRY``.

    Best effort: returns ``None`` in all cases; a missing connection is
    ignored and an insert failure is logged.
    """
    conn = get_pg_conn()
    if not conn:
        return
    try:
        with conn.cursor() as cur:
            # Expiry follows the configured lifetime instead of a fixed 30 days.
            cur.execute(
                """INSERT INTO refresh_tokens (id, user_id, token_hash, token_prefix, expires_at)
                   VALUES (%s, %s, %s, %s, NOW() + %s * INTERVAL '1 second')""",
                (refresh_id, user_id, token_hash, token_hash[:8], REFRESH_TOKEN_EXPIRY),
            )
    except Exception as e:
        logger.error("Failed to store refresh token: %s", e)
    finally:
        conn.close()
def log_audit(user_id, action, token_prefix, details=None):
    """Append a row to ``token_audit_log``; never raises.

    Args:
        user_id: User the event belongs to.
        action: Short event name, e.g. ``"api_token_created"``.
        token_prefix: Non-secret token identifier stored for correlation.
        details: Optional JSON-serialisable object, or a pre-encoded JSON
            string; defaults to an empty object.
    """
    conn = get_pg_conn()
    if not conn:
        return
    try:
        import psycopg2.extras

        if details is None:
            payload = "{}"
        elif isinstance(details, str):
            payload = details
        else:
            # psycopg2 cannot adapt a plain dict/list; wrap it for the JSONB column.
            payload = psycopg2.extras.Json(details)

        with conn.cursor() as cur:
            cur.execute(
                """INSERT INTO token_audit_log (user_id, action, token_prefix, ip_address, user_agent, details)
                   VALUES (%s, %s, %s, %s, %s, %s)""",
                (
                    user_id,
                    action,
                    token_prefix,
                    request.remote_addr if request else None,
                    request.user_agent.string if request and request.user_agent else None,
                    payload,
                ),
            )
    except Exception as e:
        # Auditing is best effort, but a failure should still be visible.
        logger.warning("Audit log write failed: %s", e)
    finally:
        conn.close()


def _json_body():
    """Return the request's JSON body as a dict ({} if absent or not an object)."""
    data = request.get_json(silent=True)
    return data if isinstance(data, dict) else {}


def _sha256_hex(raw):
    """Return the hex SHA-256 digest of a token string."""
    return hashlib.sha256(raw.encode()).hexdigest()


def register_routes(app):
    """Attach all HTTP endpoints of the token broker to *app*."""

    @app.route("/health", methods=["GET"])
    def healthcheck():
        """Report service status; 503 when the database is unreachable."""
        conn = get_pg_conn()
        db_ok = conn is not None
        if conn:
            conn.close()
        return jsonify({
            "status": "ok" if db_ok else "degraded",
            "service": "token-broker",
            "version": "1.0.0",
            "database": "connected" if db_ok else "disconnected",
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }), 200 if db_ok else 503

    @app.route("/api/v1/tokens", methods=["POST"])
    @token_required
    def issue_token():
        """Create an API token for the authenticated user and return it once."""
        data = _json_body()
        user_id = g.user_id
        scopes = data.get("scopes", [])
        name = data.get("name", "default")
        metadata = data.get("metadata", {})

        if not isinstance(scopes, list) or not all(isinstance(s, str) for s in scopes):
            return jsonify({"error": "invalid_request", "message": "scopes must be a list of strings"}), 400

        # A caller may only delegate scopes it already holds; otherwise any
        # user could mint an "admin" token and escalate privileges.
        caller_scopes = g.scopes or []
        if "admin" not in caller_scopes and any(s not in caller_scopes for s in scopes):
            return jsonify({"error": "insufficient_scope", "message": "Cannot grant scopes the caller does not hold"}), 403

        conn = get_pg_conn()
        if not conn:
            return jsonify({"error": "db_error", "message": "Database unavailable"}), 503
        try:
            import psycopg2.extras

            raw_token = "tb_" + secrets.token_urlsafe(32)
            token_hash = _sha256_hex(raw_token)
            token_prefix = raw_token[:12] + "..."
            with conn.cursor() as cur:
                cur.execute(
                    """INSERT INTO api_tokens (user_id, name, token_hash, token_prefix, scopes, metadata)
                       VALUES (%s, %s, %s, %s, %s, %s)
                       RETURNING id, created_at, expires_at""",
                    (user_id, name, token_hash, token_prefix, scopes, psycopg2.extras.Json(metadata)),
                )
                row = cur.fetchone()
        except Exception as e:
            logger.error("Token creation failed: %s", e)
            # Do not echo driver/database error text back to the client.
            return jsonify({"error": "creation_failed", "message": "Token creation failed"}), 500
        finally:
            conn.close()

        log_audit(user_id, "api_token_created", token_prefix)
        return jsonify({
            "id": str(row[0]),
            "token": raw_token,
            "name": name,
            "scopes": scopes,
            "created_at": row[1].isoformat(),
            "expires_at": row[2].isoformat() if row[2] else None,
        }), 201

    @app.route("/api/v1/tokens/verify", methods=["POST"])
    def verify_token():
        """Check an API token and, when valid, stamp ``last_used_at``."""
        raw_token = _json_body().get("token", "")
        if not raw_token or not isinstance(raw_token, str):
            return jsonify({"valid": False, "error": "token_required"}), 400

        token_hash = _sha256_hex(raw_token)
        conn = get_pg_conn()
        if not conn:
            return jsonify({"valid": False, "error": "db_error"}), 503
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """SELECT id, user_id, name, scopes, is_active, created_at, expires_at, last_used_at
                       FROM api_tokens WHERE token_hash = %s""",
                    (token_hash,),
                )
                row = cur.fetchone()
                if not row:
                    return jsonify({"valid": False, "error": "token_not_found"}), 404

                token_id, user_id, name, scopes, is_active, _created, expires_at, _last_used = row
                if not is_active:
                    return jsonify({"valid": False, "error": "token_revoked"}), 403
                if expires_at and expires_at < datetime.now(timezone.utc):
                    return jsonify({"valid": False, "error": "token_expired"}), 403

                cur.execute(
                    "UPDATE api_tokens SET last_used_at = NOW() WHERE id = %s",
                    (token_id,),
                )
            return jsonify({
                "valid": True,
                "token_id": str(token_id),
                "user_id": user_id,
                "name": name,
                "scopes": scopes,
            })
        except Exception as e:
            logger.error("Token verification failed: %s", e)
            return jsonify({"valid": False, "error": "verification_failed"}), 500
        finally:
            conn.close()

    # The uuid converter keeps non-UUID paths (e.g. ".../verify") from reaching
    # the UUID column comparison and supplies the view's token_id argument.
    @app.route("/api/v1/tokens/<uuid:token_id>", methods=["GET"])
    @token_required
    def get_token(token_id):
        """Return one API token owned by the authenticated user."""
        conn = get_pg_conn()
        if not conn:
            return jsonify({"error": "db_error"}), 503
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """SELECT id, user_id, name, scopes, is_active, created_at, expires_at, last_used_at, metadata
                       FROM api_tokens WHERE id = %s AND user_id = %s""",
                    (str(token_id), g.user_id),
                )
                row = cur.fetchone()
        except Exception as e:
            logger.error("Get token failed: %s", e)
            return jsonify({"error": "query_failed"}), 500
        finally:
            conn.close()

        if not row:
            return jsonify({"error": "not_found"}), 404
        return jsonify({
            "id": str(row[0]),
            "user_id": row[1],
            "name": row[2],
            "scopes": row[3],
            "is_active": row[4],
            "created_at": row[5].isoformat(),
            "expires_at": row[6].isoformat() if row[6] else None,
            "last_used_at": row[7].isoformat() if row[7] else None,
            "metadata": row[8] if row[8] else {},
        })

    @app.route("/api/v1/tokens/revoke/<uuid:token_id>", methods=["POST"])
    @token_required
    def revoke_token(token_id):
        """Deactivate one API token owned by the authenticated user."""
        conn = get_pg_conn()
        if not conn:
            return jsonify({"error": "db_error"}), 503
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """UPDATE api_tokens SET is_active = FALSE
                       WHERE id = %s AND user_id = %s
                       RETURNING id, name""",
                    (str(token_id), g.user_id),
                )
                row = cur.fetchone()
        except Exception as e:
            logger.error("Revoke token failed: %s", e)
            return jsonify({"error": "revoke_failed"}), 500
        finally:
            conn.close()

        if not row:
            return jsonify({"error": "not_found"}), 404
        log_audit(g.user_id, "api_token_revoked", str(row[0])[:8])
        return jsonify({"status": "revoked", "token_id": str(row[0])})

    # int converter so the path value compares equal to the integer user_id
    # claim checked below.
    @app.route("/api/v1/tokens/user/<int:user_id>", methods=["GET"])
    @token_required
    def list_user_tokens(user_id):
        """List a user's API tokens; allowed for that user or an ``admin`` scope."""
        if g.user_id != user_id and "admin" not in g.scopes:
            return jsonify({"error": "forbidden"}), 403

        conn = get_pg_conn()
        if not conn:
            return jsonify({"error": "db_error"}), 503
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """SELECT id, user_id, name, scopes, is_active, created_at, expires_at, last_used_at
                       FROM api_tokens WHERE user_id = %s ORDER BY created_at DESC""",
                    (user_id,),
                )
                rows = cur.fetchall()
        except Exception as e:
            logger.error("List tokens failed: %s", e)
            return jsonify({"error": "query_failed"}), 500
        finally:
            conn.close()

        tokens = [
            {
                "id": str(row[0]),
                "user_id": row[1],
                "name": row[2],
                "scopes": row[3],
                "is_active": row[4],
                "created_at": row[5].isoformat(),
                "expires_at": row[6].isoformat() if row[6] else None,
                "last_used_at": row[7].isoformat() if row[7] else None,
            }
            for row in rows
        ]
        return jsonify({"tokens": tokens, "total": len(tokens)})

    @app.route("/api/v1/auth/token", methods=["POST"])
    def exchange_token():
        """Exchange an API token (or, with grant_type=refresh_token, a refresh token) for a token pair."""
        data = _json_body()
        grant_type = data.get("grant_type", "client_credentials")
        raw_token = data.get("client_token", "") or data.get("token", "")
        refresh_raw = data.get("refresh_token", "")

        if grant_type == "refresh_token" and refresh_raw:
            return refresh_access_token(refresh_raw)

        if not raw_token or not isinstance(raw_token, str):
            return jsonify({"error": "invalid_request", "message": "client_token required"}), 400

        token_hash = _sha256_hex(raw_token)
        conn = get_pg_conn()
        if not conn:
            return jsonify({"error": "db_error"}), 503
        try:
            with conn.cursor() as cur:
                cur.execute(
                    """SELECT id, user_id, scopes, is_active, expires_at
                       FROM api_tokens WHERE token_hash = %s""",
                    (token_hash,),
                )
                row = cur.fetchone()
        except Exception as e:
            logger.error("Token exchange failed: %s", e)
            return jsonify({"error": "exchange_failed"}), 500
        finally:
            conn.close()

        if not row:
            return jsonify({"error": "invalid_token"}), 401
        if not row[3]:
            return jsonify({"error": "token_revoked"}), 403
        if row[4] and row[4] < datetime.now(timezone.utc):
            return jsonify({"error": "token_expired"}), 403

        try:
            token_pair = generate_token_pair(row[1], row[2])
        except Exception as e:
            logger.error("Token exchange failed: %s", e)
            return jsonify({"error": "exchange_failed"}), 500
        return jsonify(token_pair), 200

    @app.route("/api/v1/auth/refresh", methods=["POST"])
    def refresh_token_endpoint():
        """Rotate the refresh token supplied in the JSON body."""
        return refresh_access_token(_json_body().get("refresh_token", ""))

    @app.route("/api/v1/auth/revoke", methods=["POST"])
    @token_required
    def revoke_refresh_token():
        """Mark the supplied refresh token as revoked."""
        refresh_raw = _json_body().get("refresh_token", "")
        if not refresh_raw or not isinstance(refresh_raw, str):
            return jsonify({"error": "refresh_token_required"}), 400

        token_hash = _sha256_hex(refresh_raw)
        conn = get_pg_conn()
        if not conn:
            return jsonify({"error": "db_error"}), 503
        try:
            with conn.cursor() as cur:
                cur.execute(
                    "UPDATE refresh_tokens SET revoked = TRUE, revoked_at = NOW() WHERE token_hash = %s",
                    (token_hash,),
                )
        except Exception as e:
            logger.error("Revoke refresh token failed: %s", e)
            return jsonify({"error": "revoke_failed"}), 500
        finally:
            conn.close()

        log_audit(g.user_id, "refresh_token_revoked", token_hash[:8])
        return jsonify({"status": "revoked"})


def refresh_access_token(refresh_raw):
    """Redeem a refresh token: revoke it and issue a fresh token pair.

    Args:
        refresh_raw: The raw refresh token string previously given to the client.

    Returns:
        A Flask ``(response, status)`` tuple: 200 with the new pair, 400 when
        the token is missing, 401 when unknown, 403 when revoked or expired,
        503 without a database, 500 on unexpected errors.
    """
    if not refresh_raw or not isinstance(refresh_raw, str):
        return jsonify({"error": "refresh_token_required"}), 400

    token_hash = _sha256_hex(refresh_raw)
    conn = get_pg_conn()
    if not conn:
        return jsonify({"error": "db_error"}), 503
    try:
        with conn.cursor() as cur:
            cur.execute(
                """SELECT id, user_id, revoked, expires_at
                   FROM refresh_tokens WHERE token_hash = %s""",
                (token_hash,),
            )
            row = cur.fetchone()
            if not row:
                return jsonify({"error": "invalid_token"}), 401

            refresh_id, user_id, revoked, expires_at = row
            if revoked:
                return jsonify({"error": "token_revoked"}), 403
            if expires_at < datetime.now(timezone.utc):
                return jsonify({"error": "token_expired"}), 403

            # Conditional update makes the rotation atomic: of two concurrent
            # requests for the same token only one gets a row back.
            cur.execute(
                """UPDATE refresh_tokens SET revoked = TRUE, revoked_at = NOW()
                   WHERE id = %s AND revoked = FALSE
                   RETURNING id""",
                (refresh_id,),
            )
            if cur.fetchone() is None:
                return jsonify({"error": "token_revoked"}), 403

        # NOTE(review): refresh_tokens stores no scopes, so the new access
        # token is issued with an empty scope list — confirm this is intended.
        pairs = generate_token_pair(user_id)
        return jsonify(pairs), 200
    except Exception as e:
        logger.error("Refresh token failed: %s", e)
        return jsonify({"error": "refresh_failed"}), 500
    finally:
        conn.close()


def register_error_handlers(app):
    """Install JSON responses for 404, 405 and 500 errors."""

    @app.errorhandler(404)
    def not_found(e):
        return jsonify({"error": "not_found", "message": "Route not found"}), 404

    @app.errorhandler(405)
    def method_not_allowed(e):
        return jsonify({"error": "method_not_allowed", "message": "Method not allowed"}), 405

    @app.errorhandler(500)
    def internal_error(e):
        logger.error("Internal error: %s", e)
        return jsonify({"error": "internal_error", "message": "Internal server error"}), 500


if __name__ == "__main__":
    logger.info("=" * 60)
    logger.info("Token Broker API starting...")
    logger.info(f"DB: {DB_HOST}:{DB_PORT}/{DB_NAME}")
    logger.info(f"Port: {os.environ.get('TOKEN_BROKER_PORT', '8783')}")
    logger.info("=" * 60)

    if not init_db():
        # Keep serving so /health can report the degraded state.
        logger.warning("Starting without a verified database schema")

    port = int(os.environ.get("TOKEN_BROKER_PORT", "8783"))
    debug = os.environ.get("FLASK_ENV", "production") == "development"

    app = create_app()
    app.run(host="0.0.0.0", port=port, debug=debug)