Views: 16
To integrate PostgreSQL with FastAPI using SQLAlchemy, install SQLAlchemy and asyncpg for async support, create database models inheriting from SQLAlchemy’s declarative base, set up an async engine and session factory, use Alembic for database migrations, and implement dependency injection to provide database sessions to your endpoints. Replace in-memory storage in your service layer with SQLAlchemy queries, use async/await patterns for all database operations, and implement proper connection pooling. This architecture ensures scalable, production-ready database integration with full ORM capabilities.
🎓 What You’ll Learn
By the end of this tutorial, you’ll be able to:
- Set up PostgreSQL database for FastAPI
- Create SQLAlchemy ORM models with relationships
- Implement async database operations
- Use Alembic for database migrations
- Handle database sessions with dependency injection
- Implement proper connection pooling
- Create database indexes and constraints
- Handle transactions and rollbacks
- Implement database seeding and fixtures
- Deploy database in production
📖 Understanding Database Integration
Why PostgreSQL + SQLAlchemy?
| Technology | Why We Use It |
|---|---|
| PostgreSQL | Production-grade, ACID compliant, supports JSON, full-text search |
| SQLAlchemy | Mature ORM, async support, migrations, relationship handling |
| Alembic | Database migrations, version control for schema changes |
| asyncpg | Fastest PostgreSQL driver for Python async |
Architecture Overview
FastAPI Endpoint
↓
Dependency Injection (get_db)
↓
SQLAlchemy Session
↓
ORM Models (User, Conversation, Message)
↓
PostgreSQL Database
🛠️ Step-by-Step Implementation
Step 1: Install Dependencies
# Activate your virtual environment
cd fastapi
source venv/bin/activate # or venv\Scripts\activate on Windows
# Install database packages
pip install sqlalchemy[asyncio]==2.0.23
pip install asyncpg==0.29.0
pip install alembic==1.13.1
pip install psycopg2-binary==2.9.9 # For sync operations if needed
# Update requirements
pip freeze > requirements.txt
What we installed:
sqlalchemy[asyncio]: ORM with async supportasyncpg: Async PostgreSQL driveralembic: Database migration toolpsycopg2-binary: PostgreSQL adapter (backup/utilities)
Step 2: Install and Setup PostgreSQL
Option A: Local Installation
Windows:
# Download from https://www.postgresql.org/download/windows/
# Or use chocolatey:
choco install postgresql
macOS:
brew install postgresql@15
brew services start postgresql@15
Linux (Ubuntu/Debian):
sudo apt update
sudo apt install postgresql postgresql-contrib
sudo systemctl start postgresql
Option B: Docker (Recommended for Development)
Create docker-compose.yml in project root:
version: '3.8'
services:
postgres:
image: postgres:15-alpine
container_name: aiverse_postgres
environment:
POSTGRES_USER: aiverse_user
POSTGRES_PASSWORD: aiverse_pass
POSTGRES_DB: aiverse_db
ports:
- "5432:5432"
volumes:
- postgres_data:/var/lib/postgresql/data
healthcheck:
test: ["CMD-SHELL", "pg_isready -U aiverse_user -d aiverse_db"]
interval: 10s
timeout: 5s
retries: 5
volumes:
postgres_data:
Start PostgreSQL:
docker-compose up -d
Verify it’s running:
docker ps
# You should see aiverse_postgres running
Step 3: Update Configuration
Update app/core/config.py:
"""
Core configuration module
Manages all application settings using Pydantic Settings
"""
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List, Optional
from functools import lru_cache
class Settings(BaseSettings):
"""
Application settings
These values are loaded from environment variables or .env file
Pydantic validates the types automatically
"""
# Application
APP_NAME: str = "AIVerse Backend"
APP_VERSION: str = "0.4.0"
DEBUG: bool = False
ENVIRONMENT: str = "production"
# API
API_V1_PREFIX: str = "/api/v1"
HOST: str = "0.0.0.0"
PORT: int = 8000
# CORS
ALLOWED_ORIGINS: str = "http://localhost:3000"
@property
def allowed_origins_list(self) -> List[str]:
"""Convert comma-separated string to list"""
return [origin.strip() for origin in self.ALLOWED_ORIGINS.split(",")]
# Database Configuration
DATABASE_URL: str = "postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db"
DATABASE_ECHO: bool = False # Set to True to see SQL queries in logs
# Database Connection Pool Settings
DB_POOL_SIZE: int = 5
DB_MAX_OVERFLOW: int = 10
DB_POOL_TIMEOUT: int = 30
DB_POOL_RECYCLE: int = 3600 # Recycle connections after 1 hour
@property
def async_database_url(self) -> str:
"""Get async database URL"""
return self.DATABASE_URL
@property
def sync_database_url(self) -> str:
"""Get sync database URL (for Alembic migrations)"""
return self.DATABASE_URL.replace("+asyncpg", "").replace("postgresql+asyncpg", "postgresql")
# AI Settings
OLLAMA_BASE_URL: str = "http://localhost:11434"
DEFAULT_AI_MODEL: str = "llama2"
# Security
SECRET_KEY: str = "change-this-in-production"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# Pydantic Settings Configuration
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=True,
extra="ignore"
)
@lru_cache()
def get_settings() -> Settings:
"""
Create settings instance (cached)
@lru_cache ensures we only create one Settings instance
and reuse it throughout the application
Returns:
Settings: Application settings
"""
return Settings()
# Convenience: Get settings instance
settings = get_settings()
Update .env:
# Application Configuration
APP_NAME=AIVerse Backend
APP_VERSION=0.4.0
DEBUG=True
ENVIRONMENT=development
# Server Configuration
API_V1_PREFIX=/api/v1
HOST=0.0.0.0
PORT=8000
# CORS Configuration
ALLOWED_ORIGINS=http://localhost:3000,http://localhost:8000
# Database Configuration
DATABASE_URL=postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db
DATABASE_ECHO=True
DB_POOL_SIZE=5
DB_MAX_OVERFLOW=10
DB_POOL_TIMEOUT=30
DB_POOL_RECYCLE=3600
# AI Configuration
OLLAMA_BASE_URL=http://localhost:11434
DEFAULT_AI_MODEL=llama2
# Security Configuration
SECRET_KEY=your-secret-key-change-in-production
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
Step 4: Create Database Infrastructure
Create app/db/__init__.py:
"""
Database package
Contains database configuration, models, and utilities
"""
from app.db.base import Base
from app.db.session import engine, async_session_maker, get_db
__all__ = [
"Base",
"engine",
"async_session_maker",
"get_db",
]
Create app/db/base.py:
"""
Base class for all database models
All SQLAlchemy models should inherit from Base
"""
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import MetaData
# Naming convention for constraints
# This ensures consistent naming across the database
NAMING_CONVENTION = {
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s"
}
metadata = MetaData(naming_convention=NAMING_CONVENTION)
class Base(DeclarativeBase):
"""
Base class for all database models
All models should inherit from this class
"""
metadata = metadata
Create app/db/session.py:
"""
Database session management
Handles database connection, session creation, and dependency injection
"""
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
AsyncSession,
AsyncEngine
)
from typing import AsyncGenerator
from app.core.config import settings
# Create async engine
engine: AsyncEngine = create_async_engine(
settings.async_database_url,
echo=settings.DATABASE_ECHO,
pool_size=settings.DB_POOL_SIZE,
max_overflow=settings.DB_MAX_OVERFLOW,
pool_timeout=settings.DB_POOL_TIMEOUT,
pool_recycle=settings.DB_POOL_RECYCLE,
pool_pre_ping=True, # Verify connections before using
)
# Create async session factory
async_session_maker = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False,
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""
Dependency for getting database session
Yields:
AsyncSession: Database session
Usage:
@app.get("/users")
async def get_users(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User))
return result.scalars().all()
"""
async with async_session_maker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def init_db() -> None:
"""
Initialize database
Creates all tables if they don't exist
Note: In production, use Alembic migrations instead
"""
from app.db.base import Base
async with engine.begin() as conn:
# Drop all tables (only for development!)
# await conn.run_sync(Base.metadata.drop_all)
# Create all tables
await conn.run_sync(Base.metadata.create_all)
async def close_db() -> None:
"""
Close database connections
Call this on application shutdown
"""
await engine.dispose()
Step 5: Create Database Models
Create app/db/models/__init__.py:
"""
Database models package
Contains all SQLAlchemy ORM models
"""
from app.db.models.user import User
from app.db.models.conversation import Conversation
from app.db.models.message import Message
__all__ = [
"User",
"Conversation",
"Message",
]
Create app/db/models/user.py:
"""
User database model
SQLAlchemy ORM model for users
"""
from sqlalchemy import String, Boolean, DateTime, Enum as SQLEnum
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from typing import List
import enum
from app.db.base import Base
class UserRole(str, enum.Enum):
"""User role enumeration"""
ADMIN = "admin"
USER = "user"
GUEST = "guest"
class User(Base):
"""
User model
Represents a user in the system
"""
__tablename__ = "users"
# Primary Key
id: Mapped[int] = mapped_column(primary_key=True, index=True)
# User Information
username: Mapped[str] = mapped_column(
String(50),
unique=True,
index=True,
nullable=False
)
email: Mapped[str] = mapped_column(
String(255),
unique=True,
index=True,
nullable=False
)
hashed_password: Mapped[str] = mapped_column(
String(255),
nullable=False
)
full_name: Mapped[str | None] = mapped_column(
String(100),
nullable=True
)
# Role and Status
role: Mapped[UserRole] = mapped_column(
SQLEnum(UserRole),
default=UserRole.USER,
nullable=False
)
is_active: Mapped[bool] = mapped_column(
Boolean,
default=True,
nullable=False
)
is_verified: Mapped[bool] = mapped_column(
Boolean,
default=False,
nullable=False
)
# Timestamps
created_at: Mapped[datetime] = mapped_column(
DateTime,
default=datetime.utcnow,
nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime,
default=datetime.utcnow,
onupdate=datetime.utcnow,
nullable=False
)
last_login: Mapped[datetime | None] = mapped_column(
DateTime,
nullable=True
)
# Relationships
conversations: Mapped[List["Conversation"]] = relationship(
"Conversation",
back_populates="user",
cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"
Create app/db/models/conversation.py:
"""
Conversation database model
SQLAlchemy ORM model for AI conversations
"""
from sqlalchemy import String, DateTime, ForeignKey, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from typing import List
from app.db.base import Base
class Conversation(Base):
"""
Conversation model
Represents an AI conversation thread
"""
__tablename__ = "conversations"
# Primary Key
id: Mapped[int] = mapped_column(primary_key=True, index=True)
# Foreign Keys
user_id: Mapped[int] = mapped_column(
ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
index=True
)
# Conversation Information
title: Mapped[str] = mapped_column(
String(255),
nullable=False,
default="New Conversation"
)
model_name: Mapped[str] = mapped_column(
String(50),
nullable=False
)
# Metadata
metadata_json: Mapped[str | None] = mapped_column(
Text,
nullable=True
)
# Timestamps
created_at: Mapped[datetime] = mapped_column(
DateTime,
default=datetime.utcnow,
nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime,
default=datetime.utcnow,
onupdate=datetime.utcnow,
nullable=False
)
# Relationships
user: Mapped["User"] = relationship(
"User",
back_populates="conversations"
)
messages: Mapped[List["Message"]] = relationship(
"Message",
back_populates="conversation",
cascade="all, delete-orphan",
order_by="Message.created_at"
)
def __repr__(self) -> str:
return f"<Conversation(id={self.id}, title='{self.title}', user_id={self.user_id})>"
Create app/db/models/message.py:
"""
Message database model
SQLAlchemy ORM model for conversation messages
"""
from sqlalchemy import String, DateTime, ForeignKey, Text, Enum as SQLEnum
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
import enum
from app.db.base import Base
class MessageRole(str, enum.Enum):
"""Message role enumeration"""
USER = "user"
ASSISTANT = "assistant"
SYSTEM = "system"
class Message(Base):
"""
Message model
Represents a single message in a conversation
"""
__tablename__ = "messages"
# Primary Key
id: Mapped[int] = mapped_column(primary_key=True, index=True)
# Foreign Keys
conversation_id: Mapped[int] = mapped_column(
ForeignKey("conversations.id", ondelete="CASCADE"),
nullable=False,
index=True
)
# Message Information
role: Mapped[MessageRole] = mapped_column(
SQLEnum(MessageRole),
nullable=False
)
content: Mapped[str] = mapped_column(
Text,
nullable=False
)
# Token Information (optional)
prompt_tokens: Mapped[int | None] = mapped_column(nullable=True)
completion_tokens: Mapped[int | None] = mapped_column(nullable=True)
total_tokens: Mapped[int | None] = mapped_column(nullable=True)
# Timestamps
created_at: Mapped[datetime] = mapped_column(
DateTime,
default=datetime.utcnow,
nullable=False,
index=True
)
# Relationships
conversation: Mapped["Conversation"] = relationship(
"Conversation",
back_populates="messages"
)
def __repr__(self) -> str:
content_preview = self.content[:50] + "..." if len(self.content) > 50 else self.content
return f"<Message(id={self.id}, role='{self.role}', content='{content_preview}')>"
Step 6: Setup Alembic for Migrations
Initialize Alembic:
# From fastapi/ directory
alembic init alembic
This creates:
fastapi/
├── alembic/
│ ├── versions/ # Migration files
│ ├── env.py # Alembic environment
│ ├── script.py.mako
│ └── README
└── alembic.ini # Alembic configuration
Update alembic.ini:
# alembic.ini
[alembic]
script_location = alembic
prepend_sys_path = .
version_path_separator = os
# Disable this - we'll set it programmatically
# sqlalchemy.url = postgresql://user:pass@localhost/dbname
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
Update alembic/env.py:
"""
Alembic environment configuration
Handles database migrations
"""
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
import asyncio
# Import your models and config
from app.core.config import settings
from app.db.base import Base
# Import all models so Alembic can detect them
from app.db.models import User, Conversation, Message
# this is the Alembic Config object
config = context.config
# Set database URL from settings
config.set_main_option("sqlalchemy.url", settings.sync_database_url)
# Interpret the config file for Python logging
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Set target metadata for autogenerate
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""
Run migrations in 'offline' mode
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
"""Run migrations with provided connection"""
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""Run migrations in 'online' mode with async engine"""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""
Run migrations in 'online' mode
In this scenario we need to create an Engine
and associate a connection with the context.
"""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
Step 7: Create Initial Migration
# Create initial migration
alembic revision --autogenerate -m "Initial migration: users, conversations, messages"
# Apply migration
alembic upgrade head
You should see:
INFO [alembic.runtime.migration] Running upgrade -> abc123, Initial migration: users, conversations, messages
Verify tables were created:
# Using Docker
docker exec -it aiverse_postgres psql -U aiverse_user -d aiverse_db -c "\dt"
# Should show:
# public | alembic_version | table | aiverse_user
# public | conversations | table | aiverse_user
# public | messages | table | aiverse_user
# public | users | table | aiverse_user
Or confirm through SQL Shell -> login with credntials -> Run the command \dt
Step 8: Create Repository Pattern
The repository pattern provides an abstraction layer between the business logic and data access layer.
Create app/db/repositories/__init__.py:
"""
Repository package
Contains data access layer implementations
"""
from app.db.repositories.user_repository import UserRepository
from app.db.repositories.conversation_repository import ConversationRepository
from app.db.repositories.message_repository import MessageRepository
__all__ = [
"UserRepository",
"ConversationRepository",
"MessageRepository",
]
Create app/db/repositories/base_repository.py:
"""
Base repository with common CRUD operations
All repositories should inherit from this base class
"""
from typing import Generic, TypeVar, Type, List, Optional, Any
from sqlalchemy import select, update, delete
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.base import Base
ModelType = TypeVar("ModelType", bound=Base)
class BaseRepository(Generic[ModelType]):
"""
Base repository with common database operations
Provides CRUD operations for any SQLAlchemy model
"""
def __init__(self, model: Type[ModelType], session: AsyncSession):
"""
Initialize repository
Args:
model: SQLAlchemy model class
session: Database session
"""
self.model = model
self.session = session
async def create(self, **kwargs) -> ModelType:
"""
Create a new record
Args:
**kwargs: Model attributes
Returns:
Created model instance
"""
instance = self.model(**kwargs)
self.session.add(instance)
await self.session.flush()
await self.session.refresh(instance)
return instance
async def get_by_id(self, id: int) -> Optional[ModelType]:
"""
Get record by ID
Args:
id: Record ID
Returns:
Model instance if found, None otherwise
"""
result = await self.session.execute(
select(self.model).where(self.model.id == id)
)
return result.scalar_one_or_none()
async def get_all(
self,
skip: int = 0,
limit: int = 100,
**filters
) -> List[ModelType]:
"""
Get all records with optional filtering
Args:
skip: Number of records to skip
limit: Maximum number of records to return
**filters: Field filters
Returns:
List of model instances
"""
query = select(self.model)
# Apply filters
for field, value in filters.items():
if hasattr(self.model, field) and value is not None:
query = query.where(getattr(self.model, field) == value)
query = query.offset(skip).limit(limit)
result = await self.session.execute(query)
return list(result.scalars().all())
async def update(self, id: int, **kwargs) -> Optional[ModelType]:
"""
Update record by ID
Args:
id: Record ID
**kwargs: Fields to update
Returns:
Updated model instance if found, None otherwise
"""
# Remove None values
update_data = {k: v for k, v in kwargs.items() if v is not None}
if not update_data:
return await self.get_by_id(id)
await self.session.execute(
update(self.model)
.where(self.model.id == id)
.values(**update_data)
)
await self.session.flush()
return await self.get_by_id(id)
async def delete(self, id: int) -> bool:
"""
Delete record by ID
Args:
id: Record ID
Returns:
True if deleted, False if not found
"""
result = await self.session.execute(
delete(self.model).where(self.model.id == id)
)
await self.session.flush()
return result.rowcount > 0
async def count(self, **filters) -> int:
"""
Count records with optional filtering
Args:
**filters: Field filters
Returns:
Number of records
"""
from sqlalchemy import func
query = select(func.count(self.model.id))
# Apply filters
for field, value in filters.items():
if hasattr(self.model, field) and value is not None:
query = query.where(getattr(self.model, field) == value)
result = await self.session.execute(query)
return result.scalar_one()
async def exists(self, **filters) -> bool:
"""
Check if record exists
Args:
**filters: Field filters
Returns:
True if exists, False otherwise
"""
count = await self.count(**filters)
return count > 0
Create app/db/repositories/user_repository.py:
"""
User repository
Data access layer for User model
"""
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.repositories.base_repository import BaseRepository
from app.db.models.user import User, UserRole
class UserRepository(BaseRepository[User]):
"""
User repository
Handles all database operations for users
"""
def __init__(self, session: AsyncSession):
super().__init__(User, session)
async def get_by_username(self, username: str) -> Optional[User]:
"""
Get user by username
Args:
username: Username to search for
Returns:
User if found, None otherwise
"""
result = await self.session.execute(
select(User).where(User.username == username)
)
return result.scalar_one_or_none()
async def get_by_email(self, email: str) -> Optional[User]:
"""
Get user by email
Args:
email: Email to search for
Returns:
User if found, None otherwise
"""
result = await self.session.execute(
select(User).where(User.email == email)
)
return result.scalar_one_or_none()
async def get_by_role(
self,
role: UserRole,
skip: int = 0,
limit: int = 100
) -> list[User]:
"""
Get users by role
Args:
role: User role to filter by
skip: Number of records to skip
limit: Maximum number of records
Returns:
List of users with specified role
"""
result = await self.session.execute(
select(User)
.where(User.role == role)
.offset(skip)
.limit(limit)
)
return list(result.scalars().all())
async def get_active_users(
self,
skip: int = 0,
limit: int = 100
) -> list[User]:
"""
Get active users
Args:
skip: Number of records to skip
limit: Maximum number of records
Returns:
List of active users
"""
result = await self.session.execute(
select(User)
.where(User.is_active == True)
.offset(skip)
.limit(limit)
)
return list(result.scalars().all())
async def update_last_login(self, user_id: int) -> Optional[User]:
"""
Update user's last login timestamp
Args:
user_id: User ID
Returns:
Updated user
"""
from datetime import datetime
return await self.update(user_id, last_login=datetime.utcnow())
Create app/db/repositories/conversation_repository.py:
"""
Conversation repository
Data access layer for Conversation model
"""
from typing import Optional
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.repositories.base_repository import BaseRepository
from app.db.models.conversation import Conversation
class ConversationRepository(BaseRepository[Conversation]):
"""
Conversation repository
Handles all database operations for conversations
"""
def __init__(self, session: AsyncSession):
super().__init__(Conversation, session)
async def get_by_user(
self,
user_id: int,
skip: int = 0,
limit: int = 100
) -> list[Conversation]:
"""
Get conversations by user ID
Args:
user_id: User ID
skip: Number of records to skip
limit: Maximum number of records
Returns:
List of user's conversations
"""
result = await self.session.execute(
select(Conversation)
.where(Conversation.user_id == user_id)
.order_by(Conversation.updated_at.desc())
.offset(skip)
.limit(limit)
)
return list(result.scalars().all())
async def get_with_messages(self, conversation_id: int) -> Optional[Conversation]:
"""
Get conversation with all messages loaded
Args:
conversation_id: Conversation ID
Returns:
Conversation with messages if found, None otherwise
"""
result = await self.session.execute(
select(Conversation)
.options(selectinload(Conversation.messages))
.where(Conversation.id == conversation_id)
)
return result.scalar_one_or_none()
async def get_user_conversation_count(self, user_id: int) -> int:
"""
Get number of conversations for a user
Args:
user_id: User ID
Returns:
Number of conversations
"""
return await self.count(user_id=user_id)
Create app/db/repositories/message_repository.py:
"""
Message repository
Data access layer for Message model
"""
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.repositories.base_repository import BaseRepository
from app.db.models.message import Message, MessageRole
class MessageRepository(BaseRepository[Message]):
"""
Message repository
Handles all database operations for messages
"""
def __init__(self, session: AsyncSession):
super().__init__(Message, session)
async def get_by_conversation(
self,
conversation_id: int,
skip: int = 0,
limit: int = 100
) -> list[Message]:
"""
Get messages for a conversation
Args:
conversation_id: Conversation ID
skip: Number of records to skip
limit: Maximum number of records
Returns:
List of messages ordered by creation time
"""
result = await self.session.execute(
select(Message)
.where(Message.conversation_id == conversation_id)
.order_by(Message.created_at.asc())
.offset(skip)
.limit(limit)
)
return list(result.scalars().all())
async def get_last_message(
self,
conversation_id: int
) -> Optional[Message]:
"""
Get the last message in a conversation
Args:
conversation_id: Conversation ID
Returns:
Last message if found, None otherwise
"""
result = await self.session.execute(
select(Message)
.where(Message.conversation_id == conversation_id)
.order_by(Message.created_at.desc())
.limit(1)
)
return result.scalar_one_or_none()
async def count_by_role(
self,
conversation_id: int,
role: MessageRole
) -> int:
"""
Count messages by role in a conversation
Args:
conversation_id: Conversation ID
role: Message role
Returns:
Number of messages with specified role
"""
return await self.count(
conversation_id=conversation_id,
role=role
)
Step 9: Update Services to Use Database
Update app/services/user_service.py:
"""
User service - Business logic for user operations
Updated to use database instead of in-memory storage
"""
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from passlib.context import CryptContext
from app.db.repositories.user_repository import UserRepository
from app.db.models.user import User, UserRole
from app.models.user import UserCreate, UserUpdate
from app.core.exceptions import UserNotFoundException, UserAlreadyExistsException
# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
class UserService:
"""
User service class
Handles all user-related business logic with database operations
"""
def __init__(self, db: AsyncSession):
"""
Initialize service with database session
Args:
db: Database session
"""
self.repository = UserRepository(db)
def _hash_password(self, password: str) -> str:
"""
Hash password using bcrypt
Args:
password: Plain text password
Returns:
Hashed password
"""
return pwd_context.hash(password)
def _verify_password(self, plain_password: str, hashed_password: str) -> bool:
"""
Verify password against hash
Args:
plain_password: Plain text password
hashed_password: Hashed password
Returns:
True if password matches, False otherwise
"""
return pwd_context.verify(plain_password, hashed_password)
async def create_user(self, user_data: UserCreate) -> User:
"""
Create a new user
Args:
user_data: User creation data
Returns:
Created user
Raises:
UserAlreadyExistsException: If username or email already exists
"""
# Check for duplicate username
existing_user = await self.repository.get_by_username(user_data.username)
if existing_user:
raise UserAlreadyExistsException("username", user_data.username)
# Check for duplicate email
existing_email = await self.repository.get_by_email(user_data.email)
if existing_email:
raise UserAlreadyExistsException("email", user_data.email)
# Hash password
hashed_password = self._hash_password(user_data.password)
# Create user
user = await self.repository.create(
username=user_data.username,
email=user_data.email,
hashed_password=hashed_password,
full_name=user_data.full_name,
role=user_data.role,
is_active=True,
is_verified=False
)
return user
async def get_all_users(
self,
skip: int = 0,
limit: int = 10,
role: Optional[UserRole] = None
) -> list[User]:
"""
Get all users with optional filtering
Args:
skip: Number of records to skip
limit: Maximum number of records
role: Optional role filter
Returns:
List of users
"""
if role:
return await self.repository.get_by_role(role, skip, limit)
return await self.repository.get_all(skip, limit)
async def get_user_by_id(self, user_id: int) -> User:
"""
Get user by ID
Args:
user_id: User identifier
Returns:
User object
Raises:
UserNotFoundException: If user not found
"""
user = await self.repository.get_by_id(user_id)
if not user:
raise UserNotFoundException(user_id)
return user
async def get_user_by_username(self, username: str) -> Optional[User]:
"""
Get user by username
Args:
username: Username
Returns:
User if found, None otherwise
"""
return await self.repository.get_by_username(username)
async def get_user_by_email(self, email: str) -> Optional[User]:
"""
Get user by email
Args:
email: Email address
Returns:
User if found, None otherwise
"""
return await self.repository.get_by_email(email)
async def update_user(self, user_id: int, user_update: UserUpdate) -> User:
"""
Update user information
Args:
user_id: User ID
user_update: Update data
Returns:
Updated user
Raises:
UserNotFoundException: If user not found
UserAlreadyExistsException: If username/email conflict
"""
# Check if user exists
user = await self.get_user_by_id(user_id)
update_data = user_update.model_dump(exclude_unset=True)
# Check for username conflict
if "username" in update_data:
existing = await self.repository.get_by_username(update_data["username"])
if existing and existing.id != user_id:
raise UserAlreadyExistsException("username", update_data["username"])
# Check for email conflict
if "email" in update_data:
existing = await self.repository.get_by_email(update_data["email"])
if existing and existing.id != user_id:
raise UserAlreadyExistsException("email", update_data["email"])
# Update user
updated_user = await self.repository.update(user_id, **update_data)
if not updated_user:
raise UserNotFoundException(user_id)
return updated_user
async def delete_user(self, user_id: int) -> None:
"""
Delete a user
Args:
user_id: User ID
Raises:
UserNotFoundException: If user not found
"""
deleted = await self.repository.delete(user_id)
if not deleted:
raise UserNotFoundException(user_id)
async def get_user_count(self) -> int:
"""
Get total number of users
Returns:
User count
"""
return await self.repository.count()
async def authenticate_user(
self,
username: str,
password: str
) -> Optional[User]:
"""
Authenticate user with username and password
Args:
username: Username
password: Plain text password
Returns:
User if authentication successful, None otherwise
"""
user = await self.repository.get_by_username(username)
if not user:
return None
if not self._verify_password(password, user.hashed_password):
return None
# Update last login
await self.repository.update_last_login(user.id)
return user
Create app/services/conversation_service.py:
"""
Conversation service
Business logic for conversation operations
"""
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.repositories.conversation_repository import ConversationRepository
from app.db.repositories.message_repository import MessageRepository
from app.db.models.conversation import Conversation
from app.db.models.message import Message, MessageRole
class ConversationService:
"""
Conversation service
Handles conversation and message business logic
"""
def __init__(self, db: AsyncSession):
"""
Initialize service with database session
Args:
db: Database session
"""
self.conversation_repo = ConversationRepository(db)
self.message_repo = MessageRepository(db)
async def create_conversation(
self,
user_id: int,
title: str,
model_name: str
) -> Conversation:
"""
Create a new conversation
Args:
user_id: User ID
title: Conversation title
model_name: AI model name
Returns:
Created conversation
"""
conversation = await self.conversation_repo.create(
user_id=user_id,
title=title,
model_name=model_name
)
return conversation
async def get_conversation(
self,
conversation_id: int,
include_messages: bool = False
) -> Optional[Conversation]:
"""
Get conversation by ID
Args:
conversation_id: Conversation ID
include_messages: Whether to load messages
Returns:
Conversation if found, None otherwise
"""
if include_messages:
return await self.conversation_repo.get_with_messages(conversation_id)
return await self.conversation_repo.get_by_id(conversation_id)
async def get_user_conversations(
self,
user_id: int,
skip: int = 0,
limit: int = 20
) -> list[Conversation]:
"""
Get all conversations for a user
Args:
user_id: User ID
skip: Number to skip
limit: Maximum number
Returns:
List of conversations
"""
return await self.conversation_repo.get_by_user(user_id, skip, limit)
async def add_message(
self,
conversation_id: int,
role: MessageRole,
content: str,
prompt_tokens: Optional[int] = None,
completion_tokens: Optional[int] = None,
total_tokens: Optional[int] = None
) -> Message:
"""
Add message to conversation
Args:
conversation_id: Conversation ID
role: Message role
content: Message content
prompt_tokens: Prompt tokens used
completion_tokens: Completion tokens used
total_tokens: Total tokens used
Returns:
Created message
"""
message = await self.message_repo.create(
conversation_id=conversation_id,
role=role,
content=content,
prompt_tokens=prompt_tokens,
completion_tokens=completion_tokens,
total_tokens=total_tokens
)
return message
async def get_conversation_messages(
self,
conversation_id: int,
skip: int = 0,
limit: int = 100
) -> list[Message]:
"""
Get messages for a conversation
Args:
conversation_id: Conversation ID
skip: Number to skip
limit: Maximum number
Returns:
List of messages
"""
return await self.message_repo.get_by_conversation(
conversation_id,
skip,
limit
)
async def delete_conversation(self, conversation_id: int) -> bool:
"""
Delete a conversation
Args:
conversation_id: Conversation ID
Returns:
True if deleted, False if not found
"""
return await self.conversation_repo.delete(conversation_id)
Step 10: Update API Endpoints
Update app/api/v1/endpoints/users.py:
"""
User management endpoints
Updated to use database
"""
from fastapi import APIRouter, Depends, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated
from app.models.user import User, UserCreate, UserUpdate, UserResponse, UserRole
from app.services.user_service import UserService
from app.db.session import get_db
router = APIRouter(prefix="/users", tags=["Users"])
def get_user_service(db: AsyncSession = Depends(get_db)) -> UserService:
"""
Dependency to get user service with database session
Args:
db: Database session
Returns:
UserService instance
"""
return UserService(db)
@router.post(
"",
response_model=UserResponse,
status_code=status.HTTP_201_CREATED,
summary="Create a new user"
)
async def create_user(
user_data: UserCreate,
service: Annotated[UserService, Depends(get_user_service)]
):
"""
Create a new user with the following information:
- **username**: Unique username (3-50 characters)
- **email**: Valid email address
- **full_name**: Optional full name
- **password**: Password (min 8 characters)
- **role**: User role (admin, user, guest)
"""
user = await service.create_user(user_data)
return UserResponse.model_validate(user)
@router.get(
"",
response_model=list[UserResponse],
summary="Get all users"
)
async def get_users(
skip: int = 0,
limit: int = 10,
role: UserRole | None = None,
service: Annotated[UserService, Depends(get_user_service)] = None
):
"""
Retrieve users with optional filtering:
- **skip**: Number of records to skip (for pagination)
- **limit**: Maximum number of records to return
- **role**: Filter by user role
"""
users = await service.get_all_users(skip=skip, limit=limit, role=role)
return [UserResponse.model_validate(user) for user in users]
@router.get(
"/{user_id}",
response_model=UserResponse,
summary="Get user by ID"
)
async def get_user(
user_id: int,
service: Annotated[UserService, Depends(get_user_service)]
):
"""Get a specific user by their ID"""
user = await service.get_user_by_id(user_id)
return UserResponse.model_validate(user)
@router.patch(
"/{user_id}",
response_model=UserResponse,
summary="Update user"
)
async def update_user(
user_id: int,
user_update: UserUpdate,
service: Annotated[UserService, Depends(get_user_service)]
):
"""
Update user information (partial update)
Only provided fields will be updated
"""
user = await service.update_user(user_id, user_update)
return UserResponse.model_validate(user)
@router.delete(
"/{user_id}",
status_code=status.HTTP_204_NO_CONTENT,
summary="Delete user"
)
async def delete_user(
user_id: int,
service: Annotated[UserService, Depends(get_user_service)]
):
"""Delete a user by ID"""
await service.delete_user(user_id)
return None
@router.get(
"/stats/count",
summary="Get user statistics"
)
async def get_user_stats(
service: Annotated[UserService, Depends(get_user_service)]
):
"""Get user statistics"""
total = await service.get_user_count()
return {"total_users": total}
Step 11: Create Conversation Endpoints
Create app/api/v1/endpoints/conversations.py:
"""
Conversation management endpoints
Handles conversation CRUD operations
"""
from fastapi import APIRouter, Depends, status, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated
from pydantic import BaseModel
from app.services.conversation_service import ConversationService
from app.db.session import get_db
from app.db.models.message import MessageRole
router = APIRouter(prefix="/conversations", tags=["Conversations"])
# ============================================
# PYDANTIC MODELS
# ============================================
class ConversationCreate(BaseModel):
"""Request model for creating conversation"""
user_id: int
title: str = "New Conversation"
model_name: str = "llama2"
class MessageCreate(BaseModel):
"""Request model for creating message"""
role: MessageRole
content: str
prompt_tokens: int | None = None
completion_tokens: int | None = None
total_tokens: int | None = None
class MessageResponse(BaseModel):
"""Response model for message"""
id: int
conversation_id: int
role: str
content: str
prompt_tokens: int | None
completion_tokens: int | None
total_tokens: int | None
created_at: str
model_config = {"from_attributes": True}
class ConversationResponse(BaseModel):
"""Response model for conversation"""
id: int
user_id: int
title: str
model_name: str
created_at: str
updated_at: str
model_config = {"from_attributes": True}
class ConversationWithMessages(ConversationResponse):
"""Response model for conversation with messages"""
messages: list[MessageResponse] = []
# ============================================
# DEPENDENCIES
# ============================================
def get_conversation_service(
db: AsyncSession = Depends(get_db)
) -> ConversationService:
"""Get conversation service with database session"""
return ConversationService(db)
# ============================================
# ENDPOINTS
# ============================================
@router.post(
"",
response_model=ConversationResponse,
status_code=status.HTTP_201_CREATED,
summary="Create new conversation"
)
async def create_conversation(
conversation_data: ConversationCreate,
service: Annotated[ConversationService, Depends(get_conversation_service)]
):
"""
Create a new conversation
- **user_id**: User ID who owns the conversation
- **title**: Conversation title
- **model_name**: AI model to use
"""
conversation = await service.create_conversation(
user_id=conversation_data.user_id,
title=conversation_data.title,
model_name=conversation_data.model_name
)
return ConversationResponse.model_validate(conversation)
@router.get(
"/{conversation_id}",
response_model=ConversationWithMessages,
summary="Get conversation with messages"
)
async def get_conversation(
conversation_id: int,
service: Annotated[ConversationService, Depends(get_conversation_service)]
):
"""
Get conversation by ID with all messages
Returns conversation details and message history
"""
conversation = await service.get_conversation(
conversation_id,
include_messages=True
)
if not conversation:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Conversation {conversation_id} not found"
)
return ConversationWithMessages.model_validate(conversation)
@router.get(
"/user/{user_id}",
response_model=list[ConversationResponse],
summary="Get user conversations"
)
async def get_user_conversations(
user_id: int,
skip: int = 0,
limit: int = 20,
service: Annotated[ConversationService, Depends(get_conversation_service)]
):
"""
Get all conversations for a user
- **user_id**: User ID
- **skip**: Number of records to skip
- **limit**: Maximum number of records
"""
conversations = await service.get_user_conversations(user_id, skip, limit)
return [ConversationResponse.model_validate(c) for c in conversations]
@router.post(
"/{conversation_id}/messages",
response_model=MessageResponse,
status_code=status.HTTP_201_CREATED,
summary="Add message to conversation"
)
async def add_message(
conversation_id: int,
message_data: MessageCreate,
service: Annotated[ConversationService, Depends(get_conversation_service)]
):
"""
Add a message to a conversation
- **role**: Message role (user, assistant, system)
- **content**: Message content
- **tokens**: Optional token counts
"""
message = await service.add_message(
conversation_id=conversation_id,
role=message_data.role,
content=message_data.content,
prompt_tokens=message_data.prompt_tokens,
completion_tokens=message_data.completion_tokens,
total_tokens=message_data.total_tokens
)
return MessageResponse.model_validate(message)
@router.get(
"/{conversation_id}/messages",
response_model=list[MessageResponse],
summary="Get conversation messages"
)
async def get_messages(
conversation_id: int,
skip: int = 0,
limit: int = 100,
service: Annotated[ConversationService, Depends(get_conversation_service)]
):
"""
Get messages for a conversation
Returns messages in chronological order
"""
messages = await service.get_conversation_messages(
conversation_id,
skip,
limit
)
return [MessageResponse.model_validate(m) for m in messages]
@router.delete(
"/{conversation_id}",
status_code=status.HTTP_204_NO_CONTENT,
summary="Delete conversation"
)
async def delete_conversation(
conversation_id: int,
service: Annotated[ConversationService, Depends(get_conversation_service)]
):
"""
Delete a conversation
This will also delete all associated messages
"""
deleted = await service.delete_conversation(conversation_id)
if not deleted:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"Conversation {conversation_id} not found"
)
return None
Update app/api/v1/api.py to include the new router:
"""
API v1 router aggregator
Combines all v1 endpoint routers
"""
from fastapi import APIRouter
from app.api.v1.endpoints import (
users,
health,
users_advanced,
dependencies_demo,
conversations # New!
)
# Create main v1 router
api_router = APIRouter()
# Include all endpoint routers
api_router.include_router(health.router)
api_router.include_router(users.router)
api_router.include_router(users_advanced.router)
api_router.include_router(dependencies_demo.router)
api_router.include_router(conversations.router) # New!
# Future routers:
# api_router.include_router(ai.router)
Step 12: Add Password Hashing Dependency
Install passlib for password hashing:
pip install passlib[bcrypt]==1.7.4
pip freeze > requirements.txt
Step 13: Create Database Utilities
Create app/db/utils.py:
"""
Database utilities
Helper functions for database operations
"""
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import AsyncSession, AsyncEngine
from contextlib import asynccontextmanager
from app.db.session import async_session_maker, engine
from app.db.base import Base
from app.core.config import settings
@asynccontextmanager
async def get_db_context() -> AsyncGenerator[AsyncSession, None]:
"""
Context manager for database session
Usage:
async with get_db_context() as db:
user = await db.execute(select(User))
"""
async with async_session_maker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def init_database() -> None:
"""
Initialize database
Creates all tables if they don't exist
WARNING: Only use in development. Use Alembic migrations in production.
"""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
print("✅ Database initialized")
async def drop_database() -> None:
"""
Drop all database tables
WARNING: This will delete all data!
Only use in development/testing.
"""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
print("🗑️ Database dropped")
async def reset_database() -> None:
"""
Reset database (drop and recreate)
WARNING: This will delete all data!
Only use in development/testing.
"""
await drop_database()
await init_database()
print("🔄 Database reset complete")
async def check_database_connection() -> bool:
"""
Check if database connection is working
Returns:
True if connection successful, False otherwise
"""
try:
async with engine.connect() as conn:
await conn.execute("SELECT 1")
return True
except Exception as e:
print(f"❌ Database connection failed: {e}")
return False
async def get_database_info() -> dict:
"""
Get database information
Returns:
Dictionary with database info
"""
from sqlalchemy import text
info = {
"url": settings.DATABASE_URL.replace(settings.SECRET_KEY, "***"),
"pool_size": settings.DB_POOL_SIZE,
"max_overflow": settings.DB_MAX_OVERFLOW,
}
try:
async with engine.connect() as conn:
result = await conn.execute(text("SELECT version()"))
version = result.scalar()
info["version"] = version
info["connected"] = True
except Exception as e:
info["connected"] = False
info["error"] = str(e)
return info
Create app/db/seed.py:
"""
Database seeding
Populate database with initial/test data
"""
from sqlalchemy.ext.asyncio import AsyncSession
from passlib.context import CryptContext
from app.db.session import async_session_maker
from app.db.models.user import User, UserRole
from app.db.models.conversation import Conversation
from app.db.models.message import Message, MessageRole
from app.utils.logger import logger
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
async def seed_users(session: AsyncSession) -> list[User]:
"""
Seed users
Creates demo users for development/testing
"""
users = [
User(
username="admin",
email="admin@example.com",
hashed_password=pwd_context.hash("admin123"),
full_name="Admin User",
role=UserRole.ADMIN,
is_active=True,
is_verified=True
),
User(
username="john_doe",
email="john@example.com",
hashed_password=pwd_context.hash("password123"),
full_name="John Doe",
role=UserRole.USER,
is_active=True,
is_verified=True
),
User(
username="jane_smith",
email="jane@example.com",
hashed_password=pwd_context.hash("password123"),
full_name="Jane Smith",
role=UserRole.USER,
is_active=True,
is_verified=True
),
]
session.add_all(users)
await session.flush()
logger.info(f"Seeded {len(users)} users")
return users
async def seed_conversations(
session: AsyncSession,
users: list[User]
) -> list[Conversation]:
"""
Seed conversations
Creates demo conversations for users
"""
conversations = [
Conversation(
user_id=users[1].id, # john_doe
title="Getting Started with FastAPI",
model_name="llama2"
),
Conversation(
user_id=users[1].id, # john_doe
title="Python Best Practices",
model_name="llama2"
),
Conversation(
user_id=users[2].id, # jane_smith
title="Database Design Questions",
model_name="mistral"
),
]
session.add_all(conversations)
await session.flush()
logger.info(f"Seeded {len(conversations)} conversations")
return conversations
async def seed_messages(
session: AsyncSession,
conversations: list[Conversation]
) -> list[Message]:
"""
Seed messages
Creates demo messages for conversations
"""
messages = [
# Conversation 1 messages
Message(
conversation_id=conversations[0].id,
role=MessageRole.USER,
content="What is FastAPI and why should I use it?"
),
Message(
conversation_id=conversations[0].id,
role=MessageRole.ASSISTANT,
content="FastAPI is a modern, fast web framework for building APIs with Python. It's built on Starlette and Pydantic, offering automatic API documentation, data validation, and async support out of the box."
),
Message(
conversation_id=conversations[0].id,
role=MessageRole.USER,
content="How do I create my first endpoint?"
),
Message(
conversation_id=conversations[0].id,
role=MessageRole.ASSISTANT,
content="Here's a simple example:\n\n```python\nfrom fastapi import FastAPI\n\napp = FastAPI()\n\n@app.get('/')\nasync def root():\n return {'message': 'Hello World'}\n```"
),
# Conversation 2 messages
Message(
conversation_id=conversations[1].id,
role=MessageRole.USER,
content="What are some Python best practices?"
),
Message(
conversation_id=conversations[1].id,
role=MessageRole.ASSISTANT,
content="Key Python best practices include: use type hints, follow PEP 8, write docstrings, use virtual environments, handle exceptions properly, and write tests."
),
# Conversation 3 messages
Message(
conversation_id=conversations[2].id,
role=MessageRole.USER,
content="Should I use ORM or raw SQL?"
),
Message(
conversation_id=conversations[2].id,
role=MessageRole.ASSISTANT,
content="Both have their place. ORMs like SQLAlchemy provide abstraction, type safety, and easier migrations. Raw SQL offers more control and can be more performant for complex queries. For most applications, start with an ORM."
),
]
session.add_all(messages)
await session.flush()
logger.info(f"Seeded {len(messages)} messages")
return messages
async def seed_database() -> None:
"""
Seed database with all demo data
Creates users, conversations, and messages
"""
async with async_session_maker() as session:
try:
# Seed in order (users first, then conversations, then messages)
users = await seed_users(session)
conversations = await seed_conversations(session, users)
messages = await seed_messages(session, conversations)
await session.commit()
logger.info("✅ Database seeding complete")
print("✅ Database seeded successfully!")
print(f" - {len(users)} users")
print(f" - {len(conversations)} conversations")
print(f" - {len(messages)} messages")
except Exception as e:
await session.rollback()
logger.error(f"Database seeding failed: {e}")
print(f"❌ Seeding failed: {e}")
raise
if __name__ == "__main__":
"""Run seeding directly"""
import asyncio
asyncio.run(seed_database())
Step 14: Update Main Application
Update app/main.py:
"""
FastAPI AI Backend - Main Application
Updated with database integration
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from contextlib import asynccontextmanager
from app.core.config import settings
from app.api.v1.api import api_router
from app.core.exceptions import AppException
from app.core.error_handlers import (
app_exception_handler,
validation_exception_handler,
generic_exception_handler,
)
from app.middleware.logging_middleware import LoggingMiddleware
from app.middleware.performance_middleware import PerformanceMiddleware
from app.utils.logger import logger
from app.db.session import close_db
from app.db.utils import check_database_connection, get_database_info
@asynccontextmanager
async def lifespan(app: FastAPI):
"""
Application lifespan events
Handles startup and shutdown tasks
"""
# Startup
logger.info(
"Application startup",
extra={
"app_name": settings.APP_NAME,
"version": settings.APP_VERSION,
"environment": settings.ENVIRONMENT,
},
)
print(f"🚀 Starting {settings.APP_NAME} v{settings.APP_VERSION}")
print(f"📝 Environment: {settings.ENVIRONMENT}")
print(f"🔧 Debug mode: {settings.DEBUG}")
# Check database connection
db_connected = await check_database_connection()
if db_connected:
print("✅ Database connection successful")
db_info = await get_database_info()
print(f"📊 PostgreSQL version: {db_info.get('version', 'unknown')}")
else:
print("❌ Database connection failed!")
print(" Make sure PostgreSQL is running")
print(f"📚 API Docs: http://{settings.HOST}:{settings.PORT}/docs")
yield
# Shutdown
logger.info("Application shutdown", extra={"app_name": settings.APP_NAME})
print(f"👋 Shutting down {settings.APP_NAME}")
# Close database connections
await close_db()
print("📊 Database connections closed")
def create_application() -> FastAPI:
"""
Application factory pattern
Creates and configures the FastAPI application
"""
app = FastAPI(
title=settings.APP_NAME,
version=settings.APP_VERSION,
description="""
A production-ready FastAPI backend for AI applications.
## Features
* **Database Integration** with PostgreSQL and SQLAlchemy
* **Async Operations** for high performance
* **Repository Pattern** for clean data access
* **Database Migrations** with Alembic
* **Advanced Validation** with Pydantic
* **Comprehensive Error Handling**
* **Structured Logging** with request tracking
* **Middleware Stack** (logging, performance, CORS)
* **User Management** with password hashing
* **Conversation Management** for AI interactions
## Database
* PostgreSQL 15
* SQLAlchemy 2.0 (async)
* Alembic migrations
* Connection pooling
## Coming Soon
* AI model integration (Ollama)
* JWT Authentication
* WebSocket support
* Redis caching
""",
debug=settings.DEBUG,
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
lifespan=lifespan,
)
# ============================================
# MIDDLEWARE
# ============================================
app.add_middleware(
PerformanceMiddleware,
slow_request_threshold=1.0,
)
app.add_middleware(LoggingMiddleware)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.allowed_origins_list,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["X-Request-ID", "X-Process-Time"],
)
# ============================================
# EXCEPTION HANDLERS
# ============================================
app.add_exception_handler(AppException, app_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(ValidationError, validation_exception_handler)
app.add_exception_handler(Exception, generic_exception_handler)
# ============================================
# ROUTERS
# ============================================
app.include_router(api_router, prefix=settings.API_V1_PREFIX)
return app
# Create the application instance
app = create_application()
Step 15: Create Management Scripts
Create manage.py in the fastapi directory:
"""
Database management script
Provides CLI commands for database operations
"""
import asyncio
import sys
from app.db.utils import (
init_database,
drop_database,
reset_database,
check_database_connection,
get_database_info
)
from app.db.seed import seed_database
async def main():
"""Main CLI handler"""
if len(sys.argv) < 2:
print("""
Usage: python manage.py <command>
Commands:
init - Initialize database (create tables)
drop - Drop all tables (WARNING: deletes all data)
reset - Drop and recreate all tables (WARNING: deletes all data)
seed - Seed database with demo data
check - Check database connection
info - Show database information
""")
return
command = sys.argv[1]
if command == "init":
await init_database()
elif command == "drop":
confirm = input("⚠️ This will delete ALL data. Continue? (yes/no): ")
if confirm.lower() == "yes":
await drop_database()
else:
print("Operation cancelled")
elif command == "reset":
confirm = input("⚠️ This will delete ALL data and recreate tables. Continue? (yes/no): ")
if confirm.lower() == "yes":
await reset_database()
else:
print("Operation cancelled")
elif command == "seed":
await seed_database()
elif command == "check":
connected = await check_database_connection()
if connected:
print("✅ Database connection successful")
else:
print("❌ Database connection failed")
elif command == "info":
info = await get_database_info()
print("\n📊 Database Information:")
for key, value in info.items():
print(f" {key}: {value}")
else:
print(f"Unknown command: {command}")
print("Run 'python manage.py' for help")
if __name__ == "__main__":
asyncio.run(main())
Make it executable:
chmod +x manage.py
Step 16: Testing Database Operations
Create test_database.py:
"""
Test script for database operations
Tests all database functionality
"""
import requests
import json
from typing import Dict, Any
BASE_URL = "http://127.0.0.1:8000/api/v1"
def print_test(title: str, response: requests.Response):
"""Print test results"""
print(f"\n{'='*70}")
print(f"{title}")
print(f"{'='*70}")
print(f"Status: {response.status_code}")
try:
data = response.json()
print(f"Response:\n{json.dumps(data, indent=2, default=str)}")
except:
print(f"Response: {response.text[:500]}")
def test_create_user():
"""Test creating a user"""
user_data = {
"username": "test_user",
"email": "test@example.com",
"password": "SecurePass123",
"full_name": "Test User",
"role": "user"
}
response = requests.post(f"{BASE_URL}/users", json=user_data)
print_test("✅ CREATE USER", response)
if response.status_code == 201:
return response.json()
return None
def test_get_users():
"""Test getting all users"""
response = requests.get(f"{BASE_URL}/users")
print_test("📋 GET ALL USERS", response)
return response.json() if response.status_code == 200 else []
def test_get_user(user_id: int):
"""Test getting specific user"""
response = requests.get(f"{BASE_URL}/users/{user_id}")
print_test(f"👤 GET USER {user_id}", response)
def test_update_user(user_id: int):
"""Test updating user"""
update_data = {
"full_name": "Updated Test User"
}
response = requests.patch(f"{BASE_URL}/users/{user_id}", json=update_data)
print_test(f"✏️ UPDATE USER {user_id}", response)
def test_create_conversation(user_id: int):
"""Test creating conversation"""
conv_data = {
"user_id": user_id,
"title": "Test Conversation",
"model_name": "llama2"
}
response = requests.post(f"{BASE_URL}/conversations", json=conv_data)
print_test("💬 CREATE CONVERSATION", response)
if response.status_code == 201:
return response.json()
return None
def test_add_message(conversation_id: int):
"""Test adding message to conversation"""
message_data = {
"role": "user",
"content": "Hello, this is a test message!",
"total_tokens": 10
}
response = requests.post(
f"{BASE_URL}/conversations/{conversation_id}/messages",
json=message_data
)
print_test("📝 ADD MESSAGE", response)
def test_get_conversation(conversation_id: int):
"""Test getting conversation with messages"""
response = requests.get(f"{BASE_URL}/conversations/{conversation_id}")
print_test(f"💬 GET CONVERSATION {conversation_id}", response)
def test_get_user_conversations(user_id: int):
"""Test getting user's conversations"""
response = requests.get(f"{BASE_URL}/conversations/user/{user_id}")
print_test(f"📋 GET USER {user_id} CONVERSATIONS", response)
def test_delete_conversation(conversation_id: int):
"""Test deleting conversation"""
response = requests.delete(f"{BASE_URL}/conversations/{conversation_id}")
print_test(f"🗑️ DELETE CONVERSATION {conversation_id}", response)
def test_delete_user(user_id: int):
"""Test deleting user"""
response = requests.delete(f"{BASE_URL}/users/{user_id}")
print_test(f"🗑️ DELETE USER {user_id}", response)
def run_all_tests():
"""Run complete test suite"""
print("\n" + "🧪"*35)
print("DATABASE INTEGRATION TEST SUITE")
print("🧪"*35)
# Test user operations
print("\n" + "="*70)
print("USER OPERATIONS")
print("="*70)
created_user = test_create_user()
if not created_user:
print("❌ Failed to create user, stopping tests")
return
user_id = created_user['id']
test_get_users()
test_get_user(user_id)
test_update_user(user_id)
# Test conversation operations
print("\n" + "="*70)
print("CONVERSATION OPERATIONS")
print("="*70)
created_conv = test_create_conversation(user_id)
if not created_conv:
print("❌ Failed to create conversation")
return
conv_id = created_conv['id']
test_add_message(conv_id)
test_add_message(conv_id) # Add second message
test_get_conversation(conv_id)
test_get_user_conversations(user_id)
# Test cascade delete
print("\n" + "="*70)
print("CASCADE DELETE TEST")
print("="*70)
test_delete_user(user_id) # Should also delete conversations
print("\n" + "✅"*35)
print("ALL TESTS COMPLETED!")
print("✅"*35 + "\n")
if __name__ == "__main__":
print("""
╔════════════════════════════════════════════════════════╗
║ Database Integration Test Suite ║
║ ║
║ Prerequisites: ║
║ 1. PostgreSQL running (docker-compose up -d) ║
║ 2. Migrations applied (alembic upgrade head) ║
║ 3. FastAPI server running (python main.py) ║
╚════════════════════════════════════════════════════════╝
""")
try:
response = requests.get(f"{BASE_URL}/health")
if response.status_code == 200:
run_all_tests()
else:
print("❌ Server returned unexpected status")
except requests.exceptions.ConnectionError:
print("❌ ERROR: Cannot connect to server!")
print(" Please start the server with: python main.py")
except Exception as e:
print(f"❌ ERROR: {e}")
Step 17: Complete Workflow & Testing
Complete Setup Workflow
Create a comprehensive setup guide SETUP_DATABASE.md:
# Database Setup Guide
## Prerequisites
- Python 3.10+
- PostgreSQL 15+ (or Docker)
- Virtual environment activated
## Step-by-Step Setup
### 1. Install Dependencies
```bash
cd fastapi
source venv/bin/activate # Windows: venv\Scripts\activate
# Install all packages including database
pip install -r requirements.txt
```
### 2. Start PostgreSQL
**Option A: Docker (Recommended)**
```bash
# From project root
docker-compose up -d
# Verify it's running
docker ps
docker logs aiverse_postgres
```
**Option B: Local Installation**
```bash
# macOS
brew services start postgresql@15
# Linux
sudo systemctl start postgresql
# Windows
# Start from Services or pgAdmin
```
### 3. Configure Environment
```bash
# Copy example environment file
cp .env.example .env
# Update database URL in .env
# DATABASE_URL=postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db
```
### 4. Run Migrations
```bash
# Create initial migration (if not exists)
alembic revision --autogenerate -m "Initial migration"
# Apply migrations
alembic upgrade head
# Verify tables created
docker exec -it aiverse_postgres psql -U aiverse_user -d aiverse_db -c "\dt"
```
### 5. Seed Database (Optional)
```bash
# Seed with demo data
python manage.py seed
# Or run directly
python -m app.db.seed
```
### 6. Start Application
```bash
python main.py
```
### 7. Test Database
```bash
# Check database connection
python manage.py check
# Run database tests
python test_database.py
```
## Common Commands
### Database Management
```bash
# Initialize database
python manage.py init
# Reset database (WARNING: deletes all data)
python manage.py reset
# Seed with demo data
python manage.py seed
# Check connection
python manage.py check
# Show database info
python manage.py info
```
### Alembic Migrations
```bash
# Create new migration
alembic revision --autogenerate -m "Description"
# Apply migrations
alembic upgrade head
# Rollback last migration
alembic downgrade -1
# Show migration history
alembic history
# Show current version
alembic current
```
### Docker Commands
```bash
# Start PostgreSQL
docker-compose up -d
# Stop PostgreSQL
docker-compose down
# View logs
docker logs aiverse_postgres
# Access PostgreSQL shell
docker exec -it aiverse_postgres psql -U aiverse_user -d aiverse_db
# Backup database
docker exec aiverse_postgres pg_dump -U aiverse_user aiverse_db > backup.sql
# Restore database
docker exec -i aiverse_postgres psql -U aiverse_user aiverse_db < backup.sql
```
## Troubleshooting
### Connection Refused
```bash
# Check if PostgreSQL is running
docker ps
# Check logs
docker logs aiverse_postgres
# Restart container
docker-compose restart
```
### Migration Errors
```bash
# Rollback and retry
alembic downgrade -1
alembic upgrade head
# Force revision
alembic stamp head
```
### Password Issues
```bash
# Reset PostgreSQL password
docker exec -it aiverse_postgres psql -U postgres
ALTER USER aiverse_user WITH PASSWORD 'new_password';
```
## Production Deployment
### Environment Variables
```bash
# Use strong passwords
DATABASE_URL=postgresql+asyncpg://user:STRONG_PASSWORD@host:5432/db
# Disable echo in production
DATABASE_ECHO=False
# Adjust pool size based on load
DB_POOL_SIZE=20
DB_MAX_OVERFLOW=40
```
### Security Checklist
- [ ] Use strong database passwords
- [ ] Enable SSL/TLS for database connections
- [ ] Restrict database access to application servers only
- [ ] Regular backups scheduled
- [ ] Monitor connection pool usage
- [ ] Set up database monitoring
- [ ] Configure proper firewall rules
### Performance Optimization
```bash
# Increase pool size for high traffic
DB_POOL_SIZE=50
DB_MAX_OVERFLOW=100
# Enable connection recycling
DB_POOL_RECYCLE=3600
# Use read replicas for scaling
# Add read replica URL to settings
```
Step 18: Update Requirements.txt
Update requirements.txt with all database dependencies:
# FastAPI and core dependencies
fastapi==0.115.0
uvicorn[standard]==0.32.1
pydantic==2.10.3
pydantic-settings==2.7.0
pydantic[email]==2.10.3
# Email validation
email-validator==2.3.0
dnspython==2.8.0
# Phone number validation
phonenumbers==9.0.28
# Environment configuration
python-dotenv==1.2.2
# Structured logging
python-json-logger==4.1.0
# HTTP client
requests==2.33.1
# Database - SQLAlchemy
sqlalchemy[asyncio]==2.0.23
asyncpg==0.29.0
psycopg2-binary==2.9.9
# Database migrations
alembic==1.13.1
# Password hashing
passlib[bcrypt]==1.7.4
# ASGI server
starlette==0.45.0
Step 19: Create Comprehensive Testing Script
Create test_complete_database.py:
"""
Comprehensive database integration test
Tests all database functionality end-to-end
"""
import asyncio
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.session import async_session_maker
from app.db.repositories.user_repository import UserRepository
from app.db.repositories.conversation_repository import ConversationRepository
from app.db.repositories.message_repository import MessageRepository
from app.db.models.user import UserRole
from app.db.models.message import MessageRole
from passlib.context import CryptContext
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
async def test_user_operations():
"""Test user CRUD operations"""
print("\n" + "="*70)
print("TESTING USER OPERATIONS")
print("="*70)
async with async_session_maker() as session:
repo = UserRepository(session)
# Create user
print("\n1. Creating user...")
user = await repo.create(
username="test_db_user",
email="testdb@example.com",
hashed_password=pwd_context.hash("password123"),
full_name="Test DB User",
role=UserRole.USER,
is_active=True
)
print(f" ✅ Created: {user}")
# Get user by ID
print("\n2. Getting user by ID...")
found_user = await repo.get_by_id(user.id)
print(f" ✅ Found: {found_user}")
# Get user by username
print("\n3. Getting user by username...")
username_user = await repo.get_by_username("test_db_user")
print(f" ✅ Found: {username_user}")
# Update user
print("\n4. Updating user...")
updated = await repo.update(user.id, full_name="Updated Name")
print(f" ✅ Updated: {updated}")
# Count users
print("\n5. Counting users...")
count = await repo.count()
print(f" ✅ Total users: {count}")
await session.commit()
return user
async def test_conversation_operations(user_id: int):
"""Test conversation CRUD operations"""
print("\n" + "="*70)
print("TESTING CONVERSATION OPERATIONS")
print("="*70)
async with async_session_maker() as session:
repo = ConversationRepository(session)
# Create conversation
print("\n1. Creating conversation...")
conv = await repo.create(
user_id=user_id,
title="Test Conversation",
model_name="llama2"
)
print(f" ✅ Created: {conv}")
# Get user conversations
print("\n2. Getting user conversations...")
user_convs = await repo.get_by_user(user_id)
print(f" ✅ Found {len(user_convs)} conversations")
# Count conversations
print("\n3. Counting user conversations...")
count = await repo.get_user_conversation_count(user_id)
print(f" ✅ User has {count} conversations")
await session.commit()
return conv
async def test_message_operations(conversation_id: int):
"""Test message CRUD operations"""
print("\n" + "="*70)
print("TESTING MESSAGE OPERATIONS")
print("="*70)
async with async_session_maker() as session:
repo = MessageRepository(session)
# Create messages
print("\n1. Creating messages...")
msg1 = await repo.create(
conversation_id=conversation_id,
role=MessageRole.USER,
content="Hello, how are you?",
total_tokens=5
)
print(f" ✅ Created message 1: {msg1}")
msg2 = await repo.create(
conversation_id=conversation_id,
role=MessageRole.ASSISTANT,
content="I'm doing great, thank you!",
total_tokens=7
)
print(f" ✅ Created message 2: {msg2}")
# Get conversation messages
print("\n2. Getting conversation messages...")
messages = await repo.get_by_conversation(conversation_id)
print(f" ✅ Found {len(messages)} messages")
for msg in messages:
print(f" - [{msg.role}]: {msg.content[:50]}...")
# Get last message
print("\n3. Getting last message...")
last = await repo.get_last_message(conversation_id)
print(f" ✅ Last message: {last.content}")
# Count by role
print("\n4. Counting messages by role...")
user_count = await repo.count_by_role(conversation_id, MessageRole.USER)
ai_count = await repo.count_by_role(conversation_id, MessageRole.ASSISTANT)
print(f" ✅ User messages: {user_count}")
print(f" ✅ Assistant messages: {ai_count}")
await session.commit()
async def test_cascade_delete(user_id: int):
"""Test cascade delete (deleting user deletes conversations and messages)"""
print("\n" + "="*70)
print("TESTING CASCADE DELETE")
print("="*70)
async with async_session_maker() as session:
user_repo = UserRepository(session)
conv_repo = ConversationRepository(session)
msg_repo = MessageRepository(session)
# Count before delete
print("\n1. Counting before delete...")
user_convs_before = await conv_repo.get_user_conversation_count(user_id)
print(f" Conversations before: {user_convs_before}")
# Delete user
print("\n2. Deleting user...")
deleted = await user_repo.delete(user_id)
print(f" ✅ User deleted: {deleted}")
await session.commit()
# Verify conversations deleted
print("\n3. Verifying cascade delete...")
user_convs_after = await conv_repo.get_user_conversation_count(user_id)
print(f" Conversations after: {user_convs_after}")
print(f" ✅ Cascade delete successful!")
async def test_relationships():
"""Test SQLAlchemy relationships"""
print("\n" + "="*70)
print("TESTING RELATIONSHIPS")
print("="*70)
async with async_session_maker() as session:
conv_repo = ConversationRepository(session)
user_repo = UserRepository(session)
# Create user
print("\n1. Creating user with conversations...")
user = await user_repo.create(
username="relationship_test",
email="relationship@example.com",
hashed_password=pwd_context.hash("password123"),
role=UserRole.USER,
is_active=True
)
# Create conversations
conv1 = await conv_repo.create(
user_id=user.id,
title="First Conversation",
model_name="llama2"
)
conv2 = await conv_repo.create(
user_id=user.id,
title="Second Conversation",
model_name="mistral"
)
await session.commit()
# Test eager loading
print("\n2. Testing eager loading...")
loaded_user = await user_repo.get_by_id(user.id)
await session.refresh(loaded_user, ["conversations"])
print(f" ✅ User has {len(loaded_user.conversations)} conversations")
# Test conversation with messages
print("\n3. Testing conversation with messages...")
conv_with_msgs = await conv_repo.get_with_messages(conv1.id)
print(f" ✅ Conversation loaded with {len(conv_with_msgs.messages)} messages")
# Cleanup
await user_repo.delete(user.id)
await session.commit()
async def run_all_tests():
"""Run all database tests"""
print("\n" + "🧪"*35)
print("COMPREHENSIVE DATABASE TEST SUITE")
print("🧪"*35)
try:
# Test user operations
user = await test_user_operations()
# Test conversation operations
conv = await test_conversation_operations(user.id)
# Test message operations
await test_message_operations(conv.id)
# Test relationships
await test_relationships()
# Test cascade delete (cleanup)
await test_cascade_delete(user.id)
print("\n" + "✅"*35)
print("ALL DATABASE TESTS PASSED!")
print("✅"*35 + "\n")
except Exception as e:
print(f"\n❌ TEST FAILED: {e}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
print("""
╔════════════════════════════════════════════════════════╗
║ Comprehensive Database Test Suite ║
║ ║
║ This tests: ║
║ - User CRUD operations ║
║ - Conversation CRUD operations ║
║ - Message CRUD operations ║
║ - SQLAlchemy relationships ║
║ - Cascade deletes ║
║ ║
║ Prerequisites: ║
║ 1. PostgreSQL running ║
║ 2. Migrations applied ║
╚════════════════════════════════════════════════════════╝
""")
asyncio.run(run_all_tests())
Step 20: Create Production Deployment Guide
Create DEPLOYMENT.md:
# Database Deployment Guide
## Production PostgreSQL Setup
### 1. Managed Database Services (Recommended)
**AWS RDS:**
```bash
# Create RDS PostgreSQL instance
aws rds create-db-instance \
--db-instance-identifier aiverse-prod \
--db-instance-class db.t3.medium \
--engine postgres \
--engine-version 15.4 \
--master-username admin \
--master-user-password \
--allocated-storage 100 \
--storage-encrypted \
--backup-retention-period 7
# Update .env with RDS endpoint
DATABASE_URL=postgresql+asyncpg://admin:password@aiverse-prod.xxx.rds.amazonaws.com:5432/aiverse
```
**Google Cloud SQL:**
```bash
# Create Cloud SQL instance
gcloud sql instances create aiverse-prod \
--database-version=POSTGRES_15 \
--tier=db-n1-standard-2 \
--region=us-central1
# Update connection string
DATABASE_URL=postgresql+asyncpg://user:pass@:5432/aiverse
```
**DigitalOcean Managed Database:**
```bash
# Create via web interface or API
# Connection string format:
DATABASE_URL=postgresql+asyncpg://user:pass@db-postgresql-xxx.ondigitalocean.com:25060/aiverse?sslmode=require
```
### 2. SSL/TLS Configuration
**Enable SSL in production:**
```python
# app/core/config.py
DATABASE_URL: str = "postgresql+asyncpg://user:pass@host:5432/db?ssl=require"
# For custom SSL certificates
from sqlalchemy.engine.url import make_url
from ssl import create_default_context
url = make_url(settings.DATABASE_URL)
ssl_context = create_default_context()
ssl_context.load_cert_chain('/path/to/cert.pem', '/path/to/key.pem')
engine = create_async_engine(
url,
connect_args={"ssl": ssl_context}
)
```
### 3. Connection Pool Optimization
**High Traffic Settings:**
```bash
# .env.production
DB_POOL_SIZE=50
DB_MAX_OVERFLOW=100
DB_POOL_TIMEOUT=30
DB_POOL_RECYCLE=3600
```
**Connection Pool Monitoring:**
```python
# Add to app startup
from app.db.session import engine
@app.on_event("startup")
async def monitor_pool():
pool = engine.pool
logger.info(f"Pool size: {pool.size()}")
logger.info(f"Checked out: {pool.checkedout()}")
```
### 4. Database Migrations in Production
**Automated Migration Workflow:**
```bash
# 1. Backup database before migration
pg_dump -h host -U user dbname > backup_$(date +%Y%m%d).sql
# 2. Run migrations
alembic upgrade head
# 3. Verify migration
alembic current
# 4. If issues, rollback
alembic downgrade -1
```
**CI/CD Integration:**
```yaml
# .github/workflows/deploy.yml
- name: Run Database Migrations
run: |
alembic upgrade head
env:
DATABASE_URL: ${{ secrets.DATABASE_URL }}
```
### 5. Backup Strategy
**Automated Backups:**
```bash
# Daily backup script
#!/bin/bash
BACKUP_DIR="/backups"
DATE=$(date +%Y%m%d_%H%M%S)
DB_NAME="aiverse_db"
pg_dump -h localhost -U aiverse_user $DB_NAME | \
gzip > $BACKUP_DIR/backup_$DATE.sql.gz
# Keep last 7 days
find $BACKUP_DIR -name "backup_*.sql.gz" -mtime +7 -delete
```
**Cron Schedule:**
```bash
# Add to crontab
0 2 * * * /path/to/backup.sh
```
### 6. Monitoring & Alerts
**Query Performance Monitoring:**
```python
# Enable query logging
import logging
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
```
**Database Health Checks:**
```python
# app/api/v1/endpoints/health.py
@router.get("/health/database")
async def database_health():
from app.db.utils import check_database_connection, get_database_info
is_connected = await check_database_connection()
info = await get_database_info()
return {
"status": "healthy" if is_connected else "unhealthy",
"database": info
}
```
### 7. Security Best Practices
**Environment Variables:**
```bash
# Never commit these to git
DATABASE_URL=postgresql+asyncpg://user:STRONG_RANDOM_PASSWORD@host:5432/db
SECRET_KEY=
```
**Network Security:**
```bash
# PostgreSQL pg_hba.conf
# Only allow app servers
host aiverse_db aiverse_user 10.0.1.0/24 scram-sha-256
# Firewall rules
# Only allow port 5432 from application servers
```
**Role-Based Access:**
```sql
-- Create read-only user for analytics
CREATE ROLE analytics_user WITH LOGIN PASSWORD 'password';
GRANT CONNECT ON DATABASE aiverse_db TO analytics_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO analytics_user;
```
### 8. Performance Tuning
**PostgreSQL Configuration:**
```conf
# postgresql.conf
shared_buffers = 256MB
effective_cache_size = 1GB
maintenance_work_mem = 64MB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 100
random_page_cost = 1.1
effective_io_concurrency = 200
work_mem = 4MB
min_wal_size = 1GB
max_wal_size = 4GB
```
**Index Optimization:**
```sql
-- Create indexes for common queries
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);
CREATE INDEX CONCURRENTLY idx_conversations_user_updated
ON conversations(user_id, updated_at DESC);
CREATE INDEX CONCURRENTLY idx_messages_conversation_created
ON messages(conversation_id, created_at);
-- Analyze tables
ANALYZE users;
ANALYZE conversations;
ANALYZE messages;
```
### 9. Disaster Recovery
**Point-in-Time Recovery:**
```bash
# Enable WAL archiving
# postgresql.conf
archive_mode = on
archive_command = 'cp %p /archive/%f'
# Restore to specific point
pg_restore --dbname=aiverse_db --clean backup.sql
```
**High Availability Setup:**
```bash
# Primary-Replica configuration
# Primary: postgresql.conf
wal_level = replica
max_wal_senders = 3
wal_keep_size = 1GB
# Replica: recovery.conf
primary_conninfo = 'host=primary port=5432 user=replicator'
```
### 10. Migration Checklist
Production deployment checklist:
- [ ] Database backups scheduled
- [ ] SSL/TLS enabled
- [ ] Connection pool configured
- [ ] Monitoring set up
- [ ] Health checks implemented
- [ ] Migration scripts tested
- [ ] Rollback plan documented
- [ ] Access controls configured
- [ ] Performance indexes created
- [ ] Disaster recovery plan in place
📚 FAQ Section
Why use PostgreSQL instead of MySQL or SQLite?
What is the Repository Pattern and why should I use it?
How do I handle database migrations in production without downtime?
What’s the difference between async and sync SQLAlchemy, and when should I use async?
How do I optimize database performance for production?
🎯 Summary
Congratulations! You’ve successfully integrated PostgreSQL with your FastAPI application. Here’s what you’ve accomplished:
✅ What You Built:
- Complete database integration with PostgreSQL
- SQLAlchemy ORM models with relationships
- Repository pattern for clean data access
- Alembic migrations for schema management
- Async database operations for performance
- Connection pooling for scalability
- User and conversation management
- Password hashing for security
✅ Production-Ready Features:
- Proper error handling
- Database health checks
- Migration management
- Seed data scripts
- Comprehensive testing
- Security best practices
📊 Architecture Achievement:
FastAPI Endpoints
↓
Services (Business Logic)
↓
Repositories (Data Access)
↓
SQLAlchemy Models
↓
PostgreSQL Database
Ready for Blog Post 9: JWT Authentication & Authorization! 🚀
Leave a Reply