To integrate PostgreSQL with FastAPI using SQLAlchemy, install SQLAlchemy and asyncpg for async support, create database models inheriting from SQLAlchemy’s declarative base, set up an async engine and session factory, use Alembic for database migrations, and implement dependency injection to provide database sessions to your endpoints. Replace in-memory storage in your service layer with SQLAlchemy queries, use async/await patterns for all database operations, and implement proper connection pooling. This architecture ensures scalable, production-ready database integration with full ORM capabilities.
What You’ll Learn
By the end of this tutorial, you’ll be able to:
- Set up PostgreSQL database for FastAPI
- Create SQLAlchemy ORM models with relationships
- Implement async database operations
- Use Alembic for database migrations
- Handle database sessions with dependency injection
- Implement proper connection pooling
- Create database indexes and constraints
- Handle transactions and rollbacks
- Implement database seeding and fixtures
- Deploy database in production
Understanding Database Integration
Why PostgreSQL + SQLAlchemy?
| Technology | Why We Use It |
|---|---|
| PostgreSQL | Production-grade, ACID compliant, supports JSON, full-text search |
| SQLAlchemy | Mature ORM, async support, migrations, relationship handling |
| Alembic | Database migrations, version control for schema changes |
| asyncpg | Fastest PostgreSQL driver for Python async |
Architecture Overview
FastAPI Endpoint
      ↓
Dependency Injection (get_db)
      ↓
SQLAlchemy Session
      ↓
ORM Models (User, Conversation, Message)
      ↓
PostgreSQL Database
Step-by-Step Implementation
Step 1: Install Dependencies
# Activate your virtual environment
cd fastapi
source venv/bin/activate # or venv\Scripts\activate on Windows
# Install database packages
pip install sqlalchemy[asyncio]==2.0.23
pip install asyncpg==0.29.0
pip install alembic==1.13.1
pip install psycopg2-binary==2.9.9 # For sync operations if needed
# Update requirements
pip freeze > requirements.txt
What we installed:
- sqlalchemy[asyncio]: ORM with async support
- asyncpg: Async PostgreSQL driver
- alembic: Database migration tool
- psycopg2-binary: PostgreSQL adapter (backup/utilities)
Step 2: Install and Setup PostgreSQL
Option A: Local Installation
Windows:
# Download from https://www.postgresql.org/download/windows/
# Or use chocolatey:
choco install postgresql
macOS:
brew install postgresql@15
brew services start postgresql@15
Linux (Ubuntu/Debian):
sudo apt update
sudo apt install postgresql postgresql-contrib
sudo systemctl start postgresql
Option B: Docker (Recommended for Development)
Create docker-compose.yml in project root:
version: '3.8'

services:
  postgres:
    image: postgres:15-alpine
    container_name: aiverse_postgres
    environment:
      POSTGRES_USER: aiverse_user
      POSTGRES_PASSWORD: aiverse_pass
      POSTGRES_DB: aiverse_db
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U aiverse_user -d aiverse_db"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  postgres_data:
Start PostgreSQL:
docker-compose up -d
Verify it’s running:
docker ps
# You should see aiverse_postgres running
Step 3: Update Configuration
Update app/core/config.py:
"""
Core configuration module
Manages all application settings using Pydantic Settings
"""
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List, Optional
from functools import lru_cache
class Settings(BaseSettings):
"""
Application settings
These values are loaded from environment variables or .env file
Pydantic validates the types automatically
"""
# Application
APP_NAME: str = "AIVerse Backend"
APP_VERSION: str = "0.4.0"
DEBUG: bool = False
ENVIRONMENT: str = "production"
# API
API_V1_PREFIX: str = "/api/v1"
HOST: str = "0.0.0.0"
PORT: int = 8000
# CORS
ALLOWED_ORIGINS: str = "http://localhost:3000"
@property
def allowed_origins_list(self) -> List[str]:
"""Convert comma-separated string to list"""
return [origin.strip() for origin in self.ALLOWED_ORIGINS.split(",")]
# Database Configuration
DATABASE_URL: str = "postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db"
DATABASE_ECHO: bool = False # Set to True to see SQL queries in logs
# Database Connection Pool Settings
DB_POOL_SIZE: int = 5
DB_MAX_OVERFLOW: int = 10
DB_POOL_TIMEOUT: int = 30
DB_POOL_RECYCLE: int = 3600 # Recycle connections after 1 hour
@property
def async_database_url(self) -> str:
"""Get async database URL"""
return self.DATABASE_URL
@property
def sync_database_url(self) -> str:
"""Get sync database URL (for Alembic migrations)"""
return self.DATABASE_URL.replace("+asyncpg", "").replace("postgresql+asyncpg", "postgresql")
# AI Settings
OLLAMA_BASE_URL: str = "http://localhost:11434"
DEFAULT_AI_MODEL: str = "llama2"
# Security
SECRET_KEY: str = "change-this-in-production"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
# Pydantic Settings Configuration
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=True,
extra="ignore"
)
@lru_cache()
def get_settings() -> Settings:
"""
Create settings instance (cached)
@lru_cache ensures we only create one Settings instance
and reuse it throughout the application
Returns:
Settings: Application settings
"""
return Settings()
# Convenience: Get settings instance
settings = get_settings()
Update .env:
# Application Configuration
APP_NAME=AIVerse Backend
APP_VERSION=0.4.0
DEBUG=True
ENVIRONMENT=development
# Server Configuration
API_V1_PREFIX=/api/v1
HOST=0.0.0.0
PORT=8000
# CORS Configuration
ALLOWED_ORIGINS=http://localhost:3000,http://localhost:8000
# Database Configuration
DATABASE_URL=postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db
DATABASE_ECHO=True
DB_POOL_SIZE=5
DB_MAX_OVERFLOW=10
DB_POOL_TIMEOUT=30
DB_POOL_RECYCLE=3600
# AI Configuration
OLLAMA_BASE_URL=http://localhost:11434
DEFAULT_AI_MODEL=llama2
# Security Configuration
SECRET_KEY=your-secret-key-change-in-production
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
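The two derived settings above — the comma-separated CORS parsing and the driver-stripping sync URL — can be exercised in isolation. This is a minimal standalone sketch of the same string transformations (no pydantic required), useful for convincing yourself the `.env` values will parse the way you expect:

```python
# Standalone sketch of the allowed_origins_list and sync_database_url
# properties from app/core/config.py.
def parse_origins(raw: str) -> list[str]:
    """Split a comma-separated ALLOWED_ORIGINS string into a list."""
    return [origin.strip() for origin in raw.split(",")]


def sync_url(async_url: str) -> str:
    """Strip the asyncpg driver marker so sync tooling gets a plain URL."""
    return async_url.replace("postgresql+asyncpg", "postgresql")


origins = parse_origins("http://localhost:3000, http://localhost:8000")
url = sync_url("postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db")

print(origins)  # ['http://localhost:3000', 'http://localhost:8000']
print(url)      # postgresql://aiverse_user:aiverse_pass@localhost:5432/aiverse_db
```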
Step 4: Create Database Infrastructure
Create app/db/__init__.py:
"""
Database package
Contains database configuration, models, and utilities
"""
from app.db.base import Base
from app.db.session import engine, async_session_maker, get_db
__all__ = [
"Base",
"engine",
"async_session_maker",
"get_db",
]
Create app/db/base.py:
"""
Base class for all database models
All SQLAlchemy models should inherit from Base
"""
from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import MetaData
# Naming convention for constraints
# This ensures consistent naming across the database
NAMING_CONVENTION = {
"ix": "ix_%(column_0_label)s",
"uq": "uq_%(table_name)s_%(column_0_name)s",
"ck": "ck_%(table_name)s_%(constraint_name)s",
"fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
"pk": "pk_%(table_name)s"
}
metadata = MetaData(naming_convention=NAMING_CONVENTION)
class Base(DeclarativeBase):
"""
Base class for all database models
All models should inherit from this class
"""
metadata = metadata
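To see what the naming convention actually buys you, here is a small sketch (using a subset of the convention above) that compiles the CREATE TABLE DDL for a table with an anonymous unique constraint. Without a convention, SQLAlchemy would leave the constraint unnamed and the database would pick an arbitrary name, which makes later Alembic migrations brittle:

```python
from sqlalchemy import MetaData, Table, Column, Integer, String
from sqlalchemy.schema import CreateTable

# Subset of the convention from app/db/base.py
NAMING_CONVENTION = {
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "pk": "pk_%(table_name)s",
}
metadata = MetaData(naming_convention=NAMING_CONVENTION)

# unique=True creates an anonymous UniqueConstraint; the convention names it
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("email", String(255), unique=True),
)

# The deterministic names show up in the emitted DDL
ddl = str(CreateTable(users))
print(ddl)
```

You should see `CONSTRAINT pk_users PRIMARY KEY` and `CONSTRAINT uq_users_email UNIQUE` in the output, which is exactly what Alembic will reference when it later needs to drop or alter those constraints.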
Create app/db/session.py:
"""
Database session management
Handles database connection, session creation, and dependency injection
"""
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
AsyncSession,
AsyncEngine
)
from typing import AsyncGenerator
from app.core.config import settings
# Create async engine
engine: AsyncEngine = create_async_engine(
settings.async_database_url,
echo=settings.DATABASE_ECHO,
pool_size=settings.DB_POOL_SIZE,
max_overflow=settings.DB_MAX_OVERFLOW,
pool_timeout=settings.DB_POOL_TIMEOUT,
pool_recycle=settings.DB_POOL_RECYCLE,
pool_pre_ping=True, # Verify connections before using
)
# Create async session factory
async_session_maker = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
autocommit=False,
autoflush=False,
)
async def get_db() -> AsyncGenerator[AsyncSession, None]:
"""
Dependency for getting database session
Yields:
AsyncSession: Database session
Usage:
@app.get("/users")
async def get_users(db: AsyncSession = Depends(get_db)):
result = await db.execute(select(User))
return result.scalars().all()
"""
async with async_session_maker() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def init_db() -> None:
"""
Initialize database
Creates all tables if they don't exist
Note: In production, use Alembic migrations instead
"""
from app.db.base import Base
async with engine.begin() as conn:
# Drop all tables (only for development!)
# await conn.run_sync(Base.metadata.drop_all)
# Create all tables
await conn.run_sync(Base.metadata.create_all)
async def close_db() -> None:
"""
Close database connections
Call this on application shutdown
"""
await engine.dispose()
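The commit-on-success / rollback-on-error flow of `get_db` is worth understanding in isolation, because FastAPI drives the generator for you: it advances it once to get the session, then either resumes it (success) or throws the endpoint's exception into it (failure). This sketch reproduces that control flow with a stand-in session class, using only the standard library (`FakeSession` is a hypothetical stub, not part of SQLAlchemy):

```python
import asyncio


class FakeSession:
    """Stand-in for AsyncSession that records what happened to it."""
    def __init__(self) -> None:
        self.committed = False
        self.rolled_back = False
        self.closed = False

    async def commit(self) -> None:
        self.committed = True

    async def rollback(self) -> None:
        self.rolled_back = True

    async def close(self) -> None:
        self.closed = True


async def get_db(session: FakeSession):
    # Same control flow as the real dependency: commit on success,
    # rollback and re-raise on failure, always close.
    try:
        yield session
        await session.commit()
    except Exception:
        await session.rollback()
        raise
    finally:
        await session.close()


async def demo() -> tuple[FakeSession, FakeSession]:
    # Happy path: endpoint body runs, generator resumes, commit fires.
    ok = FakeSession()
    agen = get_db(ok)
    await agen.__anext__()           # dependency yields the session
    try:
        await agen.__anext__()       # endpoint finished without error
    except StopAsyncIteration:
        pass

    # Failure path: the endpoint's exception is thrown into the generator.
    bad = FakeSession()
    agen = get_db(bad)
    await agen.__anext__()
    try:
        await agen.athrow(RuntimeError("endpoint blew up"))
    except RuntimeError:
        pass
    return ok, bad


ok, bad = asyncio.run(demo())
print(ok.committed, bad.rolled_back)  # True True
```

The key takeaway: an unhandled exception in your endpoint never leaves a half-written transaction behind, because the dependency rolls back before re-raising.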
Step 5: Create Database Models
Create app/db/models/__init__.py:
"""
Database models package
Contains all SQLAlchemy ORM models
"""
from app.db.models.user import User
from app.db.models.conversation import Conversation
from app.db.models.message import Message
__all__ = [
"User",
"Conversation",
"Message",
]
Create app/db/models/user.py:
"""
User database model
SQLAlchemy ORM model for users
"""
from sqlalchemy import String, Boolean, DateTime, Enum as SQLEnum
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from typing import List
import enum
from app.db.base import Base
class UserRole(str, enum.Enum):
"""User role enumeration"""
ADMIN = "admin"
USER = "user"
GUEST = "guest"
class User(Base):
"""
User model
Represents a user in the system
"""
__tablename__ = "users"
# Primary Key
id: Mapped[int] = mapped_column(primary_key=True, index=True)
# User Information
username: Mapped[str] = mapped_column(
String(50),
unique=True,
index=True,
nullable=False
)
email: Mapped[str] = mapped_column(
String(255),
unique=True,
index=True,
nullable=False
)
hashed_password: Mapped[str] = mapped_column(
String(255),
nullable=False
)
full_name: Mapped[str | None] = mapped_column(
String(100),
nullable=True
)
# Role and Status
role: Mapped[UserRole] = mapped_column(
SQLEnum(UserRole),
default=UserRole.USER,
nullable=False
)
is_active: Mapped[bool] = mapped_column(
Boolean,
default=True,
nullable=False
)
is_verified: Mapped[bool] = mapped_column(
Boolean,
default=False,
nullable=False
)
# Timestamps
created_at: Mapped[datetime] = mapped_column(
DateTime,
default=datetime.utcnow,
nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime,
default=datetime.utcnow,
onupdate=datetime.utcnow,
nullable=False
)
last_login: Mapped[datetime | None] = mapped_column(
DateTime,
nullable=True
)
# Relationships
conversations: Mapped[List["Conversation"]] = relationship(
"Conversation",
back_populates="user",
cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"
Create app/db/models/conversation.py:
"""
Conversation database model
SQLAlchemy ORM model for AI conversations
"""
from sqlalchemy import String, DateTime, ForeignKey, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from typing import List
from app.db.base import Base
class Conversation(Base):
"""
Conversation model
Represents an AI conversation thread
"""
__tablename__ = "conversations"
# Primary Key
id: Mapped[int] = mapped_column(primary_key=True, index=True)
# Foreign Keys
user_id: Mapped[int] = mapped_column(
ForeignKey("users.id", ondelete="CASCADE"),
nullable=False,
index=True
)
# Conversation Information
title: Mapped[str] = mapped_column(
String(255),
nullable=False,
default="New Conversation"
)
model_name: Mapped[str] = mapped_column(
String(50),
nullable=False
)
# Metadata
metadata_json: Mapped[str | None] = mapped_column(
Text,
nullable=True
)
# Timestamps
created_at: Mapped[datetime] = mapped_column(
DateTime,
default=datetime.utcnow,
nullable=False
)
updated_at: Mapped[datetime] = mapped_column(
DateTime,
default=datetime.utcnow,
onupdate=datetime.utcnow,
nullable=False
)
# Relationships
user: Mapped["User"] = relationship(
"User",
back_populates="conversations"
)
messages: Mapped[List["Message"]] = relationship(
"Message",
back_populates="conversation",
cascade="all, delete-orphan",
order_by="Message.created_at"
)
def __repr__(self) -> str:
return f"<Conversation(id={self.id}, title='{self.title}', user_id={self.user_id})>"
Create app/db/models/message.py:
"""
Message database model
SQLAlchemy ORM model for conversation messages
"""
from sqlalchemy import String, DateTime, ForeignKey, Text, Enum as SQLEnum
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
import enum
from app.db.base import Base
class MessageRole(str, enum.Enum):
"""Message role enumeration"""
USER = "user"
ASSISTANT = "assistant"
SYSTEM = "system"
class Message(Base):
"""
Message model
Represents a single message in a conversation
"""
__tablename__ = "messages"
# Primary Key
id: Mapped[int] = mapped_column(primary_key=True, index=True)
# Foreign Keys
conversation_id: Mapped[int] = mapped_column(
ForeignKey("conversations.id", ondelete="CASCADE"),
nullable=False,
index=True
)
# Message Information
role: Mapped[MessageRole] = mapped_column(
SQLEnum(MessageRole),
nullable=False
)
content: Mapped[str] = mapped_column(
Text,
nullable=False
)
# Token Information (optional)
prompt_tokens: Mapped[int | None] = mapped_column(nullable=True)
completion_tokens: Mapped[int | None] = mapped_column(nullable=True)
total_tokens: Mapped[int | None] = mapped_column(nullable=True)
# Timestamps
created_at: Mapped[datetime] = mapped_column(
DateTime,
default=datetime.utcnow,
nullable=False,
index=True
)
# Relationships
conversation: Mapped["Conversation"] = relationship(
"Conversation",
back_populates="messages"
)
def __repr__(self) -> str:
content_preview = self.content[:50] + "..." if len(self.content) > 50 else self.content
return f"<Message(id={self.id}, role='{self.role}', content='{content_preview}')>"
Step 6: Setup Alembic for Migrations
Initialize Alembic:
# From fastapi/ directory
alembic init alembic
This creates:
fastapi/
├── alembic/
│   ├── versions/        # Migration files
│   ├── env.py           # Alembic environment
│   ├── script.py.mako
│   └── README
└── alembic.ini          # Alembic configuration
Update alembic.ini:
# alembic.ini
[alembic]
script_location = alembic
prepend_sys_path = .
version_path_separator = os
# Disable this - we'll set it programmatically
# sqlalchemy.url = postgresql://user:pass@localhost/dbname
[loggers]
keys = root,sqlalchemy,alembic
[handlers]
keys = console
[formatters]
keys = generic
[logger_root]
level = WARN
handlers = console
qualname =
[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine
[logger_alembic]
level = INFO
handlers =
qualname = alembic
[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic
[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S
Update alembic/env.py:
"""
Alembic environment configuration
Handles database migrations
"""
from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
import asyncio
# Import your models and config
from app.core.config import settings
from app.db.base import Base
# Import all models so Alembic can detect them
from app.db.models import User, Conversation, Message
# this is the Alembic Config object
config = context.config
# Set database URL from settings
config.set_main_option("sqlalchemy.url", settings.sync_database_url)
# Interpret the config file for Python logging
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Set target metadata for autogenerate
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""
Run migrations in 'offline' mode
This configures the context with just a URL
and not an Engine, though an Engine is acceptable
here as well. By skipping the Engine creation
we don't even need a DBAPI to be available.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
def do_run_migrations(connection: Connection) -> None:
"""Run migrations with provided connection"""
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
)
with context.begin_transaction():
context.run_migrations()
async def run_async_migrations() -> None:
"""Run migrations in 'online' mode with async engine"""
connectable = async_engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
async with connectable.connect() as connection:
await connection.run_sync(do_run_migrations)
await connectable.dispose()
def run_migrations_online() -> None:
"""
Run migrations in 'online' mode
In this scenario we need to create an Engine
and associate a connection with the context.
"""
asyncio.run(run_async_migrations())
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
Step 7: Create Initial Migration
# Create initial migration
alembic revision --autogenerate -m "Initial migration: users, conversations, messages"
# Apply migration
alembic upgrade head
You should see:
INFO [alembic.runtime.migration] Running upgrade -> abc123, Initial migration: users, conversations, messages
Verify tables were created:
# Using Docker
docker exec -it aiverse_postgres psql -U aiverse_user -d aiverse_db -c "\dt"
# Should show:
# public | alembic_version | table | aiverse_user
# public | conversations | table | aiverse_user
# public | messages | table | aiverse_user
# public | users | table | aiverse_user