Ep.08 Database Integration with PostgreSQL & SQLAlchemy in FastAPI

To integrate PostgreSQL with FastAPI using SQLAlchemy: install SQLAlchemy and asyncpg for async support, define models that inherit from SQLAlchemy's declarative base, set up an async engine and session factory, manage schema changes with Alembic migrations, and use dependency injection to provide database sessions to your endpoints. Then replace the in-memory storage in your service layer with SQLAlchemy queries, use async/await for every database operation, and configure proper connection pooling. The result is a scalable, production-ready database layer with full ORM capabilities.

🎓 What You’ll Learn

By the end of this tutorial, you’ll be able to:

  • Set up PostgreSQL database for FastAPI
  • Create SQLAlchemy ORM models with relationships
  • Implement async database operations
  • Use Alembic for database migrations
  • Handle database sessions with dependency injection
  • Implement proper connection pooling
  • Create database indexes and constraints
  • Handle transactions and rollbacks
  • Implement database seeding and fixtures
  • Deploy database in production

📖 Understanding Database Integration

Why PostgreSQL + SQLAlchemy?

Technology  | Why We Use It
------------|------------------------------------------------------------------
PostgreSQL  | Production-grade, ACID-compliant, supports JSON and full-text search
SQLAlchemy  | Mature ORM with async support, migrations, relationship handling
Alembic     | Database migrations; version control for schema changes
asyncpg     | High-performance async PostgreSQL driver for Python

Architecture Overview

FastAPI Endpoint
    ↓
Dependency Injection (get_db)
    ↓
SQLAlchemy Session
    ↓
ORM Models (User, Conversation, Message)
    ↓
PostgreSQL Database

๐Ÿ› ๏ธ Step-by-Step Implementation

Step 1: Install Dependencies

# Activate your virtual environment
cd fastapi
source venv/bin/activate  # or venv\Scripts\activate on Windows

# Install database packages
pip install "sqlalchemy[asyncio]==2.0.23"  # quotes keep zsh from expanding the brackets
pip install asyncpg==0.29.0
pip install alembic==1.13.1
pip install psycopg2-binary==2.9.9  # For sync operations if needed

# Update requirements
pip freeze > requirements.txt

What we installed:

  • sqlalchemy[asyncio]: ORM with async support
  • asyncpg: Async PostgreSQL driver
  • alembic: Database migration tool
  • psycopg2-binary: PostgreSQL adapter (backup/utilities)

Step 2: Install and Setup PostgreSQL

Option A: Local Installation

Windows:

# Download from https://www.postgresql.org/download/windows/
# Or use chocolatey:
choco install postgresql

macOS:

brew install postgresql@15
brew services start postgresql@15

Linux (Ubuntu/Debian):

sudo apt update
sudo apt install postgresql postgresql-contrib
sudo systemctl start postgresql

Option B: Docker (Recommended for Development)

Create docker-compose.yml in project root:

version: '3.8'

services:
  postgres:
    image: postgres:15-alpine
    container_name: aiverse_postgres
    environment:
      POSTGRES_USER: aiverse_user
      POSTGRES_PASSWORD: aiverse_pass
      POSTGRES_DB: aiverse_db
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U aiverse_user -d aiverse_db"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  postgres_data:

Start PostgreSQL:

docker-compose up -d

Verify it’s running:

docker ps
# You should see aiverse_postgres running

Step 3: Update Configuration

Update app/core/config.py:

"""
Core configuration module
Manages all application settings using Pydantic Settings
"""

from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List, Optional
from functools import lru_cache


class Settings(BaseSettings):
    """
    Application settings
    
    These values are loaded from environment variables or .env file
    Pydantic validates the types automatically
    """
    
    # Application
    APP_NAME: str = "AIVerse Backend"
    APP_VERSION: str = "0.4.0"
    DEBUG: bool = False
    ENVIRONMENT: str = "production"
    
    # API
    API_V1_PREFIX: str = "/api/v1"
    HOST: str = "0.0.0.0"
    PORT: int = 8000
    
    # CORS
    ALLOWED_ORIGINS: str = "http://localhost:3000"
    
    @property
    def allowed_origins_list(self) -> List[str]:
        """Convert comma-separated string to list"""
        return [origin.strip() for origin in self.ALLOWED_ORIGINS.split(",")]
    
    # Database Configuration
    DATABASE_URL: str = "postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db"
    DATABASE_ECHO: bool = False  # Set to True to see SQL queries in logs
    
    # Database Connection Pool Settings
    DB_POOL_SIZE: int = 5
    DB_MAX_OVERFLOW: int = 10
    DB_POOL_TIMEOUT: int = 30
    DB_POOL_RECYCLE: int = 3600  # Recycle connections after 1 hour
    
    @property
    def async_database_url(self) -> str:
        """Get async database URL"""
        return self.DATABASE_URL
    
    @property
    def sync_database_url(self) -> str:
        """Get sync database URL (for sync tools such as psycopg2)"""
        return self.DATABASE_URL.replace("+asyncpg", "")
    
    # AI Settings
    OLLAMA_BASE_URL: str = "http://localhost:11434"
    DEFAULT_AI_MODEL: str = "llama2"
    
    # Security
    SECRET_KEY: str = "change-this-in-production"
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    
    # Pydantic Settings Configuration
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=True,
        extra="ignore"
    )


@lru_cache()
def get_settings() -> Settings:
    """
    Create settings instance (cached)
    
    @lru_cache ensures we only create one Settings instance
    and reuse it throughout the application
    
    Returns:
        Settings: Application settings
    """
    return Settings()


# Convenience: Get settings instance
settings = get_settings()
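The two URL properties only rewrite the driver suffix, so the conversion can be sanity-checked with plain string operations. A standalone sketch mirroring the logic above:

```python
# Mirror of the Settings URL-rewriting logic (standalone sketch)
ASYNC_URL = "postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db"

def sync_database_url(url: str) -> str:
    """Strip the async driver suffix to get a URL usable by sync tools."""
    return url.replace("+asyncpg", "")

print(sync_database_url(ASYNC_URL))
# postgresql://aiverse_user:aiverse_pass@localhost:5432/aiverse_db
```

SQLAlchemy picks the DBAPI driver from the `+driver` part of the URL, so dropping `+asyncpg` falls back to the default sync driver (psycopg2).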

Update .env:

# Application Configuration
APP_NAME=AIVerse Backend
APP_VERSION=0.4.0
DEBUG=True
ENVIRONMENT=development

# Server Configuration
API_V1_PREFIX=/api/v1
HOST=0.0.0.0
PORT=8000

# CORS Configuration
ALLOWED_ORIGINS=http://localhost:3000,http://localhost:8000

# Database Configuration
DATABASE_URL=postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db
DATABASE_ECHO=True
DB_POOL_SIZE=5
DB_MAX_OVERFLOW=10
DB_POOL_TIMEOUT=30
DB_POOL_RECYCLE=3600

# AI Configuration
OLLAMA_BASE_URL=http://localhost:11434
DEFAULT_AI_MODEL=llama2

# Security Configuration
SECRET_KEY=your-secret-key-change-in-production
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30

Step 4: Create Database Infrastructure

Create app/db/__init__.py:

"""
Database package

Contains database configuration, models, and utilities
"""

from app.db.base import Base
from app.db.session import engine, async_session_maker, get_db

__all__ = [
    "Base",
    "engine",
    "async_session_maker",
    "get_db",
]

Create app/db/base.py:

"""
Base class for all database models

All SQLAlchemy models should inherit from Base
"""

from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import MetaData

# Naming convention for constraints
# This ensures consistent naming across the database
NAMING_CONVENTION = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s"
}

metadata = MetaData(naming_convention=NAMING_CONVENTION)


class Base(DeclarativeBase):
    """
    Base class for all database models
    
    All models should inherit from this class
    """
    metadata = metadata
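The convention strings use SQLAlchemy's %-style template tokens; SQLAlchemy fills them in when it emits DDL, but you can preview what a generated name will look like by expanding a template by hand. A standalone sketch:

```python
# Preview how the naming-convention templates expand (standalone sketch;
# in practice SQLAlchemy fills the tokens when generating DDL)
NAMING_CONVENTION = {
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
}

# A unique constraint on users.email would be named:
uq_name = NAMING_CONVENTION["uq"] % {"table_name": "users", "column_0_name": "email"}
print(uq_name)  # uq_users_email

# The conversations.user_id -> users.id foreign key would be named:
fk_name = NAMING_CONVENTION["fk"] % {
    "table_name": "conversations",
    "column_0_name": "user_id",
    "referred_table_name": "users",
}
print(fk_name)  # fk_conversations_user_id_users
```

Predictable names like these matter for Alembic: autogenerated downgrade scripts can only drop a constraint if they know what it is called.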

Create app/db/session.py:

"""
Database session management

Handles database connection, session creation, and dependency injection
"""

from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
    AsyncEngine
)
from typing import AsyncGenerator
from app.core.config import settings


# Create async engine
engine: AsyncEngine = create_async_engine(
    settings.async_database_url,
    echo=settings.DATABASE_ECHO,
    pool_size=settings.DB_POOL_SIZE,
    max_overflow=settings.DB_MAX_OVERFLOW,
    pool_timeout=settings.DB_POOL_TIMEOUT,
    pool_recycle=settings.DB_POOL_RECYCLE,
    pool_pre_ping=True,  # Verify connections before using
)

# Create async session factory
async_session_maker = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autocommit=False,
    autoflush=False,
)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Dependency for getting database session
    
    Yields:
        AsyncSession: Database session
    
    Usage:
        @app.get("/users")
        async def get_users(db: AsyncSession = Depends(get_db)):
            result = await db.execute(select(User))
            return result.scalars().all()
    """
    async with async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        # No explicit close() needed: the async context manager closes the session


async def init_db() -> None:
    """
    Initialize database
    
    Creates all tables if they don't exist
    Note: In production, use Alembic migrations instead
    """
    from app.db.base import Base
    
    async with engine.begin() as conn:
        # Drop all tables (only for development!)
        # await conn.run_sync(Base.metadata.drop_all)
        
        # Create all tables
        await conn.run_sync(Base.metadata.create_all)


async def close_db() -> None:
    """
    Close database connections
    
    Call this on application shutdown
    """
    await engine.dispose()
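The commit-on-success / rollback-on-error shape inside get_db is the standard DB-API transaction pattern. A minimal synchronous sketch with the stdlib's sqlite3 shows the same control flow in isolation:

```python
import sqlite3

def run_in_transaction(conn: sqlite3.Connection, work) -> None:
    """Commit if work() succeeds, roll back and re-raise if it fails —
    the same shape as the try/commit/except/rollback in get_db."""
    try:
        work(conn)
        conn.commit()
    except Exception:
        conn.rollback()
        raise

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT UNIQUE)")

run_in_transaction(conn, lambda c: c.execute("INSERT INTO users VALUES ('alice')"))

try:
    # Duplicate username violates the UNIQUE constraint -> rollback, re-raise
    run_in_transaction(conn, lambda c: c.execute("INSERT INTO users VALUES ('alice')"))
except sqlite3.IntegrityError:
    pass

count = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
print(count)  # 1 — the failed insert was rolled back
```

In get_db the same pattern runs per request: any unhandled exception in the endpoint propagates through the `yield`, triggers the rollback, and is re-raised so FastAPI can turn it into an error response.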

Step 5: Create Database Models

Create app/db/models/__init__.py:

"""
Database models package

Contains all SQLAlchemy ORM models
"""

from app.db.models.user import User
from app.db.models.conversation import Conversation
from app.db.models.message import Message

__all__ = [
    "User",
    "Conversation",
    "Message",
]

Create app/db/models/user.py:

"""
User database model

SQLAlchemy ORM model for users
"""

from sqlalchemy import String, Boolean, DateTime, Enum as SQLEnum
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from typing import List
import enum

from app.db.base import Base


class UserRole(str, enum.Enum):
    """User role enumeration"""
    ADMIN = "admin"
    USER = "user"
    GUEST = "guest"


class User(Base):
    """
    User model
    
    Represents a user in the system
    """
    __tablename__ = "users"
    
    # Primary Key
    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    
    # User Information
    username: Mapped[str] = mapped_column(
        String(50),
        unique=True,
        index=True,
        nullable=False
    )
    email: Mapped[str] = mapped_column(
        String(255),
        unique=True,
        index=True,
        nullable=False
    )
    hashed_password: Mapped[str] = mapped_column(
        String(255),
        nullable=False
    )
    full_name: Mapped[str | None] = mapped_column(
        String(100),
        nullable=True
    )
    
    # Role and Status
    role: Mapped[UserRole] = mapped_column(
        SQLEnum(UserRole),
        default=UserRole.USER,
        nullable=False
    )
    is_active: Mapped[bool] = mapped_column(
        Boolean,
        default=True,
        nullable=False
    )
    is_verified: Mapped[bool] = mapped_column(
        Boolean,
        default=False,
        nullable=False
    )
    
    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
        nullable=False
    )
    last_login: Mapped[datetime | None] = mapped_column(
        DateTime,
        nullable=True
    )
    
    # Relationships
    conversations: Mapped[List["Conversation"]] = relationship(
        "Conversation",
        back_populates="user",
        cascade="all, delete-orphan"
    )
    
    def __repr__(self) -> str:
        return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"

Create app/db/models/conversation.py:

"""
Conversation database model

SQLAlchemy ORM model for AI conversations
"""

from sqlalchemy import String, DateTime, ForeignKey, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from typing import List

from app.db.base import Base


class Conversation(Base):
    """
    Conversation model
    
    Represents an AI conversation thread
    """
    __tablename__ = "conversations"
    
    # Primary Key
    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    
    # Foreign Keys
    user_id: Mapped[int] = mapped_column(
        ForeignKey("users.id", ondelete="CASCADE"),
        nullable=False,
        index=True
    )
    
    # Conversation Information
    title: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
        default="New Conversation"
    )
    model_name: Mapped[str] = mapped_column(
        String(50),
        nullable=False
    )
    
    # Metadata
    metadata_json: Mapped[str | None] = mapped_column(
        Text,
        nullable=True
    )
    
    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
        nullable=False
    )
    
    # Relationships
    user: Mapped["User"] = relationship(
        "User",
        back_populates="conversations"
    )
    messages: Mapped[List["Message"]] = relationship(
        "Message",
        back_populates="conversation",
        cascade="all, delete-orphan",
        order_by="Message.created_at"
    )
    
    def __repr__(self) -> str:
        return f"<Conversation(id={self.id}, title='{self.title}', user_id={self.user_id})>"

Create app/db/models/message.py:

"""
Message database model

SQLAlchemy ORM model for conversation messages
"""

from sqlalchemy import String, DateTime, ForeignKey, Text, Enum as SQLEnum
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
import enum

from app.db.base import Base


class MessageRole(str, enum.Enum):
    """Message role enumeration"""
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"


class Message(Base):
    """
    Message model
    
    Represents a single message in a conversation
    """
    __tablename__ = "messages"
    
    # Primary Key
    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    
    # Foreign Keys
    conversation_id: Mapped[int] = mapped_column(
        ForeignKey("conversations.id", ondelete="CASCADE"),
        nullable=False,
        index=True
    )
    
    # Message Information
    role: Mapped[MessageRole] = mapped_column(
        SQLEnum(MessageRole),
        nullable=False
    )
    content: Mapped[str] = mapped_column(
        Text,
        nullable=False
    )
    
    # Token Information (optional)
    prompt_tokens: Mapped[int | None] = mapped_column(nullable=True)
    completion_tokens: Mapped[int | None] = mapped_column(nullable=True)
    total_tokens: Mapped[int | None] = mapped_column(nullable=True)
    
    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        nullable=False,
        index=True
    )
    
    # Relationships
    conversation: Mapped["Conversation"] = relationship(
        "Conversation",
        back_populates="messages"
    )
    
    def __repr__(self) -> str:
        content_preview = self.content[:50] + "..." if len(self.content) > 50 else self.content
        return f"<Message(id={self.id}, role='{self.role}', content='{content_preview}')>"
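Because MessageRole (like UserRole above) inherits from both str and enum.Enum, its members compare equal to plain strings, which is convenient when matching values coming from API payloads. A standalone sketch of the pattern:

```python
import enum

class MessageRole(str, enum.Enum):
    """Mirror of the model's role enum (standalone sketch)."""
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"

# The str mix-in makes members interchangeable with their string values
print(MessageRole.USER == "user")                         # True
print(MessageRole("assistant") is MessageRole.ASSISTANT)  # True

# Useful when converting raw payload dicts into typed values
payload = {"role": "system", "content": "You are helpful."}
role = MessageRole(payload["role"])
print(role.value)  # system
```

Calling `MessageRole(...)` on an unknown string raises ValueError, so the conversion doubles as validation before the value ever reaches the database.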

Step 6: Setup Alembic for Migrations

Initialize Alembic:

# From fastapi/ directory
alembic init alembic

This creates:

fastapi/
├── alembic/
│   ├── versions/     # Migration files
│   ├── env.py        # Alembic environment
│   ├── script.py.mako
│   └── README
└── alembic.ini       # Alembic configuration

Update alembic.ini:

# alembic.ini
[alembic]
script_location = alembic
prepend_sys_path = .
version_path_separator = os

# Disable this - we'll set it programmatically
# sqlalchemy.url = postgresql://user:pass@localhost/dbname

[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

Update alembic/env.py:

"""
Alembic environment configuration

Handles database migrations
"""

from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
import asyncio

# Import your models and config
from app.core.config import settings
from app.db.base import Base
# Import all models so Alembic can detect them
from app.db.models import User, Conversation, Message

# this is the Alembic Config object
config = context.config

# Set database URL from settings
# (the async URL — run_async_migrations builds an async engine from it,
# which requires the +asyncpg driver suffix)
config.set_main_option("sqlalchemy.url", settings.async_database_url)

# Interpret the config file for Python logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Set target metadata for autogenerate
target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """
    Run migrations in 'offline' mode
    
    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
        compare_type=True,
    )

    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection: Connection) -> None:
    """Run migrations with provided connection"""
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
    )

    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    """Run migrations in 'online' mode with async engine"""
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


def run_migrations_online() -> None:
    """
    Run migrations in 'online' mode
    
    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    asyncio.run(run_async_migrations())


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

Step 7: Create Initial Migration

# Create initial migration
alembic revision --autogenerate -m "Initial migration: users, conversations, messages"

# Apply migration
alembic upgrade head

You should see:

INFO  [alembic.runtime.migration] Running upgrade  -> abc123, Initial migration: users, conversations, messages

Verify tables were created:

# Using Docker
docker exec -it aiverse_postgres psql -U aiverse_user -d aiverse_db -c "\dt"

# Should show:
#  public | alembic_version | table | aiverse_user
#  public | conversations   | table | aiverse_user
#  public | messages        | table | aiverse_user
#  public | users           | table | aiverse_user
