Files
turf_saas/ai_router/providers/anthropic_adapter.py
CTO H3R7Tech 4b766cb908 feat(HRT-200): AI Router — Multi-provider LLM routing with failover
- 4 provider adapters: OpenAI (SDK), Anthropic (SDK), Google (google-genai), Mistral (direct HTTP)
- Core router with automatic failover + exponential backoff
- Flask blueprint with /api/v1/ai/* endpoints
- Auth via token-broker verify endpoint
- DB models for ai_providers, ai_model_mapping, ai_router_log
- /health endpoint (parallel provider check), /usage stats
- 21 unit tests (all passing)
2026-05-24 10:21:36 +02:00

58 lines
2.0 KiB
Python

import logging
from typing import Optional
from .base import AIProvider
logger = logging.getLogger("ai_router.anthropic")
class AnthropicAdapter(AIProvider):
    """Adapter that serves chat requests through Anthropic's Messages API.

    The ``anthropic`` SDK is imported lazily inside ``_make_client``, so this
    module can be imported even when the SDK is not installed.
    """

    # ``max_tokens`` is mandatory for the Messages API; this value is used
    # when the caller does not supply one.
    _DEFAULT_MAX_TOKENS = 1024
    # Sampling options forwarded to the SDK; any other keyword is dropped.
    _FORWARDED_OPTIONS = ("temperature", "max_tokens", "top_p")

    @property
    def name(self) -> str:
        """Return the provider identifier, ``"anthropic"``."""
        return "anthropic"

    def _make_client(self, api_key: Optional[str] = None):
        """Build an SDK client, using ``self.get_api_key()`` when no key is given."""
        from anthropic import Anthropic

        return Anthropic(api_key=api_key or self.get_api_key())

    def chat(self, messages: list, model: str, api_key: Optional[str] = None, **kwargs) -> dict:
        """Send a chat conversation to Anthropic and return a normalized reply.

        Args:
            messages: Dicts with ``"role"`` and ``"content"`` keys. Entries
                whose role is ``"system"`` are lifted out of the conversation
                and sent through the top-level ``system`` parameter.
            model: Model identifier passed through to the API.
            api_key: Optional key overriding ``self.get_api_key()``.
            **kwargs: Only ``temperature``, ``max_tokens`` and ``top_p`` are
                forwarded, and only when their value is not ``None``.

        Returns:
            A dict with ``content`` (reply text), ``model``, ``provider`` and
            ``usage`` (``prompt_tokens``, ``completion_tokens``,
            ``total_tokens``).

        Raises:
            KeyError: If a message lacks ``"role"`` or ``"content"``.
            Exception: Errors raised by the SDK are propagated unchanged.
        """
        client = self._make_client(api_key)

        system_parts = []
        chat_messages = []
        for message in messages or ():
            if message.get("role") == "system":
                # The Messages API only accepts user/assistant roles inside
                # ``messages``; system text travels separately.
                if message["content"]:
                    system_parts.append(message["content"])
            else:
                chat_messages.append(
                    {"role": message["role"], "content": message["content"]}
                )

        params = {
            key: value
            for key, value in kwargs.items()
            if key in self._FORWARDED_OPTIONS and value is not None
        }
        params.setdefault("max_tokens", self._DEFAULT_MAX_TOKENS)

        # Omit ``system`` entirely when there is no system prompt: sending an
        # explicit null is not the same as leaving the field out.
        if len(system_parts) == 1:
            params["system"] = system_parts[0]
        elif system_parts:
            params["system"] = "\n\n".join(str(part) for part in system_parts)

        resp = client.messages.create(model=model, messages=chat_messages, **params)

        # A reply may hold several content blocks and not all of them carry
        # text, so gather every textual block instead of assuming index 0.
        text = "".join(
            block.text
            for block in (resp.content or ())
            if isinstance(getattr(block, "text", None), str)
        )

        usage = resp.usage
        prompt_tokens = usage.input_tokens if usage else 0
        completion_tokens = usage.output_tokens if usage else 0
        return {
            "content": text,
            "model": resp.model,
            "provider": self.name,
            "usage": {
                "prompt_tokens": prompt_tokens,
                "completion_tokens": completion_tokens,
                "total_tokens": prompt_tokens + completion_tokens,
            },
        }

    def models(self) -> list:
        """Return the ids of the models listed by the Anthropic API."""
        return [model.id for model in self._make_client().models.list()]

    def check_health(self) -> dict:
        """Probe the API by listing models and report the outcome.

        Returns:
            ``{"status": "ok", "details": "API reachable"}`` on success, or
            ``{"status": "error", "details": <message>}`` on any failure.
            This method never raises.
        """
        try:
            self._make_client().models.list()
        except Exception as e:
            # Deliberately broad: a health probe reports failures (missing
            # SDK, missing key, network or API errors) instead of raising.
            logger.warning("Anthropic health check failed: %s", e)
            return {"status": "error", "details": str(e)}
        return {"status": "ok", "details": "API reachable"}