The AIVerse extension plan is a technical roadmap designed to transform the platform into a provider-agnostic AI gateway. By implementing a modular backend with FastAPI and a strictly typed TypeScript frontend, AIVerse enables real-time switching between cloud providers (such as Groq, Together AI, and Anthropic) and local inference engines like Ollama. Key features of this integration include a unified PostgreSQL database for conversation persistence, Alembic-managed migrations for usage tracking, and a Docker Compose orchestration strategy to ensure consistent performance across environments like Oracle Cloud and Hostinger VPS. This approach prioritizes developer flexibility, cost-efficiency, and high-performance AI interactions.
📋 Overview of Extensions
Let’s extend AIVerse to support multiple AI providers behind a unified interface. This gives users the flexibility to choose between:
- Groq API – ultra-fast inference with developer keys
- OpenAI – GPT models via API keys
- Anthropic – Claude models via API keys
- Together AI – open models with a generous free tier
- Ollama – local models with unlimited usage (already implemented)
🎯 Architecture Design
Unified AI Provider Interface
```
┌───────────────────────────────────────────────────────────┐
│                 FastAPI Application Layer                 │
│                                                           │
│  ┌─────────────────────────────────────────────────────┐  │
│  │            AI Provider Manager (Factory)            │  │
│  │  - Route requests to the appropriate provider       │  │
│  │  - Handle API key management                        │  │
│  │  - Track usage and quotas                           │  │
│  └──────────────────────────┬──────────────────────────┘  │
│                             │                             │
│  ┌──────────────────────────▼──────────────────────────┐  │
│  │           Abstract AI Provider Interface            │  │
│  │           (Base class for all providers)            │  │
│  └───┬─────────┬─────────┬──────────┬─────────┬────────┘  │
│      │         │         │          │         │           │
│  ┌───▼───┐ ┌───▼────┐ ┌──▼─────┐ ┌──▼──────┐ ┌▼────────┐  │
│  │ Groq  │ │ OpenAI │ │ Claude │ │Together │ │ Ollama  │  │
│  │ API   │ │  API   │ │  API   │ │   AI    │ │ (Local) │  │
│  └───────┘ └────────┘ └────────┘ └─────────┘ └─────────┘  │
└───────────────────────────────────────────────────────────┘
```
🛠️ Implementation Plan
Phase 1: Core Infrastructure ✅
Step 1: Create Abstract Provider Interface
Create app/services/ai/base_provider.py:
"""
Abstract base class for AI providers
All AI providers must implement this interface
"""
from abc import ABC, abstractmethod
from typing import AsyncGenerator, Dict, Any, List
from pydantic import BaseModel
class ChatMessage(BaseModel):
"""Standard chat message format"""
role: str # "user", "assistant", "system"
content: str
class ChatResponse(BaseModel):
"""Standard chat response format"""
content: str
model: str
provider: str
tokens_used: int = 0
cost: float = 0.0
class ModelInfo(BaseModel):
"""Model information"""
id: str
name: str
provider: str
context_length: int
cost_per_1k_tokens: float = 0.0
supports_streaming: bool = True
class BaseAIProvider(ABC):
"""
Abstract base class for AI providers
All AI providers (Groq, OpenAI, Claude, etc.) must inherit this
"""
def __init__(self, api_key: str = None, **kwargs):
self.api_key = api_key
self.provider_name = "base"
@abstractmethod
async def chat(
self,
messages: List[ChatMessage],
model: str,
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> ChatResponse:
"""
Send a chat request and get response
Args:
messages: List of chat messages
model: Model identifier
temperature: Sampling temperature
max_tokens: Maximum tokens to generate
Returns:
ChatResponse object
"""
pass
@abstractmethod
async def stream_chat(
self,
messages: List[ChatMessage],
model: str,
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> AsyncGenerator[str, None]:
"""
Stream chat response
Args:
messages: List of chat messages
model: Model identifier
temperature: Sampling temperature
max_tokens: Maximum tokens to generate
Yields:
Chunks of response text
"""
pass
@abstractmethod
async def list_models(self) -> List[ModelInfo]:
"""
List available models
Returns:
List of ModelInfo objects
"""
pass
@abstractmethod
async def get_model_info(self, model_id: str) -> ModelInfo:
"""
Get information about a specific model
Args:
model_id: Model identifier
Returns:
ModelInfo object
"""
pass
def calculate_cost(self, tokens: int, cost_per_1k: float) -> float:
"""
Calculate cost for token usage
Args:
tokens: Number of tokens used
cost_per_1k: Cost per 1000 tokens
Returns:
Total cost in dollars
"""
return (tokens / 1000) * cost_per_1k
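The cost helper is simple per-1k pricing, and it is worth seeing the arithmetic once in isolation. A minimal standalone sketch (the numbers are illustrative, not a provider's actual price list):

```python
def calculate_cost(tokens: int, cost_per_1k: float) -> float:
    """Cost in dollars for `tokens` at `cost_per_1k` dollars per 1000 tokens."""
    return (tokens / 1000) * cost_per_1k

# A request with 1,500 prompt tokens and 500 completion tokens on a model
# priced at $0.0005/1k input and $0.0015/1k output:
input_cost = calculate_cost(1500, 0.0005)   # 0.00075
output_cost = calculate_cost(500, 0.0015)   # 0.00075
total = input_cost + output_cost            # 0.0015
```

Each concrete provider below applies this formula separately to prompt and completion tokens, since most APIs price the two differently.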
Step 2: Create Provider Manager
Create app/services/ai/provider_manager.py:
"""
AI Provider Manager
Factory pattern for managing multiple AI providers
"""
from typing import Dict, Type, Optional
from app.services.ai.base_provider import BaseAIProvider
from app.core.exceptions import AppException
from app.utils.logger import logger
class ProviderManager:
"""
Manages multiple AI providers
Singleton pattern to ensure one instance across app
"""
_instance = None
_providers: Dict[str, Type[BaseAIProvider]] = {}
_initialized_providers: Dict[str, BaseAIProvider] = {}
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
@classmethod
def register_provider(cls, name: str, provider_class: Type[BaseAIProvider]):
"""
Register a new AI provider
Args:
name: Provider name (e.g., "groq", "openai")
provider_class: Provider class
"""
cls._providers[name] = provider_class
logger.info(f"Registered AI provider: {name}")
@classmethod
def get_provider(cls, name: str, api_key: str = None, **kwargs) -> BaseAIProvider:
"""
Get or create provider instance
Args:
name: Provider name
api_key: API key for provider
**kwargs: Additional provider configuration
Returns:
Provider instance
Raises:
AppException: If provider not found
"""
# Check if provider is registered
if name not in cls._providers:
raise AppException(
status_code=400,
detail=f"AI provider '{name}' not found. Available: {list(cls._providers.keys())}"
)
# Create cache key
cache_key = f"{name}:{api_key or 'default'}"
# Return cached instance if exists
if cache_key in cls._initialized_providers:
return cls._initialized_providers[cache_key]
# Create new instance
provider_class = cls._providers[name]
provider = provider_class(api_key=api_key, **kwargs)
# Cache instance
cls._initialized_providers[cache_key] = provider
logger.info(f"Initialized AI provider: {name}")
return provider
@classmethod
def list_providers(cls) -> list:
"""
List all registered providers
Returns:
List of provider names
"""
return list(cls._providers.keys())
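The register-then-cache pattern above can be exercised in isolation. This is a minimal standalone sketch of the same idea; `Registry` and `EchoProvider` are hypothetical names for illustration, not part of the AIVerse codebase:

```python
from typing import Dict, Optional, Type


class Registry:
    """Class-level registry of provider classes plus cached instances."""
    _providers: Dict[str, Type] = {}
    _instances: Dict[str, object] = {}

    @classmethod
    def register(cls, name: str, provider_class: Type) -> None:
        cls._providers[name] = provider_class

    @classmethod
    def get(cls, name: str, api_key: Optional[str] = None):
        if name not in cls._providers:
            raise KeyError(f"provider '{name}' not registered")
        # One cached instance per (provider, api_key) pair
        cache_key = f"{name}:{api_key or 'default'}"
        if cache_key not in cls._instances:
            cls._instances[cache_key] = cls._providers[name]()
        return cls._instances[cache_key]


class EchoProvider:
    """Toy provider: returns its input unchanged."""
    def chat(self, text: str) -> str:
        return text


Registry.register("echo", EchoProvider)
p1 = Registry.get("echo")
p2 = Registry.get("echo")
assert p1 is p2  # same cached instance for the same cache key
```

Keying the cache on the API key (as `ProviderManager` does) means two users with different keys get separate provider instances, while repeat calls from the same user reuse one HTTP client.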
Step 3: Refactor Existing Ollama Provider
Create app/services/ai/ollama_provider.py:
"""
Ollama AI Provider
Local LLM provider using Ollama
"""
from typing import List, AsyncGenerator
import httpx
from app.services.ai.base_provider import (
BaseAIProvider,
ChatMessage,
ChatResponse,
ModelInfo
)
from app.core.config import settings
from app.utils.logger import logger
class OllamaProvider(BaseAIProvider):
"""
Ollama provider for local LLMs
Supports models like LLaMA 2, Mistral, etc.
"""
def __init__(self, api_key: str = None, base_url: str = None, **kwargs):
super().__init__(api_key, **kwargs)
self.provider_name = "ollama"
self.base_url = base_url or settings.OLLAMA_BASE_URL
self.client = httpx.AsyncClient(timeout=300.0)
async def chat(
self,
messages: List[ChatMessage],
model: str,
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> ChatResponse:
"""Send chat request to Ollama"""
# Convert messages to Ollama format
ollama_messages = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
# Make request
response = await self.client.post(
f"{self.base_url}/api/chat",
json={
"model": model,
"messages": ollama_messages,
"stream": False,
"options": {
"temperature": temperature,
"num_predict": max_tokens
}
}
)
response.raise_for_status()
data = response.json()
return ChatResponse(
content=data.get("message", {}).get("content", ""),
model=model,
provider=self.provider_name,
tokens_used=data.get("eval_count", 0),
cost=0.0 # Local is free!
)
async def stream_chat(
self,
messages: List[ChatMessage],
model: str,
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> AsyncGenerator[str, None]:
"""Stream chat response from Ollama"""
ollama_messages = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
async with self.client.stream(
"POST",
f"{self.base_url}/api/chat",
json={
"model": model,
"messages": ollama_messages,
"stream": True,
"options": {
"temperature": temperature,
"num_predict": max_tokens
}
}
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line:
import json
data = json.loads(line)
if "message" in data:
content = data["message"].get("content", "")
if content:
yield content
async def list_models(self) -> List[ModelInfo]:
"""List available Ollama models"""
response = await self.client.get(f"{self.base_url}/api/tags")
response.raise_for_status()
data = response.json()
models = []
for model in data.get("models", []):
models.append(ModelInfo(
id=model["name"],
name=model["name"],
provider=self.provider_name,
context_length=4096, # Default, can be configured
cost_per_1k_tokens=0.0,
supports_streaming=True
))
return models
async def get_model_info(self, model_id: str) -> ModelInfo:
"""Get info about specific Ollama model"""
response = await self.client.post(
f"{self.base_url}/api/show",
json={"name": model_id}
)
response.raise_for_status()
data = response.json()
return ModelInfo(
id=model_id,
name=model_id,
provider=self.provider_name,
context_length=data.get("context_length", 4096),
cost_per_1k_tokens=0.0,
supports_streaming=True
)
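Unlike the OpenAI-style providers that follow, Ollama streams plain newline-delimited JSON (one object per line, no `data:` prefix). The per-line handling in `stream_chat` can be checked standalone; the sample lines below are fabricated for illustration:

```python
import json
from typing import Iterable, Iterator


def parse_ollama_stream(lines: Iterable[str]) -> Iterator[str]:
    """Yield content chunks from Ollama's NDJSON /api/chat stream."""
    for line in lines:
        if not line:
            continue  # skip blank keep-alive lines
        data = json.loads(line)
        content = data.get("message", {}).get("content", "")
        if content:
            yield content


sample = [
    '{"message": {"role": "assistant", "content": "Hi"}, "done": false}',
    '{"message": {"role": "assistant", "content": " there"}, "done": false}',
    '{"done": true}',
]
print("".join(parse_ollama_stream(sample)))  # Hi there
```

The final `{"done": true}` object carries no `message` key, so the parser skips it naturally.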
Phase 2: Groq Integration 🚀
Step 4: Create Groq Provider
Create app/services/ai/groq_provider.py:
"""
Groq AI Provider
Ultra-fast inference with Groq API
"""
from typing import List, AsyncGenerator
import httpx
import json
from app.services.ai.base_provider import (
BaseAIProvider,
ChatMessage,
ChatResponse,
ModelInfo
)
from app.utils.logger import logger
class GroqProvider(BaseAIProvider):
"""
Groq provider for fast inference
Supports models: llama-3.3-70b, mixtral-8x7b, gemma-7b
"""
# Groq pricing (as of 2024)
PRICING = {
"llama-3.3-70b-versatile": {"input": 0.00059, "output": 0.00079},
"llama-3.1-70b-versatile": {"input": 0.00059, "output": 0.00079},
"llama-3.1-8b-instant": {"input": 0.00005, "output": 0.00008},
"mixtral-8x7b-32768": {"input": 0.00024, "output": 0.00024},
"gemma-7b-it": {"input": 0.00007, "output": 0.00007},
}
def __init__(self, api_key: str, **kwargs):
super().__init__(api_key, **kwargs)
self.provider_name = "groq"
self.base_url = "https://api.groq.com/openai/v1"
if not api_key:
raise ValueError("Groq API key is required")
self.client = httpx.AsyncClient(
timeout=60.0,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
)
async def chat(
self,
messages: List[ChatMessage],
model: str,
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> ChatResponse:
"""Send chat request to Groq"""
# Convert messages to OpenAI format
groq_messages = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
# Make request
response = await self.client.post(
f"{self.base_url}/chat/completions",
json={
"model": model,
"messages": groq_messages,
"temperature": temperature,
"max_tokens": max_tokens,
**kwargs
}
)
response.raise_for_status()
data = response.json()
# Extract response
choice = data["choices"][0]
content = choice["message"]["content"]
# Calculate tokens and cost
usage = data.get("usage", {})
total_tokens = usage.get("total_tokens", 0)
pricing = self.PRICING.get(model, {"input": 0, "output": 0})
input_cost = self.calculate_cost(
usage.get("prompt_tokens", 0),
pricing["input"]
)
output_cost = self.calculate_cost(
usage.get("completion_tokens", 0),
pricing["output"]
)
total_cost = input_cost + output_cost
return ChatResponse(
content=content,
model=model,
provider=self.provider_name,
tokens_used=total_tokens,
cost=total_cost
)
async def stream_chat(
self,
messages: List[ChatMessage],
model: str,
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> AsyncGenerator[str, None]:
"""Stream chat response from Groq"""
groq_messages = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
async with self.client.stream(
"POST",
f"{self.base_url}/chat/completions",
json={
"model": model,
"messages": groq_messages,
"temperature": temperature,
"max_tokens": max_tokens,
"stream": True,
**kwargs
}
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.startswith("data: "):
data_str = line[6:] # Remove "data: " prefix
if data_str == "[DONE]":
break
try:
data = json.loads(data_str)
delta = data["choices"][0]["delta"]
if "content" in delta:
yield delta["content"]
except json.JSONDecodeError:
continue
async def list_models(self) -> List[ModelInfo]:
"""List available Groq models"""
response = await self.client.get(f"{self.base_url}/models")
response.raise_for_status()
data = response.json()
models = []
for model in data.get("data", []):
model_id = model["id"]
pricing = self.PRICING.get(model_id, {"input": 0, "output": 0})
models.append(ModelInfo(
id=model_id,
name=model.get("name", model_id),
provider=self.provider_name,
context_length=model.get("context_window", 32768),
cost_per_1k_tokens=pricing["input"], # Use input pricing
supports_streaming=True
))
return models
async def get_model_info(self, model_id: str) -> ModelInfo:
"""Get info about specific Groq model"""
response = await self.client.get(f"{self.base_url}/models/{model_id}")
response.raise_for_status()
data = response.json()
pricing = self.PRICING.get(model_id, {"input": 0, "output": 0})
return ModelInfo(
id=model_id,
name=data.get("name", model_id),
provider=self.provider_name,
context_length=data.get("context_window", 32768),
cost_per_1k_tokens=pricing["input"],
supports_streaming=True
)
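Groq exposes an OpenAI-compatible server-sent-events stream, which is why the loop above looks for `data: ` prefixes and a `[DONE]` sentinel. That parsing logic is pure and can be verified in isolation; the sample chunks below are fabricated for illustration:

```python
import json
from typing import Iterable, Iterator


def parse_sse_chunks(lines: Iterable[str]) -> Iterator[str]:
    """Yield content deltas from OpenAI-style 'data: {...}' SSE lines."""
    for line in lines:
        if not line.startswith("data: "):
            continue  # ignore comments and blank keep-alive lines
        data_str = line[6:]
        if data_str == "[DONE]":
            break  # end-of-stream sentinel
        try:
            data = json.loads(data_str)
        except json.JSONDecodeError:
            continue  # skip malformed chunks
        delta = data["choices"][0]["delta"]
        if "content" in delta:
            yield delta["content"]


sample = [
    'data: {"choices": [{"delta": {"role": "assistant"}}]}',
    'data: {"choices": [{"delta": {"content": "Hel"}}]}',
    'data: {"choices": [{"delta": {"content": "lo"}}]}',
    "data: [DONE]",
]
print("".join(parse_sse_chunks(sample)))  # Hello
```

The first chunk carries only a `role` delta and no `content`, so it yields nothing; the OpenAI and Together providers below reuse exactly this wire format.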
Phase 3: OpenAI Integration 🤖
Step 5: Create OpenAI Provider
Create app/services/ai/openai_provider.py:
"""
OpenAI AI Provider
Official OpenAI API integration (GPT models)
"""
from typing import List, AsyncGenerator
import httpx
import json
from app.services.ai.base_provider import (
BaseAIProvider,
ChatMessage,
ChatResponse,
ModelInfo
)
from app.utils.logger import logger
class OpenAIProvider(BaseAIProvider):
"""
OpenAI provider for GPT models
Supports: GPT-4, GPT-4 Turbo, GPT-3.5 Turbo
"""
# OpenAI pricing (as of 2024)
PRICING = {
"gpt-4-turbo": {"input": 0.01, "output": 0.03},
"gpt-4": {"input": 0.03, "output": 0.06},
"gpt-4-32k": {"input": 0.06, "output": 0.12},
"gpt-3.5-turbo": {"input": 0.0005, "output": 0.0015},
"gpt-3.5-turbo-16k": {"input": 0.003, "output": 0.004},
"gpt-4o": {"input": 0.005, "output": 0.015},
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
}
def __init__(self, api_key: str, **kwargs):
super().__init__(api_key, **kwargs)
self.provider_name = "openai"
self.base_url = "https://api.openai.com/v1"
if not api_key:
raise ValueError("OpenAI API key is required")
self.client = httpx.AsyncClient(
timeout=120.0,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
)
async def chat(
self,
messages: List[ChatMessage],
model: str,
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> ChatResponse:
"""Send chat request to OpenAI"""
# Convert messages
openai_messages = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
# Make request
response = await self.client.post(
f"{self.base_url}/chat/completions",
json={
"model": model,
"messages": openai_messages,
"temperature": temperature,
"max_tokens": max_tokens,
**kwargs
}
)
response.raise_for_status()
data = response.json()
# Extract response
choice = data["choices"][0]
content = choice["message"]["content"]
# Calculate cost
usage = data.get("usage", {})
total_tokens = usage.get("total_tokens", 0)
pricing = self.PRICING.get(model, {"input": 0, "output": 0})
input_cost = self.calculate_cost(
usage.get("prompt_tokens", 0),
pricing["input"]
)
output_cost = self.calculate_cost(
usage.get("completion_tokens", 0),
pricing["output"]
)
total_cost = input_cost + output_cost
return ChatResponse(
content=content,
model=model,
provider=self.provider_name,
tokens_used=total_tokens,
cost=total_cost
)
async def stream_chat(
self,
messages: List[ChatMessage],
model: str,
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> AsyncGenerator[str, None]:
"""Stream chat response from OpenAI"""
openai_messages = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
async with self.client.stream(
"POST",
f"{self.base_url}/chat/completions",
json={
"model": model,
"messages": openai_messages,
"temperature": temperature,
"max_tokens": max_tokens,
"stream": True,
**kwargs
}
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.startswith("data: "):
data_str = line[6:]
if data_str == "[DONE]":
break
try:
data = json.loads(data_str)
delta = data["choices"][0]["delta"]
if "content" in delta:
yield delta["content"]
except json.JSONDecodeError:
continue
async def list_models(self) -> List[ModelInfo]:
"""List available OpenAI models"""
response = await self.client.get(f"{self.base_url}/models")
response.raise_for_status()
data = response.json()
models = []
for model in data.get("data", []):
model_id = model["id"]
# Only include chat models
if not any(x in model_id for x in ["gpt-3.5", "gpt-4"]):
continue
pricing = self.PRICING.get(model_id, {"input": 0, "output": 0})
models.append(ModelInfo(
id=model_id,
name=model_id,
provider=self.provider_name,
context_length=self._get_context_length(model_id),
cost_per_1k_tokens=pricing["input"],
supports_streaming=True
))
return models
async def get_model_info(self, model_id: str) -> ModelInfo:
"""Get info about specific OpenAI model"""
pricing = self.PRICING.get(model_id, {"input": 0, "output": 0})
return ModelInfo(
id=model_id,
name=model_id,
provider=self.provider_name,
context_length=self._get_context_length(model_id),
cost_per_1k_tokens=pricing["input"],
supports_streaming=True
)
def _get_context_length(self, model_id: str) -> int:
"""Get context length for model"""
if "32k" in model_id:
return 32768
elif "16k" in model_id:
return 16384
elif "gpt-4" in model_id:
return 8192
elif "gpt-3.5" in model_id:
return 4096
return 4096
Phase 4: Anthropic Claude Integration 🧠
Step 6: Create Anthropic Claude Provider
Create app/services/ai/anthropic_provider.py:
"""
Anthropic AI Provider
Claude models via Anthropic API
"""
from typing import List, AsyncGenerator
import httpx
import json
from app.services.ai.base_provider import (
BaseAIProvider,
ChatMessage,
ChatResponse,
ModelInfo
)
from app.utils.logger import logger
class AnthropicProvider(BaseAIProvider):
"""
Anthropic provider for Claude models
Supports: Claude 3 Opus, Claude 3 Sonnet, Claude 3 Haiku
"""
# Anthropic pricing (as of 2024)
PRICING = {
"claude-3-opus-20240229": {"input": 0.015, "output": 0.075},
"claude-3-sonnet-20240229": {"input": 0.003, "output": 0.015},
"claude-3-haiku-20240307": {"input": 0.00025, "output": 0.00125},
"claude-3-5-sonnet-20241022": {"input": 0.003, "output": 0.015},
}
def __init__(self, api_key: str, **kwargs):
super().__init__(api_key, **kwargs)
self.provider_name = "anthropic"
self.base_url = "https://api.anthropic.com/v1"
self.api_version = "2023-06-01"
if not api_key:
raise ValueError("Anthropic API key is required")
self.client = httpx.AsyncClient(
timeout=120.0,
headers={
"x-api-key": api_key,
"anthropic-version": self.api_version,
"Content-Type": "application/json"
}
)
async def chat(
self,
messages: List[ChatMessage],
model: str,
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> ChatResponse:
"""Send chat request to Anthropic"""
# Convert messages to Anthropic format
# Anthropic uses "user" and "assistant" roles only
# System messages go in separate "system" field
system_message = None
anthropic_messages = []
for msg in messages:
if msg.role == "system":
system_message = msg.content
else:
anthropic_messages.append({
"role": msg.role,
"content": msg.content
})
# Build request payload
payload = {
"model": model,
"messages": anthropic_messages,
"max_tokens": max_tokens,
"temperature": temperature,
**kwargs
}
if system_message:
payload["system"] = system_message
# Make request
response = await self.client.post(
f"{self.base_url}/messages",
json=payload
)
response.raise_for_status()
data = response.json()
# Extract response
content = data["content"][0]["text"]
# Calculate cost
usage = data.get("usage", {})
input_tokens = usage.get("input_tokens", 0)
output_tokens = usage.get("output_tokens", 0)
total_tokens = input_tokens + output_tokens
pricing = self.PRICING.get(model, {"input": 0, "output": 0})
input_cost = self.calculate_cost(input_tokens, pricing["input"])
output_cost = self.calculate_cost(output_tokens, pricing["output"])
total_cost = input_cost + output_cost
return ChatResponse(
content=content,
model=model,
provider=self.provider_name,
tokens_used=total_tokens,
cost=total_cost
)
async def stream_chat(
self,
messages: List[ChatMessage],
model: str,
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> AsyncGenerator[str, None]:
"""Stream chat response from Anthropic"""
# Convert messages
system_message = None
anthropic_messages = []
for msg in messages:
if msg.role == "system":
system_message = msg.content
else:
anthropic_messages.append({
"role": msg.role,
"content": msg.content
})
payload = {
"model": model,
"messages": anthropic_messages,
"max_tokens": max_tokens,
"temperature": temperature,
"stream": True,
**kwargs
}
if system_message:
payload["system"] = system_message
async with self.client.stream(
"POST",
f"{self.base_url}/messages",
json=payload
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.startswith("data: "):
data_str = line[6:]
try:
data = json.loads(data_str)
# Handle different event types
if data.get("type") == "content_block_delta":
delta = data.get("delta", {})
if delta.get("type") == "text_delta":
text = delta.get("text", "")
if text:
yield text
except json.JSONDecodeError:
continue
async def list_models(self) -> List[ModelInfo]:
"""List available Anthropic models"""
# Anthropic doesn't have a models endpoint, return hardcoded list
models = []
for model_id, pricing in self.PRICING.items():
models.append(ModelInfo(
id=model_id,
name=self._get_model_name(model_id),
provider=self.provider_name,
context_length=200000, # Claude 3 has 200k context
cost_per_1k_tokens=pricing["input"],
supports_streaming=True
))
return models
async def get_model_info(self, model_id: str) -> ModelInfo:
"""Get info about specific Anthropic model"""
pricing = self.PRICING.get(model_id, {"input": 0, "output": 0})
return ModelInfo(
id=model_id,
name=self._get_model_name(model_id),
provider=self.provider_name,
context_length=200000,
cost_per_1k_tokens=pricing["input"],
supports_streaming=True
)
def _get_model_name(self, model_id: str) -> str:
"""Get friendly model name"""
if "opus" in model_id:
return "Claude 3 Opus"
elif "sonnet" in model_id:
if "3-5" in model_id:
return "Claude 3.5 Sonnet"
return "Claude 3 Sonnet"
elif "haiku" in model_id:
return "Claude 3 Haiku"
return model_id
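The role conversion above (system messages lifted out of the list into a separate field) is the main difference from the OpenAI-style providers, and it is easy to verify standalone. A sketch using plain dicts in place of the `ChatMessage` model:

```python
from typing import Dict, List, Optional, Tuple


def split_system(
    messages: List[Dict[str, str]],
) -> Tuple[Optional[str], List[Dict[str, str]]]:
    """Separate the system prompt from user/assistant turns, Anthropic-style."""
    system: Optional[str] = None
    turns: List[Dict[str, str]] = []
    for msg in messages:
        if msg["role"] == "system":
            system = msg["content"]  # last system message wins, as in the provider above
        else:
            turns.append({"role": msg["role"], "content": msg["content"]})
    return system, turns


system, turns = split_system([
    {"role": "system", "content": "Be terse."},
    {"role": "user", "content": "Hi"},
])
```

The resulting `system` string goes into the payload's `system` field, while `turns` becomes the `messages` array.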
Phase 5: Together AI Integration 🌐
Step 7: Create Together AI Provider
Create app/services/ai/together_provider.py:
"""
Together AI Provider
Open models with generous free tier
"""
from typing import List, AsyncGenerator
import httpx
import json
from app.services.ai.base_provider import (
BaseAIProvider,
ChatMessage,
ChatResponse,
ModelInfo
)
from app.utils.logger import logger
class TogetherProvider(BaseAIProvider):
"""
Together AI provider for open models
Supports: LLaMA, Mistral, Mixtral, CodeLLaMA, etc.
Has generous free tier: $25 free credits
"""
# Together AI pricing (very competitive)
PRICING = {
"meta-llama/Llama-3-70b-chat-hf": {"input": 0.0009, "output": 0.0009},
"meta-llama/Llama-3-8b-chat-hf": {"input": 0.0002, "output": 0.0002},
"mistralai/Mixtral-8x7B-Instruct-v0.1": {"input": 0.0006, "output": 0.0006},
"mistralai/Mistral-7B-Instruct-v0.1": {"input": 0.0002, "output": 0.0002},
"codellama/CodeLlama-34b-Instruct-hf": {"input": 0.0008, "output": 0.0008},
"default": {"input": 0.0002, "output": 0.0002},
}
def __init__(self, api_key: str, **kwargs):
super().__init__(api_key, **kwargs)
self.provider_name = "together"
self.base_url = "https://api.together.xyz/v1"
if not api_key:
raise ValueError("Together AI API key is required")
self.client = httpx.AsyncClient(
timeout=120.0,
headers={
"Authorization": f"Bearer {api_key}",
"Content-Type": "application/json"
}
)
async def chat(
self,
messages: List[ChatMessage],
model: str,
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> ChatResponse:
"""Send chat request to Together AI"""
# Convert messages (OpenAI compatible)
together_messages = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
# Make request
response = await self.client.post(
f"{self.base_url}/chat/completions",
json={
"model": model,
"messages": together_messages,
"temperature": temperature,
"max_tokens": max_tokens,
**kwargs
}
)
response.raise_for_status()
data = response.json()
# Extract response
choice = data["choices"][0]
content = choice["message"]["content"]
# Calculate cost
usage = data.get("usage", {})
total_tokens = usage.get("total_tokens", 0)
pricing = self.PRICING.get(model, self.PRICING["default"])
input_cost = self.calculate_cost(
usage.get("prompt_tokens", 0),
pricing["input"]
)
output_cost = self.calculate_cost(
usage.get("completion_tokens", 0),
pricing["output"]
)
total_cost = input_cost + output_cost
return ChatResponse(
content=content,
model=model,
provider=self.provider_name,
tokens_used=total_tokens,
cost=total_cost
)
async def stream_chat(
self,
messages: List[ChatMessage],
model: str,
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> AsyncGenerator[str, None]:
"""Stream chat response from Together AI"""
together_messages = [
{"role": msg.role, "content": msg.content}
for msg in messages
]
async with self.client.stream(
"POST",
f"{self.base_url}/chat/completions",
json={
"model": model,
"messages": together_messages,
"temperature": temperature,
"max_tokens": max_tokens,
"stream": True,
**kwargs
}
) as response:
response.raise_for_status()
async for line in response.aiter_lines():
if line.startswith("data: "):
data_str = line[6:]
if data_str == "[DONE]":
break
try:
data = json.loads(data_str)
delta = data["choices"][0]["delta"]
if "content" in delta:
yield delta["content"]
except json.JSONDecodeError:
continue
async def list_models(self) -> List[ModelInfo]:
"""List available Together AI models"""
response = await self.client.get(f"{self.base_url}/models")
response.raise_for_status()
data = response.json()
models = []
for model in data:
model_id = model.get("id", "")
# Filter for chat models only
if not any(x in model_id.lower() for x in ["chat", "instruct"]):
continue
pricing = self.PRICING.get(model_id, self.PRICING["default"])
models.append(ModelInfo(
id=model_id,
name=model.get("display_name", model_id),
provider=self.provider_name,
context_length=model.get("context_length", 4096),
cost_per_1k_tokens=pricing["input"],
supports_streaming=True
))
return models
async def get_model_info(self, model_id: str) -> ModelInfo:
"""Get info about specific Together AI model"""
pricing = self.PRICING.get(model_id, self.PRICING["default"])
return ModelInfo(
id=model_id,
name=model_id,
provider=self.provider_name,
context_length=4096,
cost_per_1k_tokens=pricing["input"],
supports_streaming=True
)
Phase 6: Configuration & Database Updates ⚙️
Step 8: Update Configuration
Update app/core/config.py:
```python
    # Add to the Settings class, after the existing AI configuration
    # (Optional comes from typing)

    # Multi-Provider AI Configuration
    GROQ_API_KEY: Optional[str] = None
    OPENAI_API_KEY: Optional[str] = None
    ANTHROPIC_API_KEY: Optional[str] = None
    TOGETHER_API_KEY: Optional[str] = None

    # Default AI provider
    DEFAULT_AI_PROVIDER: str = "ollama"

    # Provider-specific settings
    AI_PROVIDER_TIMEOUT: int = 120  # seconds
    AI_MAX_RETRIES: int = 3

    # Cost tracking
    ENABLE_COST_TRACKING: bool = True
    MAX_MONTHLY_COST: float = 100.0  # USD
```
Step 9: Update User Model for API Keys
Update app/db/models/user.py:
```python
    # Add after the existing columns.
    # Requires: from sqlalchemy import String, Integer, Float, DateTime
    #           from datetime import datetime

    # AI provider API keys (encrypted)
    groq_api_key: Mapped[Optional[str]] = mapped_column(
        String(500),
        nullable=True,
    )
    openai_api_key: Mapped[Optional[str]] = mapped_column(
        String(500),
        nullable=True,
    )
    anthropic_api_key: Mapped[Optional[str]] = mapped_column(
        String(500),
        nullable=True,
    )
    together_api_key: Mapped[Optional[str]] = mapped_column(
        String(500),
        nullable=True,
    )

    # AI usage tracking
    total_tokens_used: Mapped[int] = mapped_column(
        Integer,
        default=0,
        nullable=False,
    )
    total_cost: Mapped[float] = mapped_column(
        Float,
        default=0.0,
        nullable=False,
    )
    monthly_cost: Mapped[float] = mapped_column(
        Float,
        default=0.0,
        nullable=False,
    )
    last_cost_reset: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        nullable=False,
    )
```
Create migration:
```bash
alembic revision --autogenerate -m "Add multi-provider AI support"
alembic upgrade head
```
Step 10: Create API Key Encryption Utility
Create app/utils/encryption.py:
"""
API Key Encryption Utility
Encrypt/decrypt sensitive API keys in database
"""
from cryptography.fernet import Fernet
from app.core.config import settings
import base64
class APIKeyEncryption:
"""Encrypt and decrypt API keys"""
def __init__(self):
# Generate key from SECRET_KEY (should be 32 url-safe base64-encoded bytes)
key = base64.urlsafe_b64encode(settings.SECRET_KEY.encode()[:32].ljust(32, b'0'))
self.cipher = Fernet(key)
def encrypt(self, api_key: str) -> str:
"""Encrypt API key"""
if not api_key:
return None
return self.cipher.encrypt(api_key.encode()).decode()
def decrypt(self, encrypted_key: str) -> str:
"""Decrypt API key"""
if not encrypted_key:
return None
return self.cipher.decrypt(encrypted_key.encode()).decode()
# Singleton instance
api_key_encryption = APIKeyEncryption()
Add to requirements.txt:
```
cryptography==41.0.7
```
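The utility derives its Fernet key by truncating or zero-padding `SECRET_KEY` to exactly 32 bytes and url-safe base64-encoding the result. That derivation step uses only the standard library, so it can be sanity-checked without the cryptography package installed (the sample secret is illustrative):

```python
import base64


def derive_fernet_key(secret_key: str) -> bytes:
    """Pad or truncate the secret to 32 bytes, then url-safe base64-encode it."""
    raw = secret_key.encode()[:32].ljust(32, b"0")
    return base64.urlsafe_b64encode(raw)


key = derive_fernet_key("short-secret")
print(len(key))  # 44 characters: the length Fernet expects
```

Because the key is deterministic in `SECRET_KEY`, rotating `SECRET_KEY` silently invalidates every stored ciphertext; that trade-off is worth documenting wherever the setting is defined.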
Phase 7: Unified AI Service Layer 🎯
Step 11: Create Unified AI Service
Create app/services/ai/unified_service.py:
"""
Unified AI Service
Single interface for all AI providers
"""
from typing import List, AsyncGenerator, Optional
from sqlalchemy.ext.asyncio import AsyncSession
from app.services.ai.base_provider import BaseAIProvider, ChatMessage, ChatResponse, ModelInfo
from app.services.ai.provider_manager import ProviderManager
from app.db.models.user import User
from app.db.repositories.user_repository import UserRepository
from app.utils.encryption import api_key_encryption
from app.core.config import settings
from app.core.exceptions import AppException
from app.utils.logger import logger
from datetime import datetime, timedelta
class UnifiedAIService:
"""
Unified service for all AI providers
Handles provider selection, API key management, cost tracking
"""
def __init__(self, db: AsyncSession, user: User):
self.db = db
self.user = user
self.user_repo = UserRepository(db)
async def get_provider(self, provider_name: str) -> BaseAIProvider:
"""
Get AI provider instance with user's API key
Args:
provider_name: Provider name (groq, openai, anthropic, together, ollama)
Returns:
Provider instance
Raises:
AppException: If API key missing or invalid
"""
# Ollama doesn't need API key
if provider_name == "ollama":
return ProviderManager.get_provider("ollama")
# Get user's encrypted API key
api_key = await self._get_user_api_key(provider_name)
if not api_key:
raise AppException(
status_code=400,
detail=f"No API key configured for {provider_name}. Please add your API key in settings."
)
# Get provider with decrypted key
return ProviderManager.get_provider(provider_name, api_key=api_key)
async def chat(
self,
provider: str,
model: str,
messages: List[ChatMessage],
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> ChatResponse:
"""
Send chat request to specified provider
Args:
provider: Provider name
model: Model identifier
messages: Chat messages
temperature: Sampling temperature
max_tokens: Maximum tokens to generate
Returns:
ChatResponse with content and metadata
"""
# Check monthly cost limit
await self._check_cost_limit()
# Get provider
ai_provider = await self.get_provider(provider)
# Make request
response = await ai_provider.chat(
messages=messages,
model=model,
temperature=temperature,
max_tokens=max_tokens,
**kwargs
)
# Track usage
await self._track_usage(response.tokens_used, response.cost)
logger.info(
f"Chat completed: {provider}/{model}",
extra={
"user_id": self.user.id,
"provider": provider,
"model": model,
"tokens": response.tokens_used,
"cost": response.cost
}
)
return response
async def stream_chat(
self,
provider: str,
model: str,
messages: List[ChatMessage],
temperature: float = 0.7,
max_tokens: int = 1000,
**kwargs
) -> AsyncGenerator[str, None]:
"""
Stream chat response from specified provider
Args:
provider: Provider name
model: Model identifier
messages: Chat messages
temperature: Sampling temperature
max_tokens: Maximum tokens to generate
Yields:
Chunks of response text
"""
# Check monthly cost limit
await self._check_cost_limit()
# Get provider
ai_provider = await self.get_provider(provider)
# Stream response
async for chunk in ai_provider.stream_chat(
messages=messages,
model=model,
temperature=temperature,
max_tokens=max_tokens,
**kwargs
):
yield chunk
        # Token tracking for streaming is approximate: providers do not return
        # a usage count mid-stream, so we fall back to a rough heuristic here.
        # An accurate figure would require tokenizing the streamed output.
        estimated_tokens = max_tokens // 2  # rough estimate
        estimated_cost = 0.0  # TODO: estimate from the provider's per-token price
await self._track_usage(estimated_tokens, estimated_cost)
async def list_models(self, provider: str) -> List[ModelInfo]:
"""
List available models for provider
Args:
provider: Provider name
Returns:
List of ModelInfo objects
"""
ai_provider = await self.get_provider(provider)
return await ai_provider.list_models()
async def get_model_info(self, provider: str, model_id: str) -> ModelInfo:
"""
Get information about specific model
Args:
provider: Provider name
model_id: Model identifier
Returns:
ModelInfo object
"""
ai_provider = await self.get_provider(provider)
return await ai_provider.get_model_info(model_id)
async def _get_user_api_key(self, provider: str) -> Optional[str]:
"""Get and decrypt user's API key for provider"""
# Map provider to user field
key_field_map = {
"groq": self.user.groq_api_key,
"openai": self.user.openai_api_key,
"anthropic": self.user.anthropic_api_key,
"together": self.user.together_api_key,
}
encrypted_key = key_field_map.get(provider)
if not encrypted_key:
# Check for system-level API keys
system_key_map = {
"groq": settings.GROQ_API_KEY,
"openai": settings.OPENAI_API_KEY,
"anthropic": settings.ANTHROPIC_API_KEY,
"together": settings.TOGETHER_API_KEY,
}
return system_key_map.get(provider)
# Decrypt user's key
return api_key_encryption.decrypt(encrypted_key)
async def _track_usage(self, tokens: int, cost: float):
"""Track token usage and cost for user"""
if not settings.ENABLE_COST_TRACKING:
return
# Update user totals
self.user.total_tokens_used += tokens
self.user.total_cost += cost
self.user.monthly_cost += cost
# Reset monthly cost if needed
now = datetime.utcnow()
if now - self.user.last_cost_reset > timedelta(days=30):
self.user.monthly_cost = cost
self.user.last_cost_reset = now
await self.db.commit()
await self.db.refresh(self.user)
async def _check_cost_limit(self):
"""Check if user has exceeded monthly cost limit"""
if not settings.ENABLE_COST_TRACKING:
return
# Reset monthly cost if needed
now = datetime.utcnow()
if now - self.user.last_cost_reset > timedelta(days=30):
self.user.monthly_cost = 0.0
self.user.last_cost_reset = now
await self.db.commit()
# Check limit
if self.user.monthly_cost >= settings.MAX_MONTHLY_COST:
raise AppException(
status_code=429,
detail=f"Monthly cost limit of ${settings.MAX_MONTHLY_COST} exceeded. "
f"Current usage: ${self.user.monthly_cost:.2f}"
)
Phase 8: Provider Registration 🔧
Step 12: Register All Providers
Create app/services/ai/__init__.py:
"""
AI Services Package
Register all AI providers
"""
from app.services.ai.provider_manager import ProviderManager
from app.services.ai.ollama_provider import OllamaProvider
from app.services.ai.groq_provider import GroqProvider
from app.services.ai.openai_provider import OpenAIProvider
from app.services.ai.anthropic_provider import AnthropicProvider
from app.services.ai.together_provider import TogetherProvider
# Register all providers
ProviderManager.register_provider("ollama", OllamaProvider)
ProviderManager.register_provider("groq", GroqProvider)
ProviderManager.register_provider("openai", OpenAIProvider)
ProviderManager.register_provider("anthropic", AnthropicProvider)
ProviderManager.register_provider("together", TogetherProvider)
__all__ = [
"ProviderManager",
"OllamaProvider",
"GroqProvider",
"OpenAIProvider",
"AnthropicProvider",
"TogetherProvider",
]
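`ProviderManager` itself is defined in an earlier step; for readers following along, a minimal registry/factory sketch consistent with the `register_provider` / `get_provider` / `list_providers` calls used in this guide might look like the following (the `BaseAIProvider` stand-in and constructor signature are assumptions):

```python
from typing import Dict, List, Optional, Type


class BaseAIProvider:
    """Stand-in for app.services.ai.base_provider.BaseAIProvider."""
    def __init__(self, api_key: Optional[str] = None):
        self.api_key = api_key


class ProviderManager:
    """Class-level registry mapping provider names to provider classes."""
    _registry: Dict[str, Type[BaseAIProvider]] = {}

    @classmethod
    def register_provider(cls, name: str, provider_cls: Type[BaseAIProvider]) -> None:
        cls._registry[name] = provider_cls

    @classmethod
    def get_provider(cls, name: str, api_key: Optional[str] = None) -> BaseAIProvider:
        if name not in cls._registry:
            raise ValueError(f"Unknown provider: {name}")
        # Instantiate a fresh provider, injecting the (decrypted) API key.
        return cls._registry[name](api_key=api_key)

    @classmethod
    def list_providers(cls) -> List[str]:
        return list(cls._registry)
```

Registering at import time in `__init__.py`, as Step 12 does, guarantees every provider is available as soon as the package is imported.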
Phase 9: API Endpoints for Multi-Provider AI 🌐
Step 13: Create New AI Endpoints
Create app/api/v1/endpoints/ai_multi.py:
"""
Multi-Provider AI Endpoints
Unified interface for all AI providers
"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.responses import StreamingResponse
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated, List
from pydantic import BaseModel, Field
from app.db.session import get_db
from app.core.auth_dependencies import get_current_active_user
from app.db.models.user import User
from app.services.ai.unified_service import UnifiedAIService
from app.services.ai.base_provider import ChatMessage, ModelInfo
from app.services.ai.provider_manager import ProviderManager
from app.utils.logger import logger
router = APIRouter(prefix="/ai/multi", tags=["Multi-Provider AI"])
# Request/Response Models
class ChatRequest(BaseModel):
"""Chat request"""
provider: str = Field(..., description="AI provider (ollama, groq, openai, anthropic, together)")
model: str = Field(..., description="Model identifier")
messages: List[ChatMessage] = Field(..., description="Chat messages")
temperature: float = Field(0.7, ge=0.0, le=2.0, description="Sampling temperature")
max_tokens: int = Field(1000, ge=1, le=4000, description="Maximum tokens to generate")
stream: bool = Field(False, description="Stream response")
class ChatResponseModel(BaseModel):
"""Chat response"""
content: str
model: str
provider: str
tokens_used: int
cost: float
class ProviderInfo(BaseModel):
"""Provider information"""
name: str
display_name: str
requires_api_key: bool
has_api_key: bool
models_count: int
class UsageStats(BaseModel):
"""User usage statistics"""
total_tokens: int
total_cost: float
monthly_cost: float
monthly_limit: float
remaining_budget: float
# Endpoints
@router.post("/chat", response_model=ChatResponseModel)
async def chat(
request: ChatRequest,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)]
):
"""
Send chat request to specified AI provider
Supports all registered providers:
- **ollama**: Local models (free)
- **groq**: Ultra-fast inference
- **openai**: GPT models
- **anthropic**: Claude models
- **together**: Open models with free tier
Returns response with content, tokens, and cost tracking.
"""
if request.stream:
raise HTTPException(
status_code=400,
detail="Use /chat/stream endpoint for streaming responses"
)
service = UnifiedAIService(db, current_user)
try:
response = await service.chat(
provider=request.provider,
model=request.model,
messages=request.messages,
temperature=request.temperature,
max_tokens=request.max_tokens
)
return ChatResponseModel(
content=response.content,
model=response.model,
provider=response.provider,
tokens_used=response.tokens_used,
cost=response.cost
)
except Exception as e:
logger.error(f"Chat error: {e}", extra={"user_id": current_user.id})
raise HTTPException(
status_code=500,
detail=f"Chat request failed: {str(e)}"
)
@router.post("/chat/stream")
async def chat_stream(
request: ChatRequest,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)]
):
"""
Stream chat response from specified AI provider
Uses Server-Sent Events (SSE) to stream response chunks in real-time.
"""
service = UnifiedAIService(db, current_user)
async def generate():
try:
async for chunk in service.stream_chat(
provider=request.provider,
model=request.model,
messages=request.messages,
temperature=request.temperature,
max_tokens=request.max_tokens
):
yield f"data: {chunk}\n\n"
yield "data: [DONE]\n\n"
except Exception as e:
logger.error(f"Stream error: {e}", extra={"user_id": current_user.id})
yield f"data: [ERROR] {str(e)}\n\n"
return StreamingResponse(
generate(),
media_type="text/event-stream"
)
@router.get("/providers", response_model=List[ProviderInfo])
async def list_providers(
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)]
):
"""
List all available AI providers
Shows which providers are configured and ready to use.
"""
service = UnifiedAIService(db, current_user)
provider_names = ProviderManager.list_providers()
providers = []
for name in provider_names:
# Check if user has API key
has_key = False
requires_key = name != "ollama"
try:
await service.get_provider(name)
has_key = True
        except Exception:
has_key = False
# Get model count
model_count = 0
if has_key or not requires_key:
try:
models = await service.list_models(name)
model_count = len(models)
            except Exception:
model_count = 0
providers.append(ProviderInfo(
name=name,
            display_name={"openai": "OpenAI", "together": "Together AI"}.get(name, name.capitalize()),
requires_api_key=requires_key,
has_api_key=has_key,
models_count=model_count
))
return providers
@router.get("/models/{provider}", response_model=List[ModelInfo])
async def list_models(
provider: str,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)]
):
"""
List available models for specified provider
Returns model information including:
- Model ID and name
- Context length
- Cost per 1K tokens
- Streaming support
"""
service = UnifiedAIService(db, current_user)
try:
models = await service.list_models(provider)
return models
except Exception as e:
raise HTTPException(
status_code=400,
detail=f"Failed to list models: {str(e)}"
)
@router.get("/models/{provider}/{model_id}", response_model=ModelInfo)
async def get_model_info(
provider: str,
model_id: str,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)]
):
"""
Get detailed information about specific model
"""
service = UnifiedAIService(db, current_user)
try:
model_info = await service.get_model_info(provider, model_id)
return model_info
except Exception as e:
raise HTTPException(
status_code=404,
detail=f"Model not found: {str(e)}"
)
@router.get("/usage", response_model=UsageStats)
async def get_usage_stats(
current_user: Annotated[User, Depends(get_current_active_user)]
):
"""
Get user's AI usage statistics
Returns:
- Total tokens used
- Total cost (all time)
- Monthly cost
- Monthly limit
- Remaining budget
"""
from app.core.config import settings
remaining = settings.MAX_MONTHLY_COST - current_user.monthly_cost
return UsageStats(
total_tokens=current_user.total_tokens_used,
total_cost=current_user.total_cost,
monthly_cost=current_user.monthly_cost,
monthly_limit=settings.MAX_MONTHLY_COST,
remaining_budget=max(0, remaining)
)
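The `/chat/stream` endpoint frames each chunk as a Server-Sent Event and uses `[DONE]` and `[ERROR]` sentinels. The helpers below sketch that wire format in isolation; they are illustrative, not part of the API code:

```python
from typing import Iterable, Iterator


def sse_format(chunk: str) -> str:
    """Frame a text chunk as a Server-Sent Event (one 'data:' line per chunk)."""
    return f"data: {chunk}\n\n"


def sse_parse(stream: Iterable[str]) -> Iterator[str]:
    """Yield payloads from SSE frames, stopping at the [DONE] sentinel."""
    for line in stream:
        if line.startswith("data: "):
            payload = line[len("data: "):].rstrip("\n")
            if payload == "[DONE]":
                return
            if payload.startswith("[ERROR]"):
                # Surface server-side streaming failures to the caller.
                raise RuntimeError(payload[len("[ERROR] "):])
            yield payload
```

The frontend code in Phase 13 performs the same parsing on the browser side.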
Step 14: Create API Key Management Endpoints
Create app/api/v1/endpoints/api_keys.py:
"""
API Key Management Endpoints
Manage user's AI provider API keys
"""
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated, Optional
from pydantic import BaseModel, Field
from app.db.session import get_db
from app.core.auth_dependencies import get_current_active_user
from app.db.models.user import User
from app.db.repositories.user_repository import UserRepository
from app.utils.encryption import api_key_encryption
from app.models.common import MessageResponse
from app.utils.logger import logger
router = APIRouter(prefix="/api-keys", tags=["API Keys"])
# Request Models
class APIKeyRequest(BaseModel):
"""API key configuration request"""
provider: str = Field(..., description="Provider name (groq, openai, anthropic, together)")
api_key: Optional[str] = Field(None, description="API key (null to delete)")
class APIKeyStatus(BaseModel):
"""API key status"""
provider: str
has_key: bool
masked_key: Optional[str] = None
# Endpoints
@router.post("/set", response_model=MessageResponse)
async def set_api_key(
request: APIKeyRequest,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)]
):
"""
Set or update API key for specified provider
**Supported providers:**
- groq
- openai
- anthropic
- together
**Security:**
- API keys are encrypted before storage
- Only the user can access their own keys
- Keys are never exposed in API responses
"""
# Validate provider
valid_providers = ["groq", "openai", "anthropic", "together"]
if request.provider not in valid_providers:
raise HTTPException(
status_code=400,
detail=f"Invalid provider. Must be one of: {valid_providers}"
)
# Encrypt API key
encrypted_key = None
if request.api_key:
encrypted_key = api_key_encryption.encrypt(request.api_key)
# Update user
if request.provider == "groq":
current_user.groq_api_key = encrypted_key
elif request.provider == "openai":
current_user.openai_api_key = encrypted_key
elif request.provider == "anthropic":
current_user.anthropic_api_key = encrypted_key
elif request.provider == "together":
current_user.together_api_key = encrypted_key
await db.commit()
action = "deleted" if not request.api_key else "updated"
logger.info(
f"API key {action}",
extra={
"user_id": current_user.id,
"provider": request.provider
}
)
return MessageResponse(
message=f"API key {action} successfully for {request.provider}",
success=True
)
@router.get("/status", response_model=list[APIKeyStatus])
async def get_api_key_status(
current_user: Annotated[User, Depends(get_current_active_user)]
):
"""
Get status of all API keys
Returns which providers have configured API keys.
Keys are masked for security.
"""
providers = {
"groq": current_user.groq_api_key,
"openai": current_user.openai_api_key,
"anthropic": current_user.anthropic_api_key,
"together": current_user.together_api_key,
}
status_list = []
for provider, encrypted_key in providers.items():
has_key = bool(encrypted_key)
masked = None
if has_key:
# Decrypt and mask
decrypted = api_key_encryption.decrypt(encrypted_key)
if len(decrypted) > 8:
masked = f"{decrypted[:4]}...{decrypted[-4:]}"
else:
masked = "***"
status_list.append(APIKeyStatus(
provider=provider,
has_key=has_key,
masked_key=masked
))
return status_list
@router.delete("/{provider}", response_model=MessageResponse)
async def delete_api_key(
provider: str,
current_user: Annotated[User, Depends(get_current_active_user)],
db: Annotated[AsyncSession, Depends(get_db)]
):
"""
Delete API key for specified provider
"""
valid_providers = ["groq", "openai", "anthropic", "together"]
if provider not in valid_providers:
raise HTTPException(
status_code=400,
detail=f"Invalid provider. Must be one of: {valid_providers}"
)
# Delete key
if provider == "groq":
current_user.groq_api_key = None
elif provider == "openai":
current_user.openai_api_key = None
elif provider == "anthropic":
current_user.anthropic_api_key = None
elif provider == "together":
current_user.together_api_key = None
await db.commit()
logger.info(
f"API key deleted",
extra={
"user_id": current_user.id,
"provider": provider
}
)
return MessageResponse(
message=f"API key deleted successfully for {provider}",
success=True
)
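The masking rule in `/status` (show only the first and last four characters of keys longer than eight characters) can be factored into a small helper. This is a sketch mirroring the inline logic above; the function name is hypothetical:

```python
from typing import Optional


def mask_api_key(key: Optional[str]) -> Optional[str]:
    """Mask an API key for display; never return the full secret."""
    if not key:
        return None
    if len(key) > 8:
        # Long enough to show a recognizable prefix/suffix safely.
        return f"{key[:4]}...{key[-4:]}"
    return "***"
```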
Step 15: Update API Router
Update app/api/v1/api.py:
# Add new imports
from app.api.v1.endpoints import (
users,
health,
users_advanced,
dependencies_demo,
conversations,
auth,
ai_multi, # New
api_keys # New
)
# Add new routers
api_router.include_router(ai_multi.router)
api_router.include_router(api_keys.router)
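With the routers registered, the chat endpoint accepts a JSON body matching the `ChatRequest` schema. A client-side sketch of building and pre-validating such a payload, using the same bounds as the Pydantic model (the helper name is hypothetical):

```python
import json
from typing import Dict, List


def build_chat_payload(
    provider: str,
    model: str,
    messages: List[Dict[str, str]],
    temperature: float = 0.7,
    max_tokens: int = 1000,
    stream: bool = False,
) -> str:
    """Serialize a body for POST /ai/multi/chat, mirroring ChatRequest's bounds."""
    if not (0.0 <= temperature <= 2.0):
        raise ValueError("temperature must be in [0.0, 2.0]")
    if not (1 <= max_tokens <= 4000):
        raise ValueError("max_tokens must be in [1, 4000]")
    return json.dumps({
        "provider": provider,
        "model": model,
        "messages": messages,
        "temperature": temperature,
        "max_tokens": max_tokens,
        "stream": stream,
    })
```

Validating client-side avoids a round trip for requests the server would reject anyway.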
Phase 10: React Frontend Updates ⚛️
Step 16: Update Frontend Types
Update frontend/src/types/chat.ts:
// Add new types
export interface Provider {
name: string;
display_name: string;
requires_api_key: boolean;
has_api_key: boolean;
models_count: number;
}
export interface ModelInfo {
id: string;
name: string;
provider: string;
context_length: number;
cost_per_1k_tokens: number;
supports_streaming: boolean;
}
export interface UsageStats {
total_tokens: number;
total_cost: number;
monthly_cost: number;
monthly_limit: number;
remaining_budget: number;
}
export interface APIKeyStatus {
provider: string;
has_key: boolean;
masked_key: string | null;
}
export interface ChatRequest {
provider: string;
model: string;
messages: Message[];
temperature?: number;
max_tokens?: number;
stream?: boolean;
}
Step 17: Create Provider Selector Component
Create frontend/src/components/ProviderSelector.tsx:
import React, { useEffect, useState } from 'react';
import { Provider, ModelInfo } from '../types/chat';
import { api } from '../services/api';
interface ProviderSelectorProps {
selectedProvider: string;
selectedModel: string;
onProviderChange: (provider: string) => void;
onModelChange: (model: string) => void;
}
export const ProviderSelector: React.FC<ProviderSelectorProps> = ({
selectedProvider,
selectedModel,
onProviderChange,
onModelChange
}) => {
const [providers, setProviders] = useState<Provider[]>([]);
const [models, setModels] = useState<ModelInfo[]>([]);
const [loading, setLoading] = useState(true);
useEffect(() => {
loadProviders();
}, []);
useEffect(() => {
if (selectedProvider) {
loadModels(selectedProvider);
}
}, [selectedProvider]);
const loadProviders = async () => {
try {
const data = await api.listProviders();
setProviders(data);
// Select first available provider
const available = data.find(p => p.has_api_key || !p.requires_api_key);
if (available && !selectedProvider) {
onProviderChange(available.name);
}
} catch (error) {
console.error('Failed to load providers:', error);
} finally {
setLoading(false);
}
};
const loadModels = async (provider: string) => {
try {
const data = await api.listModels(provider);
setModels(data);
// Select first model
if (data.length > 0 && !selectedModel) {
onModelChange(data[0].id);
}
} catch (error) {
console.error('Failed to load models:', error);
}
};
if (loading) {
return <div className="text-sm text-gray-500">Loading providers...</div>;
}
return (
<div className="flex gap-4">
{/* Provider Selector */}
<div className="flex-1">
<label className="block text-sm font-medium text-gray-700 mb-1">
Provider
</label>
<select
value={selectedProvider}
onChange={(e) => onProviderChange(e.target.value)}
className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500 focus:border-transparent"
>
<option value="">Select provider...</option>
{providers.map((provider) => (
<option
key={provider.name}
value={provider.name}
disabled={provider.requires_api_key && !provider.has_api_key}
>
{provider.display_name}
{provider.requires_api_key && !provider.has_api_key && ' (API key required)'}
{!provider.requires_api_key && ' (Free)'}
{provider.has_api_key && ` (${provider.models_count} models)`}
</option>
))}
</select>
</div>
{/* Model Selector */}
<div className="flex-1">
<label className="block text-sm font-medium text-gray-700 mb-1">
Model
</label>
<select
value={selectedModel}
onChange={(e) => onModelChange(e.target.value)}
disabled={!selectedProvider || models.length === 0}
className="w-full px-3 py-2 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500 focus:border-transparent disabled:bg-gray-100"
>
<option value="">Select model...</option>
{models.map((model) => (
<option key={model.id} value={model.id}>
{model.name}
{model.cost_per_1k_tokens > 0 && ` ($${model.cost_per_1k_tokens}/1K tokens)`}
{model.cost_per_1k_tokens === 0 && ' (Free)'}
</option>
))}
</select>
</div>
</div>
);
};
Phase 11: API Key Management UI 🔑
Step 18: Create API Key Management Component
Create frontend/src/components/APIKeyManager.tsx:
import React, { useEffect, useState } from 'react';
import { APIKeyStatus } from '../types/chat';
import { api } from '../services/api';
import { Key, Eye, EyeOff, Check, X, AlertCircle } from 'lucide-react';
export const APIKeyManager: React.FC = () => {
const [apiKeys, setApiKeys] = useState<APIKeyStatus[]>([]);
const [loading, setLoading] = useState(true);
const [editingProvider, setEditingProvider] = useState<string | null>(null);
const [newKey, setNewKey] = useState('');
const [showKey, setShowKey] = useState(false);
const [saving, setSaving] = useState(false);
const [message, setMessage] = useState<{ type: 'success' | 'error', text: string } | null>(null);
useEffect(() => {
loadAPIKeys();
}, []);
const loadAPIKeys = async () => {
try {
const data = await api.getAPIKeyStatus();
setApiKeys(data);
} catch (error) {
console.error('Failed to load API keys:', error);
} finally {
setLoading(false);
}
};
const handleSave = async (provider: string) => {
setSaving(true);
setMessage(null);
try {
await api.setAPIKey(provider, newKey || null);
setMessage({
type: 'success',
text: `API key ${newKey ? 'saved' : 'deleted'} successfully for ${provider}`
});
// Reload keys
await loadAPIKeys();
// Reset form
setEditingProvider(null);
setNewKey('');
setShowKey(false);
} catch (error: any) {
setMessage({
type: 'error',
text: error.response?.data?.detail || 'Failed to save API key'
});
} finally {
setSaving(false);
}
};
const handleDelete = async (provider: string) => {
    if (!window.confirm(`Delete API key for ${provider}?`)) {
return;
}
setSaving(true);
setMessage(null);
try {
await api.deleteAPIKey(provider);
setMessage({
type: 'success',
text: `API key deleted successfully for ${provider}`
});
await loadAPIKeys();
} catch (error: any) {
setMessage({
type: 'error',
text: error.response?.data?.detail || 'Failed to delete API key'
});
} finally {
setSaving(false);
}
};
const providerInfo = {
groq: {
name: 'Groq',
description: 'Ultra-fast inference with LLaMA and Mixtral models',
docs: 'https://console.groq.com/keys',
pricing: 'Very affordable, starting at $0.00007/1K tokens'
},
openai: {
name: 'OpenAI',
description: 'GPT-4, GPT-4 Turbo, and GPT-3.5 Turbo models',
docs: 'https://platform.openai.com/api-keys',
pricing: 'From $0.0005/1K tokens (GPT-3.5) to $0.03/1K (GPT-4)'
},
anthropic: {
name: 'Anthropic',
description: 'Claude 3 Opus, Sonnet, and Haiku models',
docs: 'https://console.anthropic.com/settings/keys',
pricing: 'From $0.00025/1K tokens (Haiku) to $0.015/1K (Opus)'
},
together: {
name: 'Together AI',
description: 'Open models with $25 free credits',
docs: 'https://api.together.xyz/settings/api-keys',
pricing: 'Very competitive, $0.0002/1K tokens with free tier'
}
};
if (loading) {
return (
<div className="flex items-center justify-center p-8">
<div className="text-gray-500">Loading API keys...</div>
</div>
);
}
return (
<div className="max-w-4xl mx-auto p-6">
<div className="mb-6">
<h2 className="text-2xl font-bold text-gray-900 mb-2">API Key Management</h2>
<p className="text-gray-600">
Configure your API keys to use different AI providers. Keys are encrypted and stored securely.
</p>
</div>
{/* Message */}
{message && (
<div className={`mb-6 p-4 rounded-lg flex items-start gap-3 ${
message.type === 'success'
? 'bg-green-50 text-green-800 border border-green-200'
: 'bg-red-50 text-red-800 border border-red-200'
}`}>
{message.type === 'success' ? (
<Check className="w-5 h-5 flex-shrink-0 mt-0.5" />
) : (
<AlertCircle className="w-5 h-5 flex-shrink-0 mt-0.5" />
)}
<span>{message.text}</span>
</div>
)}
{/* API Keys Grid */}
<div className="grid gap-6">
{apiKeys.map((keyStatus) => {
const info = providerInfo[keyStatus.provider as keyof typeof providerInfo];
const isEditing = editingProvider === keyStatus.provider;
return (
<div key={keyStatus.provider} className="border border-gray-200 rounded-lg p-6 bg-white shadow-sm">
{/* Header */}
<div className="flex items-start justify-between mb-4">
<div className="flex items-start gap-3">
<div className="p-2 bg-blue-50 rounded-lg">
<Key className="w-5 h-5 text-blue-600" />
</div>
<div>
<h3 className="font-semibold text-gray-900">{info.name}</h3>
<p className="text-sm text-gray-600 mt-1">{info.description}</p>
</div>
</div>
{/* Status Badge */}
<div className={`px-3 py-1 rounded-full text-xs font-medium ${
keyStatus.has_key
? 'bg-green-100 text-green-800'
: 'bg-gray-100 text-gray-600'
}`}>
{keyStatus.has_key ? 'Configured' : 'Not configured'}
</div>
</div>
{/* Pricing Info */}
<div className="mb-4 p-3 bg-gray-50 rounded-lg">
<p className="text-xs text-gray-600">
<strong>Pricing:</strong> {info.pricing}
</p>
<a
href={info.docs}
target="_blank"
rel="noopener noreferrer"
className="text-xs text-blue-600 hover:text-blue-700 mt-1 inline-block"
>
Get API key →
</a>
</div>
{/* Current Key (if exists) */}
{keyStatus.has_key && !isEditing && (
<div className="mb-4">
<label className="block text-sm font-medium text-gray-700 mb-2">
Current API Key
</label>
<div className="font-mono text-sm bg-gray-100 px-3 py-2 rounded border border-gray-200">
{keyStatus.masked_key}
</div>
</div>
)}
{/* Edit Form */}
{isEditing && (
<div className="mb-4">
<label className="block text-sm font-medium text-gray-700 mb-2">
{keyStatus.has_key ? 'Update' : 'Add'} API Key
</label>
<div className="relative">
<input
type={showKey ? 'text' : 'password'}
value={newKey}
onChange={(e) => setNewKey(e.target.value)}
placeholder={`Enter your ${info.name} API key...`}
className="w-full px-3 py-2 pr-10 border border-gray-300 rounded-lg focus:ring-2 focus:ring-blue-500 focus:border-transparent font-mono text-sm"
/>
<button
type="button"
onClick={() => setShowKey(!showKey)}
className="absolute right-2 top-1/2 -translate-y-1/2 text-gray-400 hover:text-gray-600"
>
{showKey ? <EyeOff className="w-4 h-4" /> : <Eye className="w-4 h-4" />}
</button>
</div>
</div>
)}
{/* Actions */}
<div className="flex gap-2">
{!isEditing ? (
<>
<button
onClick={() => {
setEditingProvider(keyStatus.provider);
setNewKey('');
setMessage(null);
}}
className="px-4 py-2 bg-blue-600 text-white rounded-lg hover:bg-blue-700 transition-colors text-sm font-medium"
>
{keyStatus.has_key ? 'Update Key' : 'Add Key'}
</button>
{keyStatus.has_key && (
<button
onClick={() => handleDelete(keyStatus.provider)}
disabled={saving}
className="px-4 py-2 bg-red-600 text-white rounded-lg hover:bg-red-700 transition-colors text-sm font-medium disabled:opacity-50"
>
Delete Key
</button>
)}
</>
) : (
<>
<button
onClick={() => handleSave(keyStatus.provider)}
disabled={saving || !newKey}
className="px-4 py-2 bg-green-600 text-white rounded-lg hover:bg-green-700 transition-colors text-sm font-medium disabled:opacity-50 flex items-center gap-2"
>
{saving ? (
<>
<div className="w-4 h-4 border-2 border-white border-t-transparent rounded-full animate-spin" />
Saving...
</>
) : (
<>
<Check className="w-4 h-4" />
Save
</>
)}
</button>
<button
onClick={() => {
setEditingProvider(null);
setNewKey('');
setShowKey(false);
setMessage(null);
}}
disabled={saving}
className="px-4 py-2 bg-gray-200 text-gray-700 rounded-lg hover:bg-gray-300 transition-colors text-sm font-medium disabled:opacity-50 flex items-center gap-2"
>
<X className="w-4 h-4" />
Cancel
</button>
</>
)}
</div>
</div>
);
})}
</div>
{/* Info Section */}
<div className="mt-8 p-4 bg-blue-50 border border-blue-200 rounded-lg">
<div className="flex items-start gap-3">
<AlertCircle className="w-5 h-5 text-blue-600 flex-shrink-0 mt-0.5" />
<div className="text-sm text-blue-800">
<p className="font-medium mb-1">Security Note</p>
<p>
Your API keys are encrypted before storage and never exposed in responses.
Only you can view and manage your keys. You can also use system-level API keys
configured by the administrator instead of providing your own.
</p>
</div>
</div>
</div>
</div>
);
};
Phase 12: Usage Dashboard 📊
Step 19: Create Usage Dashboard Component
Create frontend/src/components/UsageDashboard.tsx:
import React, { useEffect, useState } from 'react';
import { UsageStats } from '../types/chat';
import { api } from '../services/api';
import { TrendingUp, DollarSign, Zap, Calendar } from 'lucide-react';
export const UsageDashboard: React.FC = () => {
const [stats, setStats] = useState<UsageStats | null>(null);
const [loading, setLoading] = useState(true);
useEffect(() => {
loadStats();
// Refresh every 30 seconds
const interval = setInterval(loadStats, 30000);
return () => clearInterval(interval);
}, []);
const loadStats = async () => {
try {
const data = await api.getUsageStats();
setStats(data);
} catch (error) {
console.error('Failed to load usage stats:', error);
} finally {
setLoading(false);
}
};
if (loading) {
return (
<div className="flex items-center justify-center p-8">
<div className="text-gray-500">Loading usage statistics...</div>
</div>
);
}
if (!stats) {
return null;
}
const percentUsed = (stats.monthly_cost / stats.monthly_limit) * 100;
const isNearLimit = percentUsed >= 80;
return (
<div className="max-w-6xl mx-auto p-6">
<div className="mb-6">
<h2 className="text-2xl font-bold text-gray-900 mb-2">Usage Dashboard</h2>
<p className="text-gray-600">
Track your AI usage and costs across all providers.
</p>
</div>
{/* Stats Grid */}
<div className="grid grid-cols-1 md:grid-cols-2 lg:grid-cols-4 gap-6 mb-8">
{/* Total Tokens */}
<div className="bg-white border border-gray-200 rounded-lg p-6 shadow-sm">
<div className="flex items-start justify-between mb-4">
<div className="p-2 bg-purple-50 rounded-lg">
<Zap className="w-5 h-5 text-purple-600" />
</div>
</div>
<div className="text-2xl font-bold text-gray-900 mb-1">
{stats.total_tokens.toLocaleString()}
</div>
<div className="text-sm text-gray-600">Total Tokens Used</div>
</div>
{/* Total Cost */}
<div className="bg-white border border-gray-200 rounded-lg p-6 shadow-sm">
<div className="flex items-start justify-between mb-4">
<div className="p-2 bg-green-50 rounded-lg">
<DollarSign className="w-5 h-5 text-green-600" />
</div>
</div>
<div className="text-2xl font-bold text-gray-900 mb-1">
${stats.total_cost.toFixed(4)}
</div>
<div className="text-sm text-gray-600">Total Cost (All Time)</div>
</div>
{/* Monthly Cost */}
<div className="bg-white border border-gray-200 rounded-lg p-6 shadow-sm">
<div className="flex items-start justify-between mb-4">
<div className="p-2 bg-blue-50 rounded-lg">
<Calendar className="w-5 h-5 text-blue-600" />
</div>
</div>
<div className="text-2xl font-bold text-gray-900 mb-1">
${stats.monthly_cost.toFixed(4)}
</div>
<div className="text-sm text-gray-600">This Month</div>
</div>
{/* Remaining Budget */}
<div className={`bg-white border rounded-lg p-6 shadow-sm ${
isNearLimit ? 'border-red-300 bg-red-50' : 'border-gray-200'
}`}>
<div className="flex items-start justify-between mb-4">
<div className={`p-2 rounded-lg ${
isNearLimit ? 'bg-red-100' : 'bg-orange-50'
}`}>
<TrendingUp className={`w-5 h-5 ${
isNearLimit ? 'text-red-600' : 'text-orange-600'
}`} />
</div>
</div>
<div className={`text-2xl font-bold mb-1 ${
isNearLimit ? 'text-red-900' : 'text-gray-900'
}`}>
${stats.remaining_budget.toFixed(4)}
</div>
<div className={`text-sm ${
isNearLimit ? 'text-red-700' : 'text-gray-600'
}`}>
Remaining Budget
</div>
</div>
</div>
{/* Budget Progress */}
<div className="bg-white border border-gray-200 rounded-lg p-6 shadow-sm mb-8">
<div className="flex items-center justify-between mb-3">
<div>
<h3 className="font-semibold text-gray-900">Monthly Budget</h3>
<p className="text-sm text-gray-600 mt-1">
${stats.monthly_cost.toFixed(4)} of ${stats.monthly_limit.toFixed(2)} used
</p>
</div>
<div className="text-right">
<div className={`text-2xl font-bold ${
isNearLimit ? 'text-red-600' : 'text-gray-900'
}`}>
{percentUsed.toFixed(1)}%
</div>
<div className="text-sm text-gray-600">Used</div>
</div>
</div>
{/* Progress Bar */}
<div className="w-full bg-gray-200 rounded-full h-4 overflow-hidden">
<div
className={`h-full transition-all duration-500 ${
percentUsed >= 90 ? 'bg-red-600' :
percentUsed >= 80 ? 'bg-orange-600' :
percentUsed >= 50 ? 'bg-yellow-600' :
'bg-green-600'
}`}
style={{ width: `${Math.min(percentUsed, 100)}%` }}
/>
</div>
{/* Warning */}
{isNearLimit && (
<div className="mt-4 p-3 bg-red-50 border border-red-200 rounded-lg">
<p className="text-sm text-red-800">
⚠️ <strong>Warning:</strong> You've used {percentUsed.toFixed(1)}% of your monthly budget.
Consider upgrading your plan or reducing usage.
</p>
</div>
)}
</div>
{/* Tips */}
<div className="bg-blue-50 border border-blue-200 rounded-lg p-6">
<h3 className="font-semibold text-blue-900 mb-3">💡 Cost Saving Tips</h3>
<ul className="space-y-2 text-sm text-blue-800">
<li>• Use Ollama (local models) for unlimited free usage</li>
<li>• Together AI offers $25 in free credits</li>
<li>• Groq provides very fast inference at low cost ($0.00007/1K tokens)</li>
<li>• Use smaller models (GPT-3.5, Claude Haiku) for simple tasks</li>
<li>• Limit max_tokens to reduce costs per request</li>
</ul>
</div>
</div>
);
};
Phase 13: Update API Service 🔌
Step 20: Update API Service with New Endpoints
Update frontend/src/services/api.ts:
// Add new methods to the api object
// Multi-Provider AI
async multiProviderChat(request: ChatRequest): Promise<any> {
const response = await axios.post('/ai/multi/chat', request);
return response.data;
},
async *streamMultiProviderChat(request: ChatRequest): AsyncGenerator<string> {
const response = await fetch(`${API_BASE_URL}/ai/multi/chat/stream`, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'Authorization': `Bearer ${this.getToken()}`
},
body: JSON.stringify(request)
});
if (!response.ok) {
throw new Error(`HTTP error! status: ${response.status}`);
}
const reader = response.body?.getReader();
if (!reader) {
  throw new Error('No response body');
}
const decoder = new TextDecoder();
let buffer = '';
while (true) {
  const { done, value } = await reader.read();
  if (done) break;
  // Decode incrementally and buffer partial lines: an SSE event can be
  // split across network chunks, so only complete lines are parsed here
  buffer += decoder.decode(value, { stream: true });
  const lines = buffer.split('\n');
  buffer = lines.pop() ?? '';
  for (const line of lines) {
    if (line.startsWith('data: ')) {
      const data = line.slice(6);
      if (data === '[DONE]') {
        return;
      }
      if (data.startsWith('[ERROR]')) {
        throw new Error(data.slice(8));
      }
      yield data;
    }
  }
}
},
async listProviders(): Promise<Provider[]> {
const response = await axios.get('/ai/multi/providers');
return response.data;
},
async listModels(provider: string): Promise<ModelInfo[]> {
const response = await axios.get(`/ai/multi/models/${provider}`);
return response.data;
},
async getModelInfo(provider: string, modelId: string): Promise<ModelInfo> {
const response = await axios.get(`/ai/multi/models/${provider}/${modelId}`);
return response.data;
},
async getUsageStats(): Promise<UsageStats> {
const response = await axios.get('/ai/multi/usage');
return response.data;
},
// API Key Management
async getAPIKeyStatus(): Promise<APIKeyStatus[]> {
const response = await axios.get('/api-keys/status');
return response.data;
},
async setAPIKey(provider: string, apiKey: string | null): Promise<void> {
await axios.post('/api-keys/set', {
provider,
api_key: apiKey
});
},
async deleteAPIKey(provider: string): Promise<void> {
await axios.delete(`/api-keys/${provider}`);
},
getToken(): string | null {
return localStorage.getItem('access_token');
}
Phase 14: Testing Scripts 🧪
Step 21: Create Multi-Provider Test Script
Create test_multi_provider.py:
"""
Test script for multi-provider AI integration
Tests all providers and features
"""
import requests
import json
import time
BASE_URL = "http://127.0.0.1:8000/api/v1"
def print_test(title: str, response: requests.Response):
"""Print test results"""
print(f"\n{'='*70}")
print(f"{title}")
print(f"{'='*70}")
print(f"Status: {response.status_code}")
try:
data = response.json()
print(f"Response:\n{json.dumps(data, indent=2, default=str)}")
    except ValueError:
        print(f"Response: {response.text[:500]}")
def login() -> str:
"""Login and get access token"""
response = requests.post(
f"{BASE_URL}/auth/login",
json={
"username": "auth_test_user",
"password": "ResetPass123" # From previous tests
}
)
if response.status_code == 200:
return response.json()['access_token']
# Try creating user if login fails
requests.post(
f"{BASE_URL}/auth/register",
json={
"username": "multi_test_user",
"email": "multitest@example.com",
"password": "TestPass123"
}
)
response = requests.post(
f"{BASE_URL}/auth/login",
json={
"username": "multi_test_user",
"password": "TestPass123"
}
)
return response.json()['access_token']
def test_list_providers(token: str):
"""Test listing providers"""
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(f"{BASE_URL}/ai/multi/providers", headers=headers)
print_test("📋 LIST PROVIDERS", response)
return response.json() if response.status_code == 200 else []
def test_list_models(token: str, provider: str):
"""Test listing models for provider"""
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(f"{BASE_URL}/ai/multi/models/{provider}", headers=headers)
print_test(f"🤖 LIST MODELS ({provider})", response)
return response.json() if response.status_code == 200 else []
def test_get_model_info(token: str, provider: str, model_id: str):
"""Test getting model info"""
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(
f"{BASE_URL}/ai/multi/models/{provider}/{model_id}",
headers=headers
)
print_test(f"ℹ️ MODEL INFO ({provider}/{model_id})", response)
def test_chat(token: str, provider: str, model: str):
"""Test chat with provider"""
headers = {"Authorization": f"Bearer {token}"}
response = requests.post(
f"{BASE_URL}/ai/multi/chat",
headers=headers,
json={
"provider": provider,
"model": model,
"messages": [
{"role": "user", "content": "Say hello in one sentence"}
],
"temperature": 0.7,
"max_tokens": 100
}
)
print_test(f"💬 CHAT ({provider}/{model})", response)
return response.json() if response.status_code == 200 else None
def test_stream_chat(token: str, provider: str, model: str):
"""Test streaming chat"""
headers = {"Authorization": f"Bearer {token}"}
print(f"\n{'='*70}")
print(f"🌊 STREAM CHAT ({provider}/{model})")
print(f"{'='*70}")
response = requests.post(
f"{BASE_URL}/ai/multi/chat/stream",
headers=headers,
json={
"provider": provider,
"model": model,
"messages": [
{"role": "user", "content": "Count from 1 to 5"}
],
"temperature": 0.7,
"max_tokens": 100,
"stream": True
},
stream=True
)
print(f"Status: {response.status_code}")
print("Stream output:")
for line in response.iter_lines():
if line:
line_str = line.decode('utf-8')
if line_str.startswith('data: '):
data = line_str[6:]
if data == '[DONE]':
print("\n[Stream complete]")
break
elif data.startswith('[ERROR]'):
print(f"\n[Error: {data[8:]}]")
break
else:
print(data, end='', flush=True)
def test_usage_stats(token: str):
"""Test getting usage statistics"""
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(f"{BASE_URL}/ai/multi/usage", headers=headers)
print_test("📊 USAGE STATISTICS", response)
def test_api_key_status(token: str):
"""Test getting API key status"""
headers = {"Authorization": f"Bearer {token}"}
response = requests.get(f"{BASE_URL}/api-keys/status", headers=headers)
print_test("🔑 API KEY STATUS", response)
def test_set_api_key(token: str):
"""Test setting API key"""
headers = {"Authorization": f"Bearer {token}"}
# Note: Use a real API key for actual testing
response = requests.post(
f"{BASE_URL}/api-keys/set",
headers=headers,
json={
"provider": "groq",
"api_key": "test_key_123" # Fake key for demo
}
)
print_test("💾 SET API KEY (Groq)", response)
def run_all_tests():
"""Run complete test suite"""
print("\n" + "🧪"*35)
print("MULTI-PROVIDER AI TEST SUITE")
print("🧪"*35)
# Login
print("\n🔐 Logging in...")
token = login()
print(f"✅ Token obtained: {token[:20]}...")
# List providers
print("\n" + "="*70)
print("PROVIDER DISCOVERY")
print("="*70)
providers = test_list_providers(token)
# Test each available provider
for provider_info in providers:
provider_name = provider_info['name']
# Skip if requires API key and doesn't have one
if provider_info['requires_api_key'] and not provider_info['has_api_key']:
print(f"\n⚠️ Skipping {provider_name} - API key required")
continue
    print("\n" + "="*70)
print(f"TESTING {provider_name.upper()}")
print("="*70)
# List models
models = test_list_models(token, provider_name)
if models:
# Get first model info
first_model = models[0]
test_get_model_info(token, provider_name, first_model['id'])
# Test chat
test_chat(token, provider_name, first_model['id'])
# Test streaming
time.sleep(1) # Rate limiting
test_stream_chat(token, provider_name, first_model['id'])
# Usage stats
print("\n" + "="*70)
print("USAGE & BILLING")
print("="*70)
test_usage_stats(token)
# API key management
print("\n" + "="*70)
print("API KEY MANAGEMENT")
print("="*70)
test_api_key_status(token)
test_set_api_key(token)
test_api_key_status(token)
print("\n" + "✅"*35)
print("ALL TESTS COMPLETED!")
print("✅"*35)
print("\n💡 Features Tested:")
print(" ✅ Provider listing")
print(" ✅ Model discovery")
print(" ✅ Chat (non-streaming)")
print(" ✅ Chat (streaming)")
print(" ✅ Usage tracking")
print(" ✅ API key management\n")
if __name__ == "__main__":
print("""
╔════════════════════════════════════════════════════════╗
║ Multi-Provider AI Test Suite ║
║ ║
║ Tests all AI providers: ║
║ - Ollama (local, free) ║
║ - Groq (ultra-fast) ║
║ - OpenAI (GPT models) ║
║ - Anthropic (Claude models) ║
║ - Together AI (open models) ║
║ ║
║ Prerequisites: ║
║ 1. Server running (python main.py) ║
║ 2. Database migrations applied ║
║ 3. API keys configured (optional) ║
╚════════════════════════════════════════════════════════╝
""")
try:
response = requests.get(f"{BASE_URL}/health")
if response.status_code == 200:
run_all_tests()
else:
print("❌ Server health check failed")
except requests.exceptions.ConnectionError:
print("❌ ERROR: Cannot connect to server!")
print(" Please start the server with: python main.py")
except Exception as e:
print(f"❌ ERROR: {e}")
Phase 15: Documentation Updates 📚
Step 22: Create Multi-Provider Integration Guide
Create MULTI_PROVIDER_GUIDE.md:
# Multi-Provider AI Integration Guide
Complete guide to using multiple AI providers in AIVerse.
## 📋 Table of Contents
1. [Overview](#overview)
2. [Supported Providers](#supported-providers)
3. [Getting API Keys](#getting-api-keys)
4. [Configuration](#configuration)
5. [Usage Examples](#usage-examples)
6. [Cost Management](#cost-management)
7. [Best Practices](#best-practices)
8. [Troubleshooting](#troubleshooting)
---
## Overview
AIVerse now supports multiple AI providers through a unified interface:
- **Ollama** - Local LLM models (free, unlimited)
- **Groq** - Ultra-fast inference (very affordable)
- **OpenAI** - GPT-3.5, GPT-4, GPT-4 Turbo
- **Anthropic** - Claude 3 Opus, Sonnet, Haiku
- **Together AI** - Open models with $25 free credits
### Architecture
```
User Request
    ↓
Unified AI Service (provider selection)
    ↓
Provider Manager (factory pattern)
    ↓
Specific Provider (Groq/OpenAI/etc.)
    ↓
API Response + Cost Tracking
```
---
## Supported Providers
### 1. Ollama (Local, Free)
**Pros:**
- ✅ Completely free
- ✅ Unlimited usage
- ✅ Privacy (runs locally)
- ✅ No API key needed
- ✅ Fast on GPU
**Cons:**
- ❌ Requires local installation
- ❌ GPU recommended for speed
- ❌ Limited to open models
**Models:**
- LLaMA 2 (7B, 13B, 70B)
- Mistral (7B)
- CodeLLaMA (7B, 13B, 34B)
- Mixtral (8x7B)
**Setup:**
```bash
# Install Ollama
curl -fsSL https://ollama.com/install.sh | sh
# Pull models
ollama pull llama2
ollama pull mistral
ollama pull codellama
# Start server
ollama serve
```
**No API key required!**
---
### 2. Groq (Ultra-Fast Inference)
**Pros:**
- ✅ Extremely fast (50-100 tokens/sec)
- ✅ Very affordable ($0.00007-$0.00079/1K tokens)
- ✅ Good model selection
- ✅ Easy to use
**Cons:**
- ❌ Requires API key
- ❌ Rate limits on free tier
**Models:**
- LLaMA 3.3 70B ($0.00059/1K input, $0.00079/1K output)
- LLaMA 3.1 70B ($0.00059/1K input, $0.00079/1K output)
- LLaMA 3.1 8B ($0.00005/1K input, $0.00008/1K output)
- Mixtral 8x7B ($0.00024/1K tokens)
- Gemma 7B ($0.00007/1K tokens)
**Get API Key:**
1. Go to https://console.groq.com
2. Sign up for free account
3. Navigate to API Keys
4. Create new API key
5. Copy and save securely
**Free Tier:**
- Rate limit: 30 requests/minute
- Good for development and testing
---
### 3. OpenAI (GPT Models)
**Pros:**
- ✅ Most capable models (GPT-4)
- ✅ Best at reasoning and complex tasks
- ✅ Excellent documentation
- ✅ Reliable infrastructure
**Cons:**
- ❌ Most expensive option
- ❌ No free tier
- ❌ Usage-based pricing
**Models:**
- GPT-4 Turbo ($0.01/1K input, $0.03/1K output)
- GPT-4 ($0.03/1K input, $0.06/1K output)
- GPT-4o ($0.005/1K input, $0.015/1K output)
- GPT-4o Mini ($0.00015/1K input, $0.0006/1K output)
- GPT-3.5 Turbo ($0.0005/1K input, $0.0015/1K output)
**Get API Key:**
1. Go to https://platform.openai.com
2. Create account ($5 minimum credit required)
3. Navigate to API Keys
4. Create new secret key
5. Copy and save securely
**Cost Example** (1,000 requests, 1K tokens each, 50/50 input/output):
- GPT-4 Turbo: ~$20
- GPT-3.5 Turbo: ~$1
---
### 4. Anthropic (Claude Models)
**Pros:**
- ✅ Very capable (rivals GPT-4)
- ✅ 200K context window
- ✅ Good at following instructions
- ✅ Ethical AI focus
**Cons:**
- ❌ Requires API key
- ❌ No free tier
- ❌ Limited model selection
**Models:**
- Claude 3 Opus ($0.015/1K input, $0.075/1K output)
- Claude 3.5 Sonnet ($0.003/1K input, $0.015/1K output)
- Claude 3 Sonnet ($0.003/1K input, $0.015/1K output)
- Claude 3 Haiku ($0.00025/1K input, $0.00125/1K output)
**Get API Key:**
1. Go to https://console.anthropic.com
2. Create account
3. Add credits ($5 minimum)
4. Navigate to API Keys
5. Create new key
**Best For:**
- Long documents (200K context)
- Complex reasoning
- Code generation
- Creative writing
---
### 5. Together AI (Open Models)
**Pros:**
- ✅ $25 free credits
- ✅ Very affordable after free tier
- ✅ Many open models
- ✅ Good for experimentation
**Cons:**
- ❌ Slower than Groq
- ❌ Less reliable than paid options
**Models:**
- LLaMA 3 70B ($0.0009/1K tokens)
- LLaMA 3 8B ($0.0002/1K tokens)
- Mixtral 8x7B ($0.0006/1K tokens)
- Mistral 7B ($0.0002/1K tokens)
- CodeLLaMA 34B ($0.0008/1K tokens)
**Get API Key:**
1. Go to https://api.together.xyz
2. Create account (get $25 free credits)
3. Navigate to Settings → API Keys
4. Create new API key
**Free Credits:**
- $25 on signup
- ~125M tokens with the cheapest model ($0.0002/1K)
- ~28M tokens with the most expensive model ($0.0009/1K)
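As a back-of-envelope check, the credit-to-token conversion is simple division; the rates below are the Together AI per-1K-token prices listed earlier in this section.

```python
def tokens_for_budget(budget_usd: float, price_per_1k_tokens: float) -> int:
    """Approximate number of tokens a budget buys at a flat per-1K-token rate."""
    return round(budget_usd / price_per_1k_tokens * 1000)

print(tokens_for_budget(25.0, 0.0002))  # cheapest model: ~125M tokens
print(tokens_for_budget(25.0, 0.0009))  # priciest model: ~27.8M tokens
```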
---
## Configuration
### System-Level Configuration
For shared API keys used by all users, add to `.env`:
```bash
# Multi-Provider AI Keys
GROQ_API_KEY=gsk_xxxxxxxxxxxxxxxxxxxxx
OPENAI_API_KEY=sk-xxxxxxxxxxxxxxxxxxxxx
ANTHROPIC_API_KEY=sk-ant-xxxxxxxxxxxxxxxxxxxxx
TOGETHER_API_KEY=xxxxxxxxxxxxxxxxxxxxx
# Default provider
DEFAULT_AI_PROVIDER=ollama
# Cost settings
ENABLE_COST_TRACKING=True
MAX_MONTHLY_COST=100.00
```
### User-Level Configuration
Users can configure their own API keys via:
1. **Web UI:**
- Navigate to Settings → API Keys
- Select provider
- Enter API key
- Click Save
2. **API Endpoint:**
```bash
curl -X POST http://localhost:8000/api/v1/api-keys/set \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{"provider": "groq", "api_key": "gsk_xxxx"}'
```
### Priority Order
API keys are used in this priority:
1. **User's API key** (if configured)
2. **System API key** (from .env)
3. **Error** (if provider requires key and none found)
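The priority chain can be sketched as a single resolution function. This is illustrative only: the argument shapes (a per-user key dict and the process environment) are assumptions, not AIVerse's actual internals.

```python
import os

def resolve_api_key(provider: str, user_keys: dict[str, str]) -> str:
    """Return the API key to use for a provider, following the priority order."""
    user_key = user_keys.get(provider)                 # 1. user's own key
    if user_key:
        return user_key
    system_key = os.environ.get(f"{provider.upper()}_API_KEY")  # 2. system key
    if system_key:
        return system_key
    # 3. error: provider requires a key and none was found
    raise KeyError(f"No API key configured for provider '{provider}'")
```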
---
## Usage Examples
### Python SDK
```python
from app.services.ai.unified_service import UnifiedAIService
from app.services.ai.base_provider import ChatMessage
# Initialize service
service = UnifiedAIService(db, current_user)
# Chat with Groq
response = await service.chat(
provider="groq",
model="llama-3.3-70b-versatile",
messages=[
ChatMessage(role="user", content="Explain quantum computing")
],
temperature=0.7,
max_tokens=500
)
print(f"Response: {response.content}")
print(f"Cost: ${response.cost:.6f}")
print(f"Tokens: {response.tokens_used}")
# Stream with OpenAI
async for chunk in service.stream_chat(
provider="openai",
model="gpt-4-turbo",
messages=[
ChatMessage(role="user", content="Write a poem about AI")
],
temperature=0.8,
max_tokens=200
):
print(chunk, end='', flush=True)
```
### REST API
**Non-Streaming:**
```bash
curl -X POST http://localhost:8000/api/v1/ai/multi/chat \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"provider": "groq",
"model": "llama-3.3-70b-versatile",
"messages": [
{"role": "user", "content": "Hello!"}
],
"temperature": 0.7,
"max_tokens": 100
}'
```
**Streaming:**
```bash
curl -X POST http://localhost:8000/api/v1/ai/multi/chat/stream \
-H "Authorization: Bearer YOUR_TOKEN" \
-H "Content-Type: application/json" \
-d '{
"provider": "anthropic",
"model": "claude-3-5-sonnet-20241022",
"messages": [
{"role": "user", "content": "Count to 10"}
],
"stream": true
}'
```
### JavaScript/React
```javascript
import { api } from './services/api';
// Non-streaming
const response = await api.multiProviderChat({
provider: 'groq',
model: 'llama-3.3-70b-versatile',
messages: [
{ role: 'user', content: 'Hello!' }
],
temperature: 0.7,
max_tokens: 100
});
console.log(response.content);
console.log(`Cost: $${response.cost}`);
// Streaming
for await (const chunk of api.streamMultiProviderChat({
provider: 'openai',
model: 'gpt-4-turbo',
messages: [
{ role: 'user', content: 'Write a story' }
],
temperature: 0.8,
max_tokens: 500,
stream: true
})) {
console.log(chunk);
}
```
---
## Cost Management
### Monthly Budget
Set maximum monthly spend per user:
```bash
# In .env
MAX_MONTHLY_COST=100.00
```
When limit reached, API returns 429 error:
```json
{
"detail": "Monthly cost limit of $100 exceeded. Current usage: $102.45"
}
```
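A minimal sketch of the budget gate that produces this 429. The class and function names here are illustrative, not AIVerse's own implementation.

```python
class BudgetExceeded(Exception):
    """Raised when a user's monthly spend reaches the configured cap."""
    def __init__(self, limit: float, current: float):
        self.status_code = 429
        self.detail = (
            f"Monthly cost limit of ${limit:.0f} exceeded. "
            f"Current usage: ${current:.2f}"
        )
        super().__init__(self.detail)

def check_budget(monthly_cost: float, monthly_limit: float) -> None:
    if monthly_cost >= monthly_limit:
        raise BudgetExceeded(monthly_limit, monthly_cost)

check_budget(12.34, 100.00)  # under budget: returns silently
```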
### Usage Tracking
Track usage via API:
```bash
curl -X GET http://localhost:8000/api/v1/ai/multi/usage \
-H "Authorization: Bearer YOUR_TOKEN"
```
Response:
```json
{
"total_tokens": 1500000,
"total_cost": 45.67,
"monthly_cost": 12.34,
"monthly_limit": 100.00,
"remaining_budget": 87.66
}
```
### Cost Optimization
**1. Use Cheaper Models:**
- Groq LLaMA 3.1 8B: $0.00005/1K input (hundreds of times cheaper than GPT-4)
- Together AI Mistral 7B: $0.0002/1K
- OpenAI GPT-3.5 Turbo: $0.0005/1K
**2. Reduce Token Usage:**
```python
# Limit max_tokens
response = await service.chat(
provider="openai",
model="gpt-4",
messages=messages,
max_tokens=200 # Instead of 1000
)
```
**3. Use Ollama for Development:**
- Free and unlimited
- Perfect for testing
- No API costs
**4. Smart Provider Selection:**
```python
def select_provider(task_complexity: str):
if task_complexity == "simple":
return "ollama", "llama2" # Free
elif task_complexity == "medium":
return "groq", "llama-3.1-8b-instant" # Fast + cheap
else:
return "openai", "gpt-4-turbo" # Best quality
```
### Cost Comparison (1M Tokens)
| Provider | Model | Input Cost | Output Cost | Total (50/50) |
|----------|-------|------------|-------------|---------------|
| Ollama | Any | $0 | $0 | **$0** |
| Groq | LLaMA 3.1 8B | $0.05 | $0.08 | **$0.065** |
| Together AI | Mistral 7B | $0.20 | $0.20 | **$0.20** |
| OpenAI | GPT-4o Mini | $0.15 | $0.60 | **$0.375** |
| Groq | LLaMA 3.3 70B | $0.59 | $0.79 | **$0.69** |
| Anthropic | Claude Haiku | $0.25 | $1.25 | **$0.75** |
| OpenAI | GPT-3.5 Turbo | $0.50 | $1.50 | **$1.00** |
| OpenAI | GPT-4 Turbo | $10 | $30 | **$20** |
| Anthropic | Claude Opus | $15 | $75 | **$45** |
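The "Total (50/50)" column is a token-weighted average of the input and output rates, scaled from per-1K to per-1M tokens:

```python
def blended_cost_per_million(input_per_1k: float, output_per_1k: float,
                             input_share: float = 0.5) -> float:
    """Blended $ cost per 1M tokens for a given input/output token split."""
    per_1k = input_per_1k * input_share + output_per_1k * (1.0 - input_share)
    return per_1k * 1000

print(blended_cost_per_million(0.00005, 0.00008))  # Groq LLaMA 3.1 8B, ~$0.065
print(blended_cost_per_million(0.01, 0.03))        # GPT-4 Turbo, ~$20
```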
---
## Best Practices
### 1. Provider Selection Strategy
```python
# Task-based selection
task_to_provider = {
"simple_qa": ("ollama", "llama2"),
"code_generation": ("groq", "llama-3.3-70b-versatile"),
"creative_writing": ("anthropic", "claude-3-5-sonnet-20241022"),
"complex_reasoning": ("openai", "gpt-4-turbo"),
"fast_responses": ("groq", "llama-3.1-8b-instant"),
}
```
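One way to consume a mapping like this is a small dispatcher with a free local default for unlisted tasks (a sketch; the mapping entries mirror the ones above):

```python
task_to_provider = {
    "simple_qa": ("ollama", "llama2"),
    "code_generation": ("groq", "llama-3.3-70b-versatile"),
    "complex_reasoning": ("openai", "gpt-4-turbo"),
}

def pick_provider(task: str) -> tuple[str, str]:
    """Return (provider, model) for a task, defaulting to the free local model."""
    return task_to_provider.get(task, ("ollama", "llama2"))
```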
### 2. Error Handling
```python
from app.core.exceptions import AppException
try:
response = await service.chat(
provider="openai",
model="gpt-4",
messages=messages
)
except AppException as e:
if e.status_code == 429:
# Budget exceeded, use free alternative
response = await service.chat(
provider="ollama",
model="llama2",
messages=messages
)
else:
raise
```
### 3. Caching
```python
# functools.lru_cache doesn't work with coroutines (a cached coroutine
# can only be awaited once), so cache the resolved results instead
_response_cache: dict = {}

async def get_cached_response(prompt: str, provider: str, model: str):
    """Cache identical requests in memory."""
    key = (prompt, provider, model)
    if key not in _response_cache:
        _response_cache[key] = await service.chat(
            provider=provider,
            model=model,
            messages=[ChatMessage(role="user", content=prompt)]
        )
    return _response_cache[key]
```
### 4. Rate Limiting
```python
import asyncio
import time
from collections import defaultdict
class RateLimiter:
def __init__(self):
self.requests = defaultdict(list)
async def check_limit(self, provider: str, limit: int = 10):
now = time.time()
# Clean old requests
self.requests[provider] = [
t for t in self.requests[provider]
if now - t < 60
]
if len(self.requests[provider]) >= limit:
await asyncio.sleep(1)
self.requests[provider].append(now)
```
### 5. Fallback Chain
```python
async def chat_with_fallback(messages):
providers = [
("groq", "llama-3.3-70b-versatile"),
("together", "meta-llama/Llama-3-70b-chat-hf"),
("ollama", "llama2"),
]
for provider, model in providers:
try:
return await service.chat(
provider=provider,
model=model,
messages=messages
)
        except Exception as e:
            print(f"Failed with {provider} ({e}), trying next...")
continue
raise Exception("All providers failed")
```
---
## Troubleshooting
### Common Issues
**1. "No API key configured"**
**Problem:** Provider requires API key but none found.
**Solution:**
```bash
# Add to .env
GROQ_API_KEY=your_key_here
# Or configure via UI
Settings → API Keys → Add Key
```
**2. "Monthly cost limit exceeded"**
**Problem:** User hit monthly budget.
**Solution:**
```python
# Increase limit in .env
MAX_MONTHLY_COST=200.00
# Or use free providers
provider="ollama" # No cost tracking
```
**3. "Rate limit exceeded"**
**Problem:** Too many requests to provider.
**Solution:**
```python
# Add delay between requests
import asyncio
await asyncio.sleep(1)
# Or use different provider
# Groq free tier: 30 req/min
# OpenAI: Much higher limits
```
**4. "Model not found"**
**Problem:** Model ID incorrect or unavailable.
**Solution:**
```bash
# List available models
curl -X GET http://localhost:8000/api/v1/ai/multi/models/groq \
-H "Authorization: Bearer TOKEN"
# Use correct model ID from response
```
**5. "Connection timeout"**
**Problem:** Provider API unreachable.
**Solution:**
```python
# Check provider status
# Groq: https://status.groq.com
# OpenAI: https://status.openai.com
# Anthropic: https://status.anthropic.com
# Use fallback provider
```
### Debugging
**Enable debug logging:**
```bash
# In .env
DEBUG=True
LOG_LEVEL=DEBUG
# View logs
tail -f logs/aiverse.log
```
**Test individual provider:**
```bash
python test_multi_provider.py
```
**Check API key validity:**
```bash
# Groq
curl https://api.groq.com/openai/v1/models \
-H "Authorization: Bearer $GROQ_API_KEY"
# OpenAI
curl https://api.openai.com/v1/models \
-H "Authorization: Bearer $OPENAI_API_KEY"
# Anthropic
curl https://api.anthropic.com/v1/messages \
-H "x-api-key: $ANTHROPIC_API_KEY" \
-H "anthropic-version: 2023-06-01"
```
---
## Performance Tips
### 1. Use Groq for Speed
Groq is 5-10x faster than other providers:
- 50-100 tokens/second
- Sub-second latency
- Perfect for real-time apps
### 2. Stream Long Responses
```python
# Non-streaming: wait for full response
response = await service.chat(...) # May take 10+ seconds
# Streaming: instant feedback
async for chunk in service.stream_chat(...):
print(chunk) # Starts immediately
```
### 3. Batch Requests
```python
# Bad: Sequential requests
for prompt in prompts:
await service.chat(messages=[{"role": "user", "content": prompt}])
# Good: Concurrent requests
import asyncio
tasks = [
service.chat(messages=[{"role": "user", "content": p}])
for p in prompts
]
responses = await asyncio.gather(*tasks)
```
### 4. Use Smaller Models
- Groq LLaMA 3.1 8B: 2-3x faster than 70B
- OpenAI GPT-4o Mini: 2x faster than GPT-4
- For simple tasks, speed > quality
---
## Security Best Practices
### 1. API Key Storage
```python
# ✅ Good: Encrypted in database
encrypted_key = api_key_encryption.encrypt(api_key)
# ❌ Bad: Plain text
user.api_key = api_key # Never do this!
```
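The `api_key_encryption` helper above presumably wraps symmetric encryption such as Fernet, from the `cryptography` package added to `requirements.txt`. A minimal sketch (the key handling here is illustrative; in practice the Fernet key would come from `.env`, never from code):

```python
from cryptography.fernet import Fernet

# Generate once and store in .env (e.g. API_KEY_ENCRYPTION_KEY), not in code
key = Fernet.generate_key()
fernet = Fernet(key)

# Encrypt before writing to the database, decrypt on read
encrypted = fernet.encrypt(b"gsk_example_key")
assert fernet.decrypt(encrypted) == b"gsk_example_key"
```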
### 2. Environment Variables
```bash
# ✅ Good: .env file (not committed)
GROQ_API_KEY=gsk_xxxxx
# ❌ Bad: Hardcoded
api_key = "gsk_xxxxx" # Never do this!
```
### 3. User Access Control
```python
# Users can only access their own API keys
if api_key.user_id != current_user.id:
raise HTTPException(403, "Forbidden")
```
### 4. Rate Limiting
```python
# Prevent abuse
@limiter.limit("10/minute")
async def chat_endpoint(...):
...
```
---
## Next Steps
1. **Get API Keys:**
- Start with Together AI ($25 free credits)
- Add Groq for fast inference
- Consider OpenAI/Anthropic for production
2. **Test Integration:**
```bash
python test_multi_provider.py
```
3. **Configure Budget:**
```bash
# In .env
MAX_MONTHLY_COST=50.00
```
4. **Monitor Usage:**
- Check dashboard regularly
- Set up alerts for high usage
- Review cost optimization
5. **Production Deployment:**
- Use system-level API keys
- Enable cost tracking
- Set up monitoring
---
## Support
- **GitHub Issues:** https://github.com/junaidte14/aiverse/issues
- **Documentation:** https://github.com/junaidte14/aiverse
- **API Reference:** http://localhost:8000/docs
---
**Happy AI Integrating! 🚀**
Phase 16: Update Main README 📖
Step 23: Update README.md
Add to README.md:
## 🤖 Multi-Provider AI Support
AIVerse now supports multiple AI providers with a unified interface:
### Supported Providers
| Provider | Cost | Speed | Quality | Free Tier |
|----------|------|-------|---------|-----------|
| **Ollama** | Free | Fast* | Good | ✅ Unlimited |
| **Groq** | Very Low | Very Fast | Good | ✅ 30 req/min |
| **Together AI** | Low | Medium | Good | ✅ $25 credits |
| **OpenAI** | Medium-High | Medium | Excellent | ❌ |
| **Anthropic** | Medium-High | Medium | Excellent | ❌ |
*With GPU
### Quick Start
```python
# Use Groq for fast inference
response = await ai_service.chat(
provider="groq",
model="llama-3.3-70b-versatile",
messages=[{"role": "user", "content": "Hello!"}]
)
# Use Claude for complex reasoning
response = await ai_service.chat(
provider="anthropic",
model="claude-3-5-sonnet-20241022",
messages=[{"role": "user", "content": "Explain quantum mechanics"}]
)
# Use Ollama for free unlimited usage
response = await ai_service.chat(
provider="ollama",
model="llama2",
messages=[{"role": "user", "content": "Write a poem"}]
)
```
### Features
- ✅ **Unified Interface** - Same API for all providers
- ✅ **Cost Tracking** - Monitor spending across providers
- ✅ **Smart Fallbacks** - Automatic failover to alternative providers
- ✅ **User API Keys** - Users can use their own API keys
- ✅ **Budget Limits** - Set monthly spending caps
- ✅ **Usage Dashboard** - Real-time usage statistics
### Get API Keys
1. **Groq:** https://console.groq.com/keys (Free tier available)
2. **OpenAI:** https://platform.openai.com/api-keys ($5 minimum)
3. **Anthropic:** https://console.anthropic.com/settings/keys ($5 minimum)
4. **Together AI:** https://api.together.xyz/settings/api-keys ($25 free credits)
See [MULTI_PROVIDER_GUIDE.md](MULTI_PROVIDER_GUIDE.md) for complete documentation.
Phase 17: Update Requirements 📦
Step 24: Update requirements.txt
Add to requirements.txt:
# Existing dependencies...
# Encryption for API keys
cryptography==41.0.7
# HTTP client (if not already present)
httpx==0.25.2
Phase 18: Environment Configuration ⚙️
Step 25: Update .env.example
Create .env.example:
# Application
APP_NAME=AIVerse Backend
APP_VERSION=1.1.0
ENVIRONMENT=development
DEBUG=True
HOST=0.0.0.0
PORT=8000
# API
API_V1_PREFIX=/api/v1
# Database
DATABASE_URL=postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db
# Redis
REDIS_URL=redis://localhost:6379/0
REDIS_CACHE_ENABLED=True
REDIS_CACHE_TTL=300
# Security
SECRET_KEY=your-secret-key-here-change-in-production
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
# CORS
ALLOWED_ORIGINS=http://localhost:3000,http://localhost:8000
# Ollama (Local LLM)
OLLAMA_BASE_URL=http://localhost:11434
# Multi-Provider AI Keys (Optional - can also be set per-user)
GROQ_API_KEY=
OPENAI_API_KEY=
ANTHROPIC_API_KEY=
TOGETHER_API_KEY=
# AI Configuration
DEFAULT_AI_PROVIDER=ollama
AI_PROVIDER_TIMEOUT=120
AI_MAX_RETRIES=3
# Cost Management
ENABLE_COST_TRACKING=True
MAX_MONTHLY_COST=100.00
# Logging
LOG_LEVEL=INFO
Phase 19: Database Migration 🗄️
Step 26: Create Migration for Multi-Provider Support
# Create migration
alembic revision --autogenerate -m "Add multi-provider AI support"
# Review migration file
# Should include:
# - groq_api_key column
# - openai_api_key column
# - anthropic_api_key column
# - together_api_key column
# - total_tokens_used column
# - total_cost column
# - monthly_cost column
# - last_cost_reset column
# Apply migration
alembic upgrade head
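For reference, a hand-written revision covering the columns listed above might look roughly like this. Treat it as a sketch only: the target table name (`users`), column types, and revision ids are assumptions — compare against what `--autogenerate` actually produces before applying.

```python
"""Add multi-provider AI support"""
from alembic import op
import sqlalchemy as sa

revision = "xxxx"        # filled in by Alembic
down_revision = "prev"   # your previous revision id

def upgrade() -> None:
    for col in ("groq_api_key", "openai_api_key",
                "anthropic_api_key", "together_api_key"):
        op.add_column("users", sa.Column(col, sa.String(), nullable=True))
    op.add_column("users", sa.Column("total_tokens_used", sa.Integer(),
                                     server_default="0"))
    op.add_column("users", sa.Column("total_cost", sa.Float(), server_default="0"))
    op.add_column("users", sa.Column("monthly_cost", sa.Float(), server_default="0"))
    op.add_column("users", sa.Column("last_cost_reset", sa.DateTime(), nullable=True))

def downgrade() -> None:
    for col in ("last_cost_reset", "monthly_cost", "total_cost",
                "total_tokens_used", "together_api_key",
                "anthropic_api_key", "openai_api_key", "groq_api_key"):
        op.drop_column("users", col)
```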
Phase 20: Final Integration Checklist ✅
Step 27: Create Integration Checklist
Create INTEGRATION_CHECKLIST.md:
# Multi-Provider Integration Checklist
## Backend Setup
- [ ] Install dependencies: `pip install -r requirements.txt`
- [ ] Update environment variables in `.env`
- [ ] Run database migrations: `alembic upgrade head`
- [ ] Start Ollama (optional): `ollama serve`
- [ ] Pull Ollama models (optional): `ollama pull llama2`
- [ ] Configure system API keys in `.env` (optional)
- [ ] Start backend: `python main.py`
- [ ] Verify health: `curl http://localhost:8000/api/v1/health`
## Frontend Setup
- [ ] Install dependencies: `cd frontend && npm install`
- [ ] Update API base URL if needed
- [ ] Start frontend: `npm run dev`
- [ ] Access UI: `http://localhost:3000`
## API Key Configuration
### Option 1: System-Level Keys (Shared)
Add to `.env`:
```bash
GROQ_API_KEY=gsk_xxxxx
OPENAI_API_KEY=sk-xxxxx
ANTHROPIC_API_KEY=sk-ant-xxxxx
TOGETHER_API_KEY=xxxxx
```
### Option 2: User-Level Keys (Per User)
Configure via UI:
1. Login to application
2. Navigate to Settings → API Keys
3. Add keys for desired providers
4. Save
## Testing
- [ ] Run test suite: `python test_multi_provider.py`
- [ ] Test each provider:
- [ ] Ollama
- [ ] Groq (if API key configured)
- [ ] OpenAI (if API key configured)
- [ ] Anthropic (if API key configured)
- [ ] Together AI (if API key configured)
- [ ] Test streaming responses
- [ ] Test cost tracking
- [ ] Test API key management
- [ ] Test usage dashboard
## Production Deployment
- [ ] Set strong SECRET_KEY
- [ ] Configure production database
- [ ] Set ENVIRONMENT=production
- [ ] Set DEBUG=False
- [ ] Configure API keys securely
- [ ] Set appropriate MAX_MONTHLY_COST
- [ ] Enable HTTPS
- [ ] Configure CORS for production domains
- [ ] Set up monitoring
- [ ] Configure backup strategy
## Monitoring
- [ ] Set up cost alerts
- [ ] Monitor API usage
- [ ] Track error rates
- [ ] Set up logging aggregation
- [ ] Configure budget notifications
## Documentation
- [ ] Read MULTI_PROVIDER_GUIDE.md
- [ ] Review API documentation at /docs
- [ ] Check example code
- [ ] Review security best practices
## Optional Enhancements
- [ ] Implement caching layer
- [ ] Add rate limiting per provider
- [ ] Set up automatic fallbacks
- [ ] Implement request queuing
- [ ] Add custom provider implementations
- [ ] Configure load balancing
## Verification
Test each scenario:
1. **Free Usage (Ollama):**
```bash
curl -X POST http://localhost:8000/api/v1/ai/multi/chat \
-H "Authorization: Bearer TOKEN" \
-d '{"provider":"ollama","model":"llama2","messages":[{"role":"user","content":"Hello"}]}'
```
2. **Paid Usage (Groq):**
```bash
curl -X POST http://localhost:8000/api/v1/ai/multi/chat \
-H "Authorization: Bearer TOKEN" \
-d '{"provider":"groq","model":"llama-3.3-70b-versatile","messages":[{"role":"user","content":"Hello"}]}'
```
3. **Cost Tracking:**
```bash
curl -X GET http://localhost:8000/api/v1/ai/multi/usage \
-H "Authorization: Bearer TOKEN"
```
4. **API Key Management:**
```bash
curl -X GET http://localhost:8000/api/v1/api-keys/status \
-H "Authorization: Bearer TOKEN"
```
## Troubleshooting
If issues occur:
1. Check logs: `tail -f logs/aiverse.log`
2. Verify database connection
3. Test API keys independently
4. Review error messages
5. Check provider status pages
6. Consult MULTI_PROVIDER_GUIDE.md troubleshooting section
## Success Criteria
✅ All providers configured
✅ Can send requests to each provider
✅ Streaming works correctly
✅ Cost tracking accurate
✅ API key management functional
✅ Usage dashboard showing correct data
✅ Error handling working
✅ Fallbacks configured
✅ Production-ready deployment
---
**Congratulations! Multi-provider AI integration complete! 🎉**
## Summary of the Multi-Provider Extension 🎯
### What We've Built
✅ 5 AI Provider Integrations:
- Ollama (local, free)
- Groq (ultra-fast, cheap)
- OpenAI (GPT models)
- Anthropic (Claude models)
- Together AI (open models, $25 free)
✅ Unified Interface:
- Single API for all providers
- Consistent message format
- Automatic cost tracking
- Smart error handling
✅ User Features:
- API key management UI
- Usage dashboard
- Monthly budget limits
- Cost optimization tools
✅ Developer Features:
- Abstract provider interface
- Factory pattern for providers
- Encrypted API key storage
- Comprehensive testing
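The abstract provider interface and factory pattern listed above can be sketched as follows. This is an illustrative minimal version, not the project's actual code; class and method names (`AIProvider`, `chat`, `get_provider`) are assumptions, and the stubbed `chat` bodies stand in for real HTTP calls:

```python
from abc import ABC, abstractmethod


class AIProvider(ABC):
    """Abstract base class every provider implements (sketch)."""

    @abstractmethod
    def chat(self, messages: list[dict]) -> str: ...


class OllamaProvider(AIProvider):
    def chat(self, messages: list[dict]) -> str:
        # Real code would call the local Ollama HTTP API here.
        return f"[ollama] {messages[-1]['content']}"


class GroqProvider(AIProvider):
    def chat(self, messages: list[dict]) -> str:
        # Real code would call the Groq API with the user's stored key.
        return f"[groq] {messages[-1]['content']}"


# Factory registry: the "provider" field of an incoming request maps here.
_REGISTRY: dict[str, type[AIProvider]] = {
    "ollama": OllamaProvider,
    "groq": GroqProvider,
}


def get_provider(name: str) -> AIProvider:
    """Instantiate the implementation for a provider name, or fail clearly."""
    try:
        return _REGISTRY[name]()
    except KeyError:
        raise ValueError(f"unknown provider: {name}")
```

Adding a sixth provider then means writing one subclass and one registry entry, which is what makes the architecture easy to extend.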
✅ Documentation:
- Complete integration guide
- Best practices
- Cost comparison
- Troubleshooting guide
### Cost Comparison (per 1M Tokens)
| Provider | Model | Cost |
|---|---|---|
| Ollama | Any | $0 🎉 |
| Groq | LLaMA 8B | $0.065 |
| Together AI | Mistral 7B | $0.20 |
| OpenAI | GPT-4o Mini | $0.375 |
| Groq | LLaMA 70B | $0.69 |
| OpenAI | GPT-3.5 | $1.00 |
| OpenAI | GPT-4 Turbo | $20 |
| Anthropic | Claude Opus | $45 |
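Turning a per-1M-token price from the table into a per-request cost is a single division. A minimal helper (the function name is an assumption for illustration):

```python
def estimate_cost(tokens: int, price_per_million: float) -> float:
    """Cost in dollars for a request, given a per-1M-token price."""
    return tokens * price_per_million / 1_000_000


# Example: 50k tokens on Groq LLaMA 8B at $0.065 per 1M tokens:
# estimate_cost(50_000, 0.065) -> 0.00325
```

This is the arithmetic the usage dashboard and budget checks build on.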
### Key Benefits
- Flexibility – Choose best provider per use case
- Cost Optimization – Use free/cheap providers when possible
- Redundancy – Automatic fallbacks if provider fails
- Scalability – Easy to add new providers
- User Control – Users manage their own API keys
- Transparency – Full cost tracking and usage stats
The AIVerse project is now feature-complete with multi-provider AI support! 🚀
### Total Project Features
- ✅ FastAPI backend with advanced features
- ✅ PostgreSQL database with migrations
- ✅ JWT authentication & RBAC
- ✅ 5 AI provider integrations
- ✅ React TypeScript frontend
- ✅ Docker & Kubernetes deployment
- ✅ CI/CD pipeline
- ✅ Monitoring & logging
- ✅ Cost tracking & management
- ✅ Production-ready architecture
This is a professional, production-grade, full-stack AI application! 🌟