Ep.08 Database Integration with PostgreSQL & SQLAlchemy in FastAPI

To integrate PostgreSQL with FastAPI using SQLAlchemy: install SQLAlchemy and asyncpg for async support, define models that inherit from SQLAlchemy’s declarative base, set up an async engine and session factory, manage schema changes with Alembic, and use dependency injection to hand database sessions to your endpoints. Replace the in-memory storage in your service layer with SQLAlchemy queries, use async/await for every database operation, and configure proper connection pooling. The result is a scalable, production-ready database layer with full ORM capabilities.

🎓 What You’ll Learn

By the end of this tutorial, you’ll be able to:

  • Set up PostgreSQL database for FastAPI
  • Create SQLAlchemy ORM models with relationships
  • Implement async database operations
  • Use Alembic for database migrations
  • Handle database sessions with dependency injection
  • Implement proper connection pooling
  • Create database indexes and constraints
  • Handle transactions and rollbacks
  • Implement database seeding and fixtures
  • Deploy the database in production

📖 Understanding Database Integration

Why PostgreSQL + SQLAlchemy?

| Technology | Why We Use It |
|------------|---------------|
| PostgreSQL | Production-grade, ACID compliant, supports JSON, full-text search |
| SQLAlchemy | Mature ORM, async support, migrations, relationship handling |
| Alembic | Database migrations, version control for schema changes |
| asyncpg | Fastest PostgreSQL driver for Python async |

Architecture Overview

FastAPI Endpoint
    ↓
Dependency Injection (get_db)
    ↓
SQLAlchemy Session
    ↓
ORM Models (User, Conversation, Message)
    ↓
PostgreSQL Database
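The per-request session lifecycle in this chain can be sketched with a plain generator, independent of FastAPI and SQLAlchemy. `StubSession` here is a hypothetical stand-in for an `AsyncSession`, used only to make the flow visible:

```python
class StubSession:
    """Hypothetical stand-in for an AsyncSession, recording its state."""

    def __init__(self):
        self.committed = False
        self.closed = False

    def commit(self):
        self.committed = True

    def close(self):
        self.closed = True


def get_db():
    """Yield a session to the endpoint, then clean up afterwards."""
    session = StubSession()
    try:
        yield session      # endpoint code runs while the generator is suspended
        session.commit()   # commit once the endpoint returns successfully
    finally:
        session.close()    # always release the connection


# Drive the generator the way a framework's DI system would:
gen = get_db()
session = next(gen)        # endpoint receives the session
next(gen, None)            # endpoint returned; commit and close run
print(session.committed, session.closed)
```

This is exactly the pattern FastAPI exploits with `Depends`: the code before `yield` runs on the way in, the code after it on the way out.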

🛠️ Step-by-Step Implementation

Step 1: Install Dependencies

# Activate your virtual environment
cd fastapi
source venv/bin/activate  # or venv\Scripts\activate on Windows

# Install database packages
pip install sqlalchemy[asyncio]==2.0.23
pip install asyncpg==0.29.0
pip install alembic==1.13.1
pip install psycopg2-binary==2.9.9  # For sync operations if needed

# Update requirements
pip freeze > requirements.txt

What we installed:

  • sqlalchemy[asyncio]: ORM with async support
  • asyncpg: Async PostgreSQL driver
  • alembic: Database migration tool
  • psycopg2-binary: PostgreSQL adapter (backup/utilities)

Step 2: Install and Setup PostgreSQL

Option A: Local Installation

Windows:

# Download from https://www.postgresql.org/download/windows/
# Or use chocolatey:
choco install postgresql

macOS:

brew install postgresql@15
brew services start postgresql@15

Linux (Ubuntu/Debian):

sudo apt update
sudo apt install postgresql postgresql-contrib
sudo systemctl start postgresql

Option B: Docker (Recommended for Development)

Create docker-compose.yml in project root:

version: '3.8'

services:
  postgres:
    image: postgres:15-alpine
    container_name: aiverse_postgres
    environment:
      POSTGRES_USER: aiverse_user
      POSTGRES_PASSWORD: aiverse_pass
      POSTGRES_DB: aiverse_db
    ports:
      - "5432:5432"
    volumes:
      - postgres_data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U aiverse_user -d aiverse_db"]
      interval: 10s
      timeout: 5s
      retries: 5

volumes:
  postgres_data:

Start PostgreSQL:

docker-compose up -d

Verify it’s running:

docker ps
# You should see aiverse_postgres running

Step 3: Update Configuration

Update app/core/config.py:

"""
Core configuration module
Manages all application settings using Pydantic Settings
"""

from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List, Optional
from functools import lru_cache


class Settings(BaseSettings):
    """
    Application settings
    
    These values are loaded from environment variables or .env file
    Pydantic validates the types automatically
    """
    
    # Application
    APP_NAME: str = "AIVerse Backend"
    APP_VERSION: str = "0.4.0"
    DEBUG: bool = False
    ENVIRONMENT: str = "production"
    
    # API
    API_V1_PREFIX: str = "/api/v1"
    HOST: str = "0.0.0.0"
    PORT: int = 8000
    
    # CORS
    ALLOWED_ORIGINS: str = "http://localhost:3000"
    
    @property
    def allowed_origins_list(self) -> List[str]:
        """Convert comma-separated string to list"""
        return [origin.strip() for origin in self.ALLOWED_ORIGINS.split(",")]
    
    # Database Configuration
    DATABASE_URL: str = "postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db"
    DATABASE_ECHO: bool = False  # Set to True to see SQL queries in logs
    
    # Database Connection Pool Settings
    DB_POOL_SIZE: int = 5
    DB_MAX_OVERFLOW: int = 10
    DB_POOL_TIMEOUT: int = 30
    DB_POOL_RECYCLE: int = 3600  # Recycle connections after 1 hour
    
    @property
    def async_database_url(self) -> str:
        """Get async database URL"""
        return self.DATABASE_URL
    
    @property
    def sync_database_url(self) -> str:
        """Get sync database URL (for tools that can't use the asyncpg driver)"""
        return self.DATABASE_URL.replace("postgresql+asyncpg", "postgresql")
    
    # AI Settings
    OLLAMA_BASE_URL: str = "http://localhost:11434"
    DEFAULT_AI_MODEL: str = "llama2"
    
    # Security
    SECRET_KEY: str = "change-this-in-production"
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    
    # Pydantic Settings Configuration
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=True,
        extra="ignore"
    )


@lru_cache()
def get_settings() -> Settings:
    """
    Create settings instance (cached)
    
    @lru_cache ensures we only create one Settings instance
    and reuse it throughout the application
    
    Returns:
        Settings: Application settings
    """
    return Settings()


# Convenience: Get settings instance
settings = get_settings()
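The `sync_database_url` property above boils down to a driver-suffix rewrite. As a standalone function it can be checked in isolation (the function name here is illustrative, not part of the app):

```python
def to_sync_url(url: str) -> str:
    """Rewrite the async driver suffix so sync tools can reuse the same URL."""
    return url.replace("postgresql+asyncpg", "postgresql")


async_url = "postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db"
sync_url = to_sync_url(async_url)
print(sync_url)
# postgresql://aiverse_user:aiverse_pass@localhost:5432/aiverse_db
```

URLs that never contained the suffix pass through unchanged, so the rewrite is safe to apply unconditionally.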

Update .env:

# Application Configuration
APP_NAME=AIVerse Backend
APP_VERSION=0.4.0
DEBUG=True
ENVIRONMENT=development

# Server Configuration
API_V1_PREFIX=/api/v1
HOST=0.0.0.0
PORT=8000

# CORS Configuration
ALLOWED_ORIGINS=http://localhost:3000,http://localhost:8000

# Database Configuration
DATABASE_URL=postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db
DATABASE_ECHO=True
DB_POOL_SIZE=5
DB_MAX_OVERFLOW=10
DB_POOL_TIMEOUT=30
DB_POOL_RECYCLE=3600

# AI Configuration
OLLAMA_BASE_URL=http://localhost:11434
DEFAULT_AI_MODEL=llama2

# Security Configuration
SECRET_KEY=your-secret-key-change-in-production
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
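With these pool settings, SQLAlchemy's queue pool keeps `DB_POOL_SIZE` persistent connections and opens up to `DB_MAX_OVERFLOW` temporary ones under load, so the two values together bound how many connections one application process can hold. A quick check of the ceiling implied by the values above:

```python
# Ceiling on simultaneous connections implied by the pool settings above:
# pool_size persistent connections + max_overflow temporary ones.
DB_POOL_SIZE = 5
DB_MAX_OVERFLOW = 10

max_connections_per_process = DB_POOL_SIZE + DB_MAX_OVERFLOW
print(max_connections_per_process)  # 15
```

Multiply by the number of worker processes when sizing PostgreSQL's `max_connections`.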

Step 4: Create Database Infrastructure

Create app/db/__init__.py:

"""
Database package

Contains database configuration, models, and utilities
"""

from app.db.base import Base
from app.db.session import engine, async_session_maker, get_db

__all__ = [
    "Base",
    "engine",
    "async_session_maker",
    "get_db",
]

Create app/db/base.py:

"""
Base class for all database models

All SQLAlchemy models should inherit from Base
"""

from sqlalchemy.orm import DeclarativeBase
from sqlalchemy import MetaData

# Naming convention for constraints
# This ensures consistent naming across the database
NAMING_CONVENTION = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "ck": "ck_%(table_name)s_%(constraint_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s"
}

metadata = MetaData(naming_convention=NAMING_CONVENTION)


class Base(DeclarativeBase):
    """
    Base class for all database models
    
    All models should inherit from this class
    """
    metadata = metadata
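The templates in `NAMING_CONVENTION` are ordinary %-style format strings; SQLAlchemy fills the placeholders itself at DDL time, but you can preview the names it will emit with plain string formatting:

```python
# Preview the constraint names the convention produces for given tables/columns.
NAMING_CONVENTION = {
    "ix": "ix_%(column_0_label)s",
    "uq": "uq_%(table_name)s_%(column_0_name)s",
    "fk": "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk": "pk_%(table_name)s",
}

uq_name = NAMING_CONVENTION["uq"] % {"table_name": "users", "column_0_name": "email"}
fk_name = NAMING_CONVENTION["fk"] % {
    "table_name": "conversations",
    "column_0_name": "user_id",
    "referred_table_name": "users",
}
print(uq_name)  # uq_users_email
print(fk_name)  # fk_conversations_user_id_users
```

Deterministic names like these matter later: Alembic can only emit `DROP CONSTRAINT` statements for constraints it can name.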

Create app/db/session.py:

"""
Database session management

Handles database connection, session creation, and dependency injection
"""

from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
    AsyncEngine
)
from typing import AsyncGenerator
from app.core.config import settings


# Create async engine
engine: AsyncEngine = create_async_engine(
    settings.async_database_url,
    echo=settings.DATABASE_ECHO,
    pool_size=settings.DB_POOL_SIZE,
    max_overflow=settings.DB_MAX_OVERFLOW,
    pool_timeout=settings.DB_POOL_TIMEOUT,
    pool_recycle=settings.DB_POOL_RECYCLE,
    pool_pre_ping=True,  # Verify connections before using
)

# Create async session factory
async_session_maker = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
    autocommit=False,
    autoflush=False,
)


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Dependency for getting database session
    
    Yields:
        AsyncSession: Database session
    
    Usage:
        @app.get("/users")
        async def get_users(db: AsyncSession = Depends(get_db)):
            result = await db.execute(select(User))
            return result.scalars().all()
    """
    async with async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()


async def init_db() -> None:
    """
    Initialize database
    
    Creates all tables if they don't exist
    Note: In production, use Alembic migrations instead
    """
    from app.db.base import Base
    
    async with engine.begin() as conn:
        # Drop all tables (only for development!)
        # await conn.run_sync(Base.metadata.drop_all)
        
        # Create all tables
        await conn.run_sync(Base.metadata.create_all)


async def close_db() -> None:
    """
    Close database connections
    
    Call this on application shutdown
    """
    await engine.dispose()
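The commit-or-rollback contract of `get_db` can be demonstrated with a stub session (synchronous here for brevity; the real code awaits each call):

```python
class StubSession:
    """Hypothetical stand-in recording which lifecycle calls happened."""

    def __init__(self):
        self.calls = []

    def commit(self):
        self.calls.append("commit")

    def rollback(self):
        self.calls.append("rollback")

    def close(self):
        self.calls.append("close")


def run_request(session, handler):
    """Mirror get_db's control flow: commit on success, rollback on error."""
    try:
        handler(session)
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()


ok = StubSession()
run_request(ok, lambda s: None)
print(ok.calls)  # ['commit', 'close']

failed = StubSession()
try:
    run_request(failed, lambda s: 1 / 0)
except ZeroDivisionError:
    pass
print(failed.calls)  # ['rollback', 'close']
```

Either way the session is closed, so a crashing endpoint can never leak a pooled connection.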

Step 5: Create Database Models

Create app/db/models/__init__.py:

"""
Database models package

Contains all SQLAlchemy ORM models
"""

from app.db.models.user import User
from app.db.models.conversation import Conversation
from app.db.models.message import Message

__all__ = [
    "User",
    "Conversation",
    "Message",
]

Create app/db/models/user.py:

"""
User database model

SQLAlchemy ORM model for users
"""

from sqlalchemy import String, Boolean, DateTime, Enum as SQLEnum
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from typing import List
import enum

from app.db.base import Base


class UserRole(str, enum.Enum):
    """User role enumeration"""
    ADMIN = "admin"
    USER = "user"
    GUEST = "guest"


class User(Base):
    """
    User model
    
    Represents a user in the system
    """
    __tablename__ = "users"
    
    # Primary Key
    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    
    # User Information
    username: Mapped[str] = mapped_column(
        String(50),
        unique=True,
        index=True,
        nullable=False
    )
    email: Mapped[str] = mapped_column(
        String(255),
        unique=True,
        index=True,
        nullable=False
    )
    hashed_password: Mapped[str] = mapped_column(
        String(255),
        nullable=False
    )
    full_name: Mapped[str | None] = mapped_column(
        String(100),
        nullable=True
    )
    
    # Role and Status
    role: Mapped[UserRole] = mapped_column(
        SQLEnum(UserRole),
        default=UserRole.USER,
        nullable=False
    )
    is_active: Mapped[bool] = mapped_column(
        Boolean,
        default=True,
        nullable=False
    )
    is_verified: Mapped[bool] = mapped_column(
        Boolean,
        default=False,
        nullable=False
    )
    
    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
        nullable=False
    )
    last_login: Mapped[datetime | None] = mapped_column(
        DateTime,
        nullable=True
    )
    
    # Relationships
    conversations: Mapped[List["Conversation"]] = relationship(
        "Conversation",
        back_populates="user",
        cascade="all, delete-orphan"
    )
    
    def __repr__(self) -> str:
        return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"

Create app/db/models/conversation.py:

"""
Conversation database model

SQLAlchemy ORM model for AI conversations
"""

from sqlalchemy import String, DateTime, ForeignKey, Text
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
from typing import List

from app.db.base import Base


class Conversation(Base):
    """
    Conversation model
    
    Represents an AI conversation thread
    """
    __tablename__ = "conversations"
    
    # Primary Key
    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    
    # Foreign Keys
    user_id: Mapped[int] = mapped_column(
        ForeignKey("users.id", ondelete="CASCADE"),
        nullable=False,
        index=True
    )
    
    # Conversation Information
    title: Mapped[str] = mapped_column(
        String(255),
        nullable=False,
        default="New Conversation"
    )
    model_name: Mapped[str] = mapped_column(
        String(50),
        nullable=False
    )
    
    # Metadata
    metadata_json: Mapped[str | None] = mapped_column(
        Text,
        nullable=True
    )
    
    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        nullable=False
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        onupdate=datetime.utcnow,
        nullable=False
    )
    
    # Relationships
    user: Mapped["User"] = relationship(
        "User",
        back_populates="conversations"
    )
    messages: Mapped[List["Message"]] = relationship(
        "Message",
        back_populates="conversation",
        cascade="all, delete-orphan",
        order_by="Message.created_at"
    )
    
    def __repr__(self) -> str:
        return f"<Conversation(id={self.id}, title='{self.title}', user_id={self.user_id})>"

Create app/db/models/message.py:

"""
Message database model

SQLAlchemy ORM model for conversation messages
"""

from sqlalchemy import String, DateTime, ForeignKey, Text, Enum as SQLEnum
from sqlalchemy.orm import Mapped, mapped_column, relationship
from datetime import datetime
import enum

from app.db.base import Base


class MessageRole(str, enum.Enum):
    """Message role enumeration"""
    USER = "user"
    ASSISTANT = "assistant"
    SYSTEM = "system"


class Message(Base):
    """
    Message model
    
    Represents a single message in a conversation
    """
    __tablename__ = "messages"
    
    # Primary Key
    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    
    # Foreign Keys
    conversation_id: Mapped[int] = mapped_column(
        ForeignKey("conversations.id", ondelete="CASCADE"),
        nullable=False,
        index=True
    )
    
    # Message Information
    role: Mapped[MessageRole] = mapped_column(
        SQLEnum(MessageRole),
        nullable=False
    )
    content: Mapped[str] = mapped_column(
        Text,
        nullable=False
    )
    
    # Token Information (optional)
    prompt_tokens: Mapped[int | None] = mapped_column(nullable=True)
    completion_tokens: Mapped[int | None] = mapped_column(nullable=True)
    total_tokens: Mapped[int | None] = mapped_column(nullable=True)
    
    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        nullable=False,
        index=True
    )
    
    # Relationships
    conversation: Mapped["Conversation"] = relationship(
        "Conversation",
        back_populates="messages"
    )
    
    def __repr__(self) -> str:
        content_preview = self.content[:50] + "..." if len(self.content) > 50 else self.content
        return f"<Message(id={self.id}, role='{self.role}', content='{content_preview}')>"

Step 6: Setup Alembic for Migrations

Initialize Alembic:

# From fastapi/ directory
alembic init alembic

This creates:

fastapi/
├── alembic/
│   ├── versions/     # Migration files
│   ├── env.py        # Alembic environment
│   ├── script.py.mako
│   └── README
└── alembic.ini       # Alembic configuration

Update alembic.ini:

# alembic.ini
[alembic]
script_location = alembic
prepend_sys_path = .
version_path_separator = os

# Disable this - we'll set it programmatically
# sqlalchemy.url = postgresql://user:pass@localhost/dbname

[loggers]
keys = root,sqlalchemy,alembic

[handlers]
keys = console

[formatters]
keys = generic

[logger_root]
level = WARN
handlers = console
qualname =

[logger_sqlalchemy]
level = WARN
handlers =
qualname = sqlalchemy.engine

[logger_alembic]
level = INFO
handlers =
qualname = alembic

[handler_console]
class = StreamHandler
args = (sys.stderr,)
level = NOTSET
formatter = generic

[formatter_generic]
format = %(levelname)-5.5s [%(name)s] %(message)s
datefmt = %H:%M:%S

Update alembic/env.py:

"""
Alembic environment configuration

Handles database migrations
"""

from logging.config import fileConfig
from sqlalchemy import pool
from sqlalchemy.engine import Connection
from sqlalchemy.ext.asyncio import async_engine_from_config
from alembic import context
import asyncio

# Import your models and config
from app.core.config import settings
from app.db.base import Base
# Import all models so Alembic can detect them
from app.db.models import User, Conversation, Message

# this is the Alembic Config object
config = context.config

# Set database URL from settings
# run_async_migrations builds an async engine from this value,
# so it must keep the +asyncpg driver suffix
config.set_main_option("sqlalchemy.url", settings.async_database_url)

# Interpret the config file for Python logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Set target metadata for autogenerate
target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """
    Run migrations in 'offline' mode
    
    This configures the context with just a URL
    and not an Engine, though an Engine is acceptable
    here as well. By skipping the Engine creation
    we don't even need a DBAPI to be available.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
        compare_type=True,
    )

    with context.begin_transaction():
        context.run_migrations()


def do_run_migrations(connection: Connection) -> None:
    """Run migrations with provided connection"""
    context.configure(
        connection=connection,
        target_metadata=target_metadata,
        compare_type=True,
    )

    with context.begin_transaction():
        context.run_migrations()


async def run_async_migrations() -> None:
    """Run migrations in 'online' mode with async engine"""
    connectable = async_engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    async with connectable.connect() as connection:
        await connection.run_sync(do_run_migrations)

    await connectable.dispose()


def run_migrations_online() -> None:
    """
    Run migrations in 'online' mode
    
    In this scenario we need to create an Engine
    and associate a connection with the context.
    """
    asyncio.run(run_async_migrations())


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

Step 7: Create Initial Migration

# Create initial migration
alembic revision --autogenerate -m "Initial migration: users, conversations, messages"

# Apply migration
alembic upgrade head

You should see:

INFO  [alembic.runtime.migration] Running upgrade  -> abc123, Initial migration: users, conversations, messages

Verify tables were created:

# Using Docker
docker exec -it aiverse_postgres psql -U aiverse_user -d aiverse_db -c "\dt"

# Should show:
#  public | alembic_version | table | aiverse_user
#  public | conversations   | table | aiverse_user
#  public | messages        | table | aiverse_user
#  public | users           | table | aiverse_user

Alternatively, open the SQL Shell (psql), log in with your credentials, and run \dt.

Step 8: Create Repository Pattern

The repository pattern provides an abstraction layer between the business logic and data access layer.
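Before wiring the repositories to SQLAlchemy, the shape of the abstraction is easiest to see in an in-memory version. This is illustrative only; the real repositories below run async queries against PostgreSQL:

```python
import itertools


class InMemoryRepository:
    """Dict-backed repository exposing the same CRUD surface as BaseRepository."""

    def __init__(self):
        self._rows = {}
        self._ids = itertools.count(1)  # auto-incrementing primary key

    def create(self, **kwargs):
        row = {"id": next(self._ids), **kwargs}
        self._rows[row["id"]] = row
        return row

    def get_by_id(self, id):
        return self._rows.get(id)

    def get_all(self, skip=0, limit=100, **filters):
        rows = [r for r in self._rows.values()
                if all(r.get(f) == v for f, v in filters.items())]
        return rows[skip:skip + limit]

    def delete(self, id):
        return self._rows.pop(id, None) is not None


repo = InMemoryRepository()
user = repo.create(username="ada", role="admin")
print(repo.get_all(role="admin"))  # the one matching row
```

Because the service layer only ever sees this interface, swapping the dict for a database (or a test double) leaves the business logic untouched, which is the whole point of the pattern.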

Create app/db/repositories/__init__.py:

"""
Repository package

Contains data access layer implementations
"""

from app.db.repositories.user_repository import UserRepository
from app.db.repositories.conversation_repository import ConversationRepository
from app.db.repositories.message_repository import MessageRepository

__all__ = [
    "UserRepository",
    "ConversationRepository",
    "MessageRepository",
]

Create app/db/repositories/base_repository.py:

"""
Base repository with common CRUD operations

All repositories should inherit from this base class
"""

from typing import Generic, TypeVar, Type, List, Optional, Any
from sqlalchemy import select, update, delete
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.base import Base

ModelType = TypeVar("ModelType", bound=Base)


class BaseRepository(Generic[ModelType]):
    """
    Base repository with common database operations
    
    Provides CRUD operations for any SQLAlchemy model
    """
    
    def __init__(self, model: Type[ModelType], session: AsyncSession):
        """
        Initialize repository
        
        Args:
            model: SQLAlchemy model class
            session: Database session
        """
        self.model = model
        self.session = session
    
    async def create(self, **kwargs) -> ModelType:
        """
        Create a new record
        
        Args:
            **kwargs: Model attributes
        
        Returns:
            Created model instance
        """
        instance = self.model(**kwargs)
        self.session.add(instance)
        await self.session.flush()
        await self.session.refresh(instance)
        return instance
    
    async def get_by_id(self, id: int) -> Optional[ModelType]:
        """
        Get record by ID
        
        Args:
            id: Record ID
        
        Returns:
            Model instance if found, None otherwise
        """
        result = await self.session.execute(
            select(self.model).where(self.model.id == id)
        )
        return result.scalar_one_or_none()
    
    async def get_all(
        self,
        skip: int = 0,
        limit: int = 100,
        **filters
    ) -> List[ModelType]:
        """
        Get all records with optional filtering
        
        Args:
            skip: Number of records to skip
            limit: Maximum number of records to return
            **filters: Field filters
        
        Returns:
            List of model instances
        """
        query = select(self.model)
        
        # Apply filters
        for field, value in filters.items():
            if hasattr(self.model, field) and value is not None:
                query = query.where(getattr(self.model, field) == value)
        
        query = query.offset(skip).limit(limit)
        result = await self.session.execute(query)
        return list(result.scalars().all())
    
    async def update(self, id: int, **kwargs) -> Optional[ModelType]:
        """
        Update record by ID
        
        Args:
            id: Record ID
            **kwargs: Fields to update
        
        Returns:
            Updated model instance if found, None otherwise
        """
        # Remove None values
        update_data = {k: v for k, v in kwargs.items() if v is not None}
        
        if not update_data:
            return await self.get_by_id(id)
        
        await self.session.execute(
            update(self.model)
            .where(self.model.id == id)
            .values(**update_data)
        )
        await self.session.flush()
        
        return await self.get_by_id(id)
    
    async def delete(self, id: int) -> bool:
        """
        Delete record by ID
        
        Args:
            id: Record ID
        
        Returns:
            True if deleted, False if not found
        """
        result = await self.session.execute(
            delete(self.model).where(self.model.id == id)
        )
        await self.session.flush()
        return result.rowcount > 0
    
    async def count(self, **filters) -> int:
        """
        Count records with optional filtering
        
        Args:
            **filters: Field filters
        
        Returns:
            Number of records
        """
        from sqlalchemy import func
        
        query = select(func.count(self.model.id))
        
        # Apply filters
        for field, value in filters.items():
            if hasattr(self.model, field) and value is not None:
                query = query.where(getattr(self.model, field) == value)
        
        result = await self.session.execute(query)
        return result.scalar_one()
    
    async def exists(self, **filters) -> bool:
        """
        Check if record exists
        
        Args:
            **filters: Field filters
        
        Returns:
            True if exists, False otherwise
        """
        count = await self.count(**filters)
        return count > 0
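The `hasattr`/`getattr` loop in `get_all` and `count` translates keyword arguments into equality filters, silently skipping names the model doesn't have. The same idea on plain objects, as an illustration:

```python
from dataclasses import dataclass


@dataclass
class Row:
    role: str
    is_active: bool


def apply_filters(rows, model_fields, **filters):
    """Keep rows whose attributes match every non-None filter;
    filter names the model lacks are ignored, as in BaseRepository."""
    for field, value in filters.items():
        if field in model_fields and value is not None:
            rows = [r for r in rows if getattr(r, field) == value]
    return rows


rows = [Row("admin", True), Row("user", True), Row("user", False)]
active_users = apply_filters(rows, {"role", "is_active"},
                             role="user", is_active=True, bogus="ignored")
print(len(active_users))  # 1
```

Note the trade-off: ignoring unknown filter names keeps the API forgiving, but a typo in a filter keyword silently returns unfiltered results.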

Create app/db/repositories/user_repository.py:

"""
User repository

Data access layer for User model
"""

from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.repositories.base_repository import BaseRepository
from app.db.models.user import User, UserRole


class UserRepository(BaseRepository[User]):
    """
    User repository
    
    Handles all database operations for users
    """
    
    def __init__(self, session: AsyncSession):
        super().__init__(User, session)
    
    async def get_by_username(self, username: str) -> Optional[User]:
        """
        Get user by username
        
        Args:
            username: Username to search for
        
        Returns:
            User if found, None otherwise
        """
        result = await self.session.execute(
            select(User).where(User.username == username)
        )
        return result.scalar_one_or_none()
    
    async def get_by_email(self, email: str) -> Optional[User]:
        """
        Get user by email
        
        Args:
            email: Email to search for
        
        Returns:
            User if found, None otherwise
        """
        result = await self.session.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()
    
    async def get_by_role(
        self,
        role: UserRole,
        skip: int = 0,
        limit: int = 100
    ) -> list[User]:
        """
        Get users by role
        
        Args:
            role: User role to filter by
            skip: Number of records to skip
            limit: Maximum number of records
        
        Returns:
            List of users with specified role
        """
        result = await self.session.execute(
            select(User)
            .where(User.role == role)
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())
    
    async def get_active_users(
        self,
        skip: int = 0,
        limit: int = 100
    ) -> list[User]:
        """
        Get active users
        
        Args:
            skip: Number of records to skip
            limit: Maximum number of records
        
        Returns:
            List of active users
        """
        result = await self.session.execute(
            select(User)
            .where(User.is_active.is_(True))
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())
    
    async def update_last_login(self, user_id: int) -> Optional[User]:
        """
        Update user's last login timestamp
        
        Args:
            user_id: User ID
        
        Returns:
            Updated user
        """
        from datetime import datetime
        return await self.update(user_id, last_login=datetime.utcnow())

Create app/db/repositories/conversation_repository.py:

"""
Conversation repository

Data access layer for Conversation model
"""

from typing import Optional
from sqlalchemy import select
from sqlalchemy.orm import selectinload
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.repositories.base_repository import BaseRepository
from app.db.models.conversation import Conversation


class ConversationRepository(BaseRepository[Conversation]):
    """
    Conversation repository
    
    Handles all database operations for conversations
    """
    
    def __init__(self, session: AsyncSession):
        super().__init__(Conversation, session)
    
    async def get_by_user(
        self,
        user_id: int,
        skip: int = 0,
        limit: int = 100
    ) -> list[Conversation]:
        """
        Get conversations by user ID
        
        Args:
            user_id: User ID
            skip: Number of records to skip
            limit: Maximum number of records
        
        Returns:
            List of user's conversations
        """
        result = await self.session.execute(
            select(Conversation)
            .where(Conversation.user_id == user_id)
            .order_by(Conversation.updated_at.desc())
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())
    
    async def get_with_messages(self, conversation_id: int) -> Optional[Conversation]:
        """
        Get conversation with all messages loaded
        
        Args:
            conversation_id: Conversation ID
        
        Returns:
            Conversation with messages if found, None otherwise
        """
        result = await self.session.execute(
            select(Conversation)
            .options(selectinload(Conversation.messages))
            .where(Conversation.id == conversation_id)
        )
        return result.scalar_one_or_none()
    
    async def get_user_conversation_count(self, user_id: int) -> int:
        """
        Get number of conversations for a user
        
        Args:
            user_id: User ID
        
        Returns:
            Number of conversations
        """
        return await self.count(user_id=user_id)

Create app/db/repositories/message_repository.py:

"""
Message repository

Data access layer for Message model
"""

from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.repositories.base_repository import BaseRepository
from app.db.models.message import Message, MessageRole


class MessageRepository(BaseRepository[Message]):
    """
    Message repository
    
    Handles all database operations for messages
    """
    
    def __init__(self, session: AsyncSession):
        super().__init__(Message, session)
    
    async def get_by_conversation(
        self,
        conversation_id: int,
        skip: int = 0,
        limit: int = 100
    ) -> list[Message]:
        """
        Get messages for a conversation
        
        Args:
            conversation_id: Conversation ID
            skip: Number of records to skip
            limit: Maximum number of records
        
        Returns:
            List of messages ordered by creation time
        """
        result = await self.session.execute(
            select(Message)
            .where(Message.conversation_id == conversation_id)
            .order_by(Message.created_at.asc())
            .offset(skip)
            .limit(limit)
        )
        return list(result.scalars().all())
    
    async def get_last_message(
        self,
        conversation_id: int
    ) -> Optional[Message]:
        """
        Get the last message in a conversation
        
        Args:
            conversation_id: Conversation ID
        
        Returns:
            Last message if found, None otherwise
        """
        result = await self.session.execute(
            select(Message)
            .where(Message.conversation_id == conversation_id)
            .order_by(Message.created_at.desc())
            .limit(1)
        )
        return result.scalar_one_or_none()
    
    async def count_by_role(
        self,
        conversation_id: int,
        role: MessageRole
    ) -> int:
        """
        Count messages by role in a conversation
        
        Args:
            conversation_id: Conversation ID
            role: Message role
        
        Returns:
            Number of messages with specified role
        """
        return await self.count(
            conversation_id=conversation_id,
            role=role
        )
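The `skip`/`limit` pair used throughout these repositories maps directly onto page-based pagination. A small hedged helper (the function name is ours, not part of the tutorial's codebase):

```python
# Hedged sketch: converting a 1-based page number into the skip/limit
# arguments the repository methods above expect.
def page_to_skip(page: int, page_size: int = 100) -> int:
    """Convert a 1-based page number to an offset for skip/limit queries."""
    if page < 1:
        raise ValueError("page numbers are 1-based")
    return (page - 1) * page_size

# Page 1 starts at offset 0; page 3 with page_size=100 starts at offset 200
print(page_to_skip(1))       # 0
print(page_to_skip(3, 100))  # 200
```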

Step 9: Update Services to Use Database

Update app/services/user_service.py:

"""
User service - Business logic for user operations

Updated to use database instead of in-memory storage
"""

from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from passlib.context import CryptContext

from app.db.repositories.user_repository import UserRepository
from app.db.models.user import User, UserRole
from app.models.user import UserCreate, UserUpdate
from app.core.exceptions import UserNotFoundException, UserAlreadyExistsException

# Password hashing
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


class UserService:
    """
    User service class
    
    Handles all user-related business logic with database operations
    """
    
    def __init__(self, db: AsyncSession):
        """
        Initialize service with database session
        
        Args:
            db: Database session
        """
        self.repository = UserRepository(db)
    
    def _hash_password(self, password: str) -> str:
        """
        Hash password using bcrypt
        
        Args:
            password: Plain text password
        
        Returns:
            Hashed password
        """
        return pwd_context.hash(password)
    
    def _verify_password(self, plain_password: str, hashed_password: str) -> bool:
        """
        Verify password against hash
        
        Args:
            plain_password: Plain text password
            hashed_password: Hashed password
        
        Returns:
            True if password matches, False otherwise
        """
        return pwd_context.verify(plain_password, hashed_password)
    
    async def create_user(self, user_data: UserCreate) -> User:
        """
        Create a new user
        
        Args:
            user_data: User creation data
        
        Returns:
            Created user
        
        Raises:
            UserAlreadyExistsException: If username or email already exists
        """
        # Check for duplicate username
        existing_user = await self.repository.get_by_username(user_data.username)
        if existing_user:
            raise UserAlreadyExistsException("username", user_data.username)
        
        # Check for duplicate email
        existing_email = await self.repository.get_by_email(user_data.email)
        if existing_email:
            raise UserAlreadyExistsException("email", user_data.email)
        
        # Hash password
        hashed_password = self._hash_password(user_data.password)
        
        # Create user
        user = await self.repository.create(
            username=user_data.username,
            email=user_data.email,
            hashed_password=hashed_password,
            full_name=user_data.full_name,
            role=user_data.role,
            is_active=True,
            is_verified=False
        )
        
        return user
    
    async def get_all_users(
        self,
        skip: int = 0,
        limit: int = 10,
        role: Optional[UserRole] = None
    ) -> list[User]:
        """
        Get all users with optional filtering
        
        Args:
            skip: Number of records to skip
            limit: Maximum number of records
            role: Optional role filter
        
        Returns:
            List of users
        """
        if role:
            return await self.repository.get_by_role(role, skip, limit)
        return await self.repository.get_all(skip, limit)
    
    async def get_user_by_id(self, user_id: int) -> User:
        """
        Get user by ID
        
        Args:
            user_id: User identifier
        
        Returns:
            User object
        
        Raises:
            UserNotFoundException: If user not found
        """
        user = await self.repository.get_by_id(user_id)
        if not user:
            raise UserNotFoundException(user_id)
        return user
    
    async def get_user_by_username(self, username: str) -> Optional[User]:
        """
        Get user by username
        
        Args:
            username: Username
        
        Returns:
            User if found, None otherwise
        """
        return await self.repository.get_by_username(username)
    
    async def get_user_by_email(self, email: str) -> Optional[User]:
        """
        Get user by email
        
        Args:
            email: Email address
        
        Returns:
            User if found, None otherwise
        """
        return await self.repository.get_by_email(email)
    
    async def update_user(self, user_id: int, user_update: UserUpdate) -> User:
        """
        Update user information
        
        Args:
            user_id: User ID
            user_update: Update data
        
        Returns:
            Updated user
        
        Raises:
            UserNotFoundException: If user not found
            UserAlreadyExistsException: If username/email conflict
        """
        # Check if user exists
        user = await self.get_user_by_id(user_id)
        
        update_data = user_update.model_dump(exclude_unset=True)
        
        # Check for username conflict
        if "username" in update_data:
            existing = await self.repository.get_by_username(update_data["username"])
            if existing and existing.id != user_id:
                raise UserAlreadyExistsException("username", update_data["username"])
        
        # Check for email conflict
        if "email" in update_data:
            existing = await self.repository.get_by_email(update_data["email"])
            if existing and existing.id != user_id:
                raise UserAlreadyExistsException("email", update_data["email"])
        
        # Update user
        updated_user = await self.repository.update(user_id, **update_data)
        if not updated_user:
            raise UserNotFoundException(user_id)
        
        return updated_user
    
    async def delete_user(self, user_id: int) -> None:
        """
        Delete a user
        
        Args:
            user_id: User ID
        
        Raises:
            UserNotFoundException: If user not found
        """
        deleted = await self.repository.delete(user_id)
        if not deleted:
            raise UserNotFoundException(user_id)
    
    async def get_user_count(self) -> int:
        """
        Get total number of users
        
        Returns:
            User count
        """
        return await self.repository.count()
    
    async def authenticate_user(
        self,
        username: str,
        password: str
    ) -> Optional[User]:
        """
        Authenticate user with username and password
        
        Args:
            username: Username
            password: Plain text password
        
        Returns:
            User if authentication successful, None otherwise
        """
        user = await self.repository.get_by_username(username)
        if not user:
            return None
        
        if not self._verify_password(password, user.hashed_password):
            return None
        
        # Update last login
        await self.repository.update_last_login(user.id)
        
        return user
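One caveat worth knowing about `create_user`: the duplicate checks are best-effort, since two concurrent requests can both pass them before either commits. The database's UNIQUE constraints are the real guard, so production code typically also translates the integrity error raised on commit into the domain exception. A hedged sketch of that pattern, with stand-in classes (the real ones would be `sqlalchemy.exc.IntegrityError` and the tutorial's `UserAlreadyExistsException`):

```python
# Stand-in classes for illustration only.
class IntegrityError(Exception):
    """Stand-in for sqlalchemy.exc.IntegrityError."""

class UserAlreadyExistsException(Exception):
    """Stand-in for the tutorial's domain exception."""

def create_with_constraint_guard(insert):
    """Run an insert, translating a DB constraint violation into a domain error."""
    try:
        return insert()
    except IntegrityError as exc:
        # The UNIQUE constraint fired: surface it as the same domain
        # exception the pre-checks would have raised.
        raise UserAlreadyExistsException(str(exc)) from exc

def duplicate_insert():
    raise IntegrityError("duplicate key value violates unique constraint")

try:
    create_with_constraint_guard(duplicate_insert)
except UserAlreadyExistsException as e:
    print("caught:", e)
```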

Create app/services/conversation_service.py:

"""
Conversation service

Business logic for conversation operations
"""

from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.repositories.conversation_repository import ConversationRepository
from app.db.repositories.message_repository import MessageRepository
from app.db.models.conversation import Conversation
from app.db.models.message import Message, MessageRole


class ConversationService:
    """
    Conversation service
    
    Handles conversation and message business logic
    """
    
    def __init__(self, db: AsyncSession):
        """
        Initialize service with database session
        
        Args:
            db: Database session
        """
        self.conversation_repo = ConversationRepository(db)
        self.message_repo = MessageRepository(db)
    
    async def create_conversation(
        self,
        user_id: int,
        title: str,
        model_name: str
    ) -> Conversation:
        """
        Create a new conversation
        
        Args:
            user_id: User ID
            title: Conversation title
            model_name: AI model name
        
        Returns:
            Created conversation
        """
        conversation = await self.conversation_repo.create(
            user_id=user_id,
            title=title,
            model_name=model_name
        )
        return conversation
    
    async def get_conversation(
        self,
        conversation_id: int,
        include_messages: bool = False
    ) -> Optional[Conversation]:
        """
        Get conversation by ID
        
        Args:
            conversation_id: Conversation ID
            include_messages: Whether to load messages
        
        Returns:
            Conversation if found, None otherwise
        """
        if include_messages:
            return await self.conversation_repo.get_with_messages(conversation_id)
        return await self.conversation_repo.get_by_id(conversation_id)
    
    async def get_user_conversations(
        self,
        user_id: int,
        skip: int = 0,
        limit: int = 20
    ) -> list[Conversation]:
        """
        Get all conversations for a user
        
        Args:
            user_id: User ID
            skip: Number to skip
            limit: Maximum number
        
        Returns:
            List of conversations
        """
        return await self.conversation_repo.get_by_user(user_id, skip, limit)
    
    async def add_message(
        self,
        conversation_id: int,
        role: MessageRole,
        content: str,
        prompt_tokens: Optional[int] = None,
        completion_tokens: Optional[int] = None,
        total_tokens: Optional[int] = None
    ) -> Message:
        """
        Add message to conversation
        
        Args:
            conversation_id: Conversation ID
            role: Message role
            content: Message content
            prompt_tokens: Prompt tokens used
            completion_tokens: Completion tokens used
            total_tokens: Total tokens used
        
        Returns:
            Created message
        """
        message = await self.message_repo.create(
            conversation_id=conversation_id,
            role=role,
            content=content,
            prompt_tokens=prompt_tokens,
            completion_tokens=completion_tokens,
            total_tokens=total_tokens
        )
        return message
    
    async def get_conversation_messages(
        self,
        conversation_id: int,
        skip: int = 0,
        limit: int = 100
    ) -> list[Message]:
        """
        Get messages for a conversation
        
        Args:
            conversation_id: Conversation ID
            skip: Number to skip
            limit: Maximum number
        
        Returns:
            List of messages
        """
        return await self.message_repo.get_by_conversation(
            conversation_id,
            skip,
            limit
        )
    
    async def delete_conversation(self, conversation_id: int) -> bool:
        """
        Delete a conversation
        
        Args:
            conversation_id: Conversation ID
        
        Returns:
            True if deleted, False if not found
        """
        return await self.conversation_repo.delete(conversation_id)
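Because `get_conversation_messages` returns rows in chronological order, they map directly onto the messages payload most chat-completion APIs expect. A hedged sketch, with `SimpleNamespace` standing in for the ORM `Message` rows:

```python
from types import SimpleNamespace

# SimpleNamespace objects stand in for Message ORM rows, already sorted
# by created_at ascending as get_by_conversation guarantees.
rows = [
    SimpleNamespace(role="user", content="Hi"),
    SimpleNamespace(role="assistant", content="Hello! How can I help?"),
]

def to_chat_payload(rows):
    """Flatten ordered message rows into a chat-API style payload."""
    return [{"role": r.role, "content": r.content} for r in rows]

print(to_chat_payload(rows))
```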

Step 10: Update API Endpoints

Update app/api/v1/endpoints/users.py:

"""
User management endpoints

Updated to use database
"""

from fastapi import APIRouter, Depends, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated

from app.models.user import UserCreate, UserUpdate, UserResponse, UserRole
from app.services.user_service import UserService
from app.db.session import get_db

router = APIRouter(prefix="/users", tags=["Users"])


def get_user_service(db: AsyncSession = Depends(get_db)) -> UserService:
    """
    Dependency to get user service with database session
    
    Args:
        db: Database session
    
    Returns:
        UserService instance
    """
    return UserService(db)


@router.post(
    "",
    response_model=UserResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Create a new user"
)
async def create_user(
    user_data: UserCreate,
    service: Annotated[UserService, Depends(get_user_service)]
):
    """
    Create a new user with the following information:
    
    - **username**: Unique username (3-50 characters)
    - **email**: Valid email address
    - **full_name**: Optional full name
    - **password**: Password (min 8 characters)
    - **role**: User role (admin, user, guest)
    """
    user = await service.create_user(user_data)
    return UserResponse.model_validate(user)


@router.get(
    "",
    response_model=list[UserResponse],
    summary="Get all users"
)
async def get_users(
    service: Annotated[UserService, Depends(get_user_service)],
    skip: int = 0,
    limit: int = 10,
    role: UserRole | None = None
):
    """
    Retrieve users with optional filtering:
    
    - **skip**: Number of records to skip (for pagination)
    - **limit**: Maximum number of records to return
    - **role**: Filter by user role
    """
    users = await service.get_all_users(skip=skip, limit=limit, role=role)
    return [UserResponse.model_validate(user) for user in users]


@router.get(
    "/stats/count",
    summary="Get user statistics"
)
async def get_user_stats(
    service: Annotated[UserService, Depends(get_user_service)]
):
    """Get user statistics"""
    total = await service.get_user_count()
    return {"total_users": total}


# NOTE: /stats/count must be registered before /{user_id}; otherwise a
# request to /users/stats/count is captured by the path parameter and
# fails integer validation with a 422.
@router.get(
    "/{user_id}",
    response_model=UserResponse,
    summary="Get user by ID"
)
async def get_user(
    user_id: int,
    service: Annotated[UserService, Depends(get_user_service)]
):
    """Get a specific user by their ID"""
    user = await service.get_user_by_id(user_id)
    return UserResponse.model_validate(user)


@router.patch(
    "/{user_id}",
    response_model=UserResponse,
    summary="Update user"
)
async def update_user(
    user_id: int,
    user_update: UserUpdate,
    service: Annotated[UserService, Depends(get_user_service)]
):
    """
    Update user information (partial update)
    
    Only provided fields will be updated
    """
    user = await service.update_user(user_id, user_update)
    return UserResponse.model_validate(user)


@router.delete(
    "/{user_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Delete user"
)
async def delete_user(
    user_id: int,
    service: Annotated[UserService, Depends(get_user_service)]
):
    """Delete a user by ID"""
    await service.delete_user(user_id)
    return None

Step 11: Create Conversation Endpoints

Create app/api/v1/endpoints/conversations.py:

"""
Conversation management endpoints

Handles conversation CRUD operations
"""

from datetime import datetime
from typing import Annotated

from fastapi import APIRouter, Depends, status, HTTPException
from sqlalchemy.ext.asyncio import AsyncSession
from pydantic import BaseModel

from app.services.conversation_service import ConversationService
from app.db.session import get_db
from app.db.models.message import MessageRole

router = APIRouter(prefix="/conversations", tags=["Conversations"])


# ============================================
# PYDANTIC MODELS
# ============================================


class ConversationCreate(BaseModel):
    """Request model for creating conversation"""
    user_id: int
    title: str = "New Conversation"
    model_name: str = "llama2"


class MessageCreate(BaseModel):
    """Request model for creating message"""
    role: MessageRole
    content: str
    prompt_tokens: int | None = None
    completion_tokens: int | None = None
    total_tokens: int | None = None


class MessageResponse(BaseModel):
    """Response model for message"""
    id: int
    conversation_id: int
    role: str
    content: str
    prompt_tokens: int | None
    completion_tokens: int | None
    total_tokens: int | None
    created_at: datetime  # datetime, not str: from_attributes won't coerce
    
    model_config = {"from_attributes": True}


class ConversationResponse(BaseModel):
    """Response model for conversation"""
    id: int
    user_id: int
    title: str
    model_name: str
    created_at: datetime
    updated_at: datetime
    
    model_config = {"from_attributes": True}


class ConversationWithMessages(ConversationResponse):
    """Response model for conversation with messages"""
    messages: list[MessageResponse] = []


# ============================================
# DEPENDENCIES
# ============================================


def get_conversation_service(
    db: AsyncSession = Depends(get_db)
) -> ConversationService:
    """Get conversation service with database session"""
    return ConversationService(db)


# ============================================
# ENDPOINTS
# ============================================


@router.post(
    "",
    response_model=ConversationResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Create new conversation"
)
async def create_conversation(
    conversation_data: ConversationCreate,
    service: Annotated[ConversationService, Depends(get_conversation_service)]
):
    """
    Create a new conversation
    
    - **user_id**: User ID who owns the conversation
    - **title**: Conversation title
    - **model_name**: AI model to use
    """
    conversation = await service.create_conversation(
        user_id=conversation_data.user_id,
        title=conversation_data.title,
        model_name=conversation_data.model_name
    )
    return ConversationResponse.model_validate(conversation)


@router.get(
    "/{conversation_id}",
    response_model=ConversationWithMessages,
    summary="Get conversation with messages"
)
async def get_conversation(
    conversation_id: int,
    service: Annotated[ConversationService, Depends(get_conversation_service)]
):
    """
    Get conversation by ID with all messages
    
    Returns conversation details and message history
    """
    conversation = await service.get_conversation(
        conversation_id,
        include_messages=True
    )
    
    if not conversation:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Conversation {conversation_id} not found"
        )
    
    return ConversationWithMessages.model_validate(conversation)


@router.get(
    "/user/{user_id}",
    response_model=list[ConversationResponse],
    summary="Get user conversations"
)
async def get_user_conversations(
    user_id: int,
    service: Annotated[ConversationService, Depends(get_conversation_service)],
    skip: int = 0,
    limit: int = 20
):
    """
    Get all conversations for a user
    
    - **user_id**: User ID
    - **skip**: Number of records to skip
    - **limit**: Maximum number of records
    """
    conversations = await service.get_user_conversations(user_id, skip, limit)
    return [ConversationResponse.model_validate(c) for c in conversations]


@router.post(
    "/{conversation_id}/messages",
    response_model=MessageResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Add message to conversation"
)
async def add_message(
    conversation_id: int,
    message_data: MessageCreate,
    service: Annotated[ConversationService, Depends(get_conversation_service)]
):
    """
    Add a message to a conversation
    
    - **role**: Message role (user, assistant, system)
    - **content**: Message content
    - **tokens**: Optional token counts
    """
    message = await service.add_message(
        conversation_id=conversation_id,
        role=message_data.role,
        content=message_data.content,
        prompt_tokens=message_data.prompt_tokens,
        completion_tokens=message_data.completion_tokens,
        total_tokens=message_data.total_tokens
    )
    return MessageResponse.model_validate(message)


@router.get(
    "/{conversation_id}/messages",
    response_model=list[MessageResponse],
    summary="Get conversation messages"
)
async def get_messages(
    conversation_id: int,
    service: Annotated[ConversationService, Depends(get_conversation_service)],
    skip: int = 0,
    limit: int = 100
):
    """
    Get messages for a conversation
    
    Returns messages in chronological order
    """
    messages = await service.get_conversation_messages(
        conversation_id,
        skip,
        limit
    )
    return [MessageResponse.model_validate(m) for m in messages]


@router.delete(
    "/{conversation_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Delete conversation"
)
async def delete_conversation(
    conversation_id: int,
    service: Annotated[ConversationService, Depends(get_conversation_service)]
):
    """
    Delete a conversation
    
    This will also delete all associated messages
    """
    deleted = await service.delete_conversation(conversation_id)
    
    if not deleted:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"Conversation {conversation_id} not found"
        )
    
    return None

Update app/api/v1/api.py to include the new router:

"""
API v1 router aggregator

Combines all v1 endpoint routers
"""

from fastapi import APIRouter
from app.api.v1.endpoints import (
    users,
    health,
    users_advanced,
    dependencies_demo,
    conversations  # New!
)

# Create main v1 router
api_router = APIRouter()

# Include all endpoint routers
api_router.include_router(health.router)
api_router.include_router(users.router)
api_router.include_router(users_advanced.router)
api_router.include_router(dependencies_demo.router)
api_router.include_router(conversations.router)  # New!

# Future routers:
# api_router.include_router(ai.router)

Step 12: Add Password Hashing Dependency

Install passlib for password hashing:

pip install passlib[bcrypt]==1.7.4
pip freeze > requirements.txt
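passlib's bcrypt handles salting and verification for you; conceptually, the hash-and-verify round trip looks like this hedged stdlib sketch (PBKDF2 stands in for bcrypt here purely for illustration — in the app itself, keep using `pwd_context.hash`/`pwd_context.verify`):

```python
import hashlib
import hmac
import os

# Hedged sketch of the hash/verify round trip a CryptContext performs:
# a fresh random salt per hash, stored alongside the digest, and a
# constant-time comparison on verify. PBKDF2 stands in for bcrypt.
def hash_password(password: str) -> str:
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 100_000)
    return salt.hex() + "$" + digest.hex()

def verify_password(password: str, stored: str) -> bool:
    salt_hex, digest_hex = stored.split("$")
    salt = bytes.fromhex(salt_hex)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, 100_000)
    # Constant-time comparison avoids leaking timing information
    return hmac.compare_digest(digest.hex(), digest_hex)

h = hash_password("admin123")
print(verify_password("admin123", h))   # True
print(verify_password("wrong", h))      # False
```

Note that hashing the same password twice yields different strings (fresh salt each time), which is why you verify against the stored hash instead of re-hashing and comparing.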

Step 13: Create Database Utilities

Create app/db/utils.py:

"""
Database utilities

Helper functions for database operations
"""

from typing import AsyncGenerator
from contextlib import asynccontextmanager
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.session import async_session_maker, engine
from app.db.base import Base
from app.core.config import settings


@asynccontextmanager
async def get_db_context() -> AsyncGenerator[AsyncSession, None]:
    """
    Context manager for database session
    
    Usage:
        async with get_db_context() as db:
            user = await db.execute(select(User))
    """
    async with async_session_maker() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()


async def init_database() -> None:
    """
    Initialize database
    
    Creates all tables if they don't exist
    WARNING: Only use in development. Use Alembic migrations in production.
    """
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    
    print("✅ Database initialized")


async def drop_database() -> None:
    """
    Drop all database tables
    
    WARNING: This will delete all data!
    Only use in development/testing.
    """
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)
    
    print("🗑️  Database dropped")


async def reset_database() -> None:
    """
    Reset database (drop and recreate)
    
    WARNING: This will delete all data!
    Only use in development/testing.
    """
    await drop_database()
    await init_database()
    print("🔄 Database reset complete")


async def check_database_connection() -> bool:
    """
    Check if database connection is working
    
    Returns:
        True if connection successful, False otherwise
    """
    try:
        from sqlalchemy import text
        
        async with engine.connect() as conn:
            # SQLAlchemy 2.0 requires text() for raw SQL strings
            await conn.execute(text("SELECT 1"))
        return True
    except Exception as e:
        print(f"❌ Database connection failed: {e}")
        return False


async def get_database_info() -> dict:
    """
    Get database information
    
    Returns:
        Dictionary with database info
    """
    from sqlalchemy import text
    
    info = {
        # Mask the password rather than leaking credentials in logs
        "url": engine.url.render_as_string(hide_password=True),
        "pool_size": settings.DB_POOL_SIZE,
        "max_overflow": settings.DB_MAX_OVERFLOW,
    }
    
    try:
        async with engine.connect() as conn:
            result = await conn.execute(text("SELECT version()"))
            version = result.scalar()
            info["version"] = version
            info["connected"] = True
    except Exception as e:
        info["connected"] = False
        info["error"] = str(e)
    
    return info
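The heart of `get_db_context` is its commit-on-success, rollback-on-error discipline. A hedged sketch of that shape with a dummy session standing in for `AsyncSession` (sync here for brevity):

```python
from contextlib import contextmanager

# DummySession stands in for AsyncSession; it just records which
# lifecycle methods were called and in what order.
class DummySession:
    def __init__(self):
        self.events = []
    def commit(self):
        self.events.append("commit")
    def rollback(self):
        self.events.append("rollback")
    def close(self):
        self.events.append("close")

@contextmanager
def db_context(session):
    """Same shape as get_db_context: commit on success, rollback on error."""
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise  # re-raise so callers still see the failure
    finally:
        session.close()

ok = DummySession()
with db_context(ok):
    pass
print(ok.events)    # ['commit', 'close']

bad = DummySession()
try:
    with db_context(bad):
        raise RuntimeError("boom")
except RuntimeError:
    pass
print(bad.events)   # ['rollback', 'close']
```

Either way the session is closed, and an exception always triggers a rollback before propagating — exactly the guarantee the async version above provides to its callers.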

Create app/db/seed.py:

"""
Database seeding

Populate database with initial/test data
"""

from sqlalchemy.ext.asyncio import AsyncSession
from passlib.context import CryptContext

from app.db.session import async_session_maker
from app.db.models.user import User, UserRole
from app.db.models.conversation import Conversation
from app.db.models.message import Message, MessageRole
from app.utils.logger import logger

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


async def seed_users(session: AsyncSession) -> list[User]:
    """
    Seed users
    
    Creates demo users for development/testing
    """
    users = [
        User(
            username="admin",
            email="admin@example.com",
            hashed_password=pwd_context.hash("admin123"),
            full_name="Admin User",
            role=UserRole.ADMIN,
            is_active=True,
            is_verified=True
        ),
        User(
            username="john_doe",
            email="john@example.com",
            hashed_password=pwd_context.hash("password123"),
            full_name="John Doe",
            role=UserRole.USER,
            is_active=True,
            is_verified=True
        ),
        User(
            username="jane_smith",
            email="jane@example.com",
            hashed_password=pwd_context.hash("password123"),
            full_name="Jane Smith",
            role=UserRole.USER,
            is_active=True,
            is_verified=True
        ),
    ]
    
    session.add_all(users)
    await session.flush()
    
    logger.info(f"Seeded {len(users)} users")
    return users


async def seed_conversations(
    session: AsyncSession,
    users: list[User]
) -> list[Conversation]:
    """
    Seed conversations
    
    Creates demo conversations for users
    """
    conversations = [
        Conversation(
            user_id=users[1].id,  # john_doe
            title="Getting Started with FastAPI",
            model_name="llama2"
        ),
        Conversation(
            user_id=users[1].id,  # john_doe
            title="Python Best Practices",
            model_name="llama2"
        ),
        Conversation(
            user_id=users[2].id,  # jane_smith
            title="Database Design Questions",
            model_name="mistral"
        ),
    ]
    
    session.add_all(conversations)
    await session.flush()
    
    logger.info(f"Seeded {len(conversations)} conversations")
    return conversations


async def seed_messages(
    session: AsyncSession,
    conversations: list[Conversation]
) -> list[Message]:
    """
    Seed messages
    
    Creates demo messages for conversations
    """
    messages = [
        # Conversation 1 messages
        Message(
            conversation_id=conversations[0].id,
            role=MessageRole.USER,
            content="What is FastAPI and why should I use it?"
        ),
        Message(
            conversation_id=conversations[0].id,
            role=MessageRole.ASSISTANT,
            content="FastAPI is a modern, fast web framework for building APIs with Python. It's built on Starlette and Pydantic, offering automatic API documentation, data validation, and async support out of the box."
        ),
        Message(
            conversation_id=conversations[0].id,
            role=MessageRole.USER,
            content="How do I create my first endpoint?"
        ),
        Message(
            conversation_id=conversations[0].id,
            role=MessageRole.ASSISTANT,
            content="Here's a simple example:\n\n```python\nfrom fastapi import FastAPI\n\napp = FastAPI()\n\n@app.get('/')\nasync def root():\n    return {'message': 'Hello World'}\n```"
        ),
        
        # Conversation 2 messages
        Message(
            conversation_id=conversations[1].id,
            role=MessageRole.USER,
            content="What are some Python best practices?"
        ),
        Message(
            conversation_id=conversations[1].id,
            role=MessageRole.ASSISTANT,
            content="Key Python best practices include: use type hints, follow PEP 8, write docstrings, use virtual environments, handle exceptions properly, and write tests."
        ),
        
        # Conversation 3 messages
        Message(
            conversation_id=conversations[2].id,
            role=MessageRole.USER,
            content="Should I use ORM or raw SQL?"
        ),
        Message(
            conversation_id=conversations[2].id,
            role=MessageRole.ASSISTANT,
            content="Both have their place. ORMs like SQLAlchemy provide abstraction, type safety, and easier migrations. Raw SQL offers more control and can be more performant for complex queries. For most applications, start with an ORM."
        ),
    ]
    
    session.add_all(messages)
    await session.flush()
    
    logger.info(f"Seeded {len(messages)} messages")
    return messages


async def seed_database() -> None:
    """
    Seed database with all demo data
    
    Creates users, conversations, and messages
    """
    async with async_session_maker() as session:
        try:
            # Seed in order (users first, then conversations, then messages)
            users = await seed_users(session)
            conversations = await seed_conversations(session, users)
            messages = await seed_messages(session, conversations)
            
            await session.commit()
            
            logger.info("✅ Database seeding complete")
            print("✅ Database seeded successfully!")
            print(f"   - {len(users)} users")
            print(f"   - {len(conversations)} conversations")
            print(f"   - {len(messages)} messages")
            
        except Exception as e:
            await session.rollback()
            logger.error(f"Database seeding failed: {e}")
            print(f"❌ Seeding failed: {e}")
            raise


if __name__ == "__main__":
    """Run seeding directly"""
    import asyncio
    asyncio.run(seed_database())

Step 14: Update Main Application

Update app/main.py:

"""
FastAPI AI Backend - Main Application

Updated with database integration
"""

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from contextlib import asynccontextmanager

from app.core.config import settings
from app.api.v1.api import api_router
from app.core.exceptions import AppException
from app.core.error_handlers import (
    app_exception_handler,
    validation_exception_handler,
    generic_exception_handler,
)
from app.middleware.logging_middleware import LoggingMiddleware
from app.middleware.performance_middleware import PerformanceMiddleware
from app.utils.logger import logger
from app.db.session import close_db
from app.db.utils import check_database_connection, get_database_info


@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Application lifespan events
    
    Handles startup and shutdown tasks
    """
    # Startup
    logger.info(
        "Application startup",
        extra={
            "app_name": settings.APP_NAME,
            "version": settings.APP_VERSION,
            "environment": settings.ENVIRONMENT,
        },
    )
    
    print(f"🚀 Starting {settings.APP_NAME} v{settings.APP_VERSION}")
    print(f"📝 Environment: {settings.ENVIRONMENT}")
    print(f"🔧 Debug mode: {settings.DEBUG}")
    
    # Check database connection
    db_connected = await check_database_connection()
    if db_connected:
        print("✅ Database connection successful")
        db_info = await get_database_info()
        print(f"📊 PostgreSQL version: {db_info.get('version', 'unknown')}")
    else:
        print("❌ Database connection failed!")
        print("   Make sure PostgreSQL is running")
    
    print(f"📚 API Docs: http://{settings.HOST}:{settings.PORT}/docs")
    
    yield
    
    # Shutdown
    logger.info("Application shutdown", extra={"app_name": settings.APP_NAME})
    print(f"👋 Shutting down {settings.APP_NAME}")
    
    # Close database connections
    await close_db()
    print("📊 Database connections closed")


def create_application() -> FastAPI:
    """
    Application factory pattern
    
    Creates and configures the FastAPI application
    """
    
    app = FastAPI(
        title=settings.APP_NAME,
        version=settings.APP_VERSION,
        description="""
        A production-ready FastAPI backend for AI applications.
        
        ## Features
        * **Database Integration** with PostgreSQL and SQLAlchemy
        * **Async Operations** for high performance
        * **Repository Pattern** for clean data access
        * **Database Migrations** with Alembic
        * **Advanced Validation** with Pydantic
        * **Comprehensive Error Handling**
        * **Structured Logging** with request tracking
        * **Middleware Stack** (logging, performance, CORS)
        * **User Management** with password hashing
        * **Conversation Management** for AI interactions
        
        ## Database
        * PostgreSQL 15
        * SQLAlchemy 2.0 (async)
        * Alembic migrations
        * Connection pooling
        
        ## Coming Soon
        * AI model integration (Ollama)
        * JWT Authentication
        * WebSocket support
        * Redis caching
        """,
        debug=settings.DEBUG,
        docs_url="/docs",
        redoc_url="/redoc",
        openapi_url="/openapi.json",
        lifespan=lifespan,
    )
    
    # ============================================
    # MIDDLEWARE
    # ============================================
    
    app.add_middleware(
        PerformanceMiddleware,
        slow_request_threshold=1.0,
    )
    
    app.add_middleware(LoggingMiddleware)
    
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.allowed_origins_list,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
        expose_headers=["X-Request-ID", "X-Process-Time"],
    )
    
    # ============================================
    # EXCEPTION HANDLERS
    # ============================================
    
    app.add_exception_handler(AppException, app_exception_handler)
    app.add_exception_handler(RequestValidationError, validation_exception_handler)
    app.add_exception_handler(ValidationError, validation_exception_handler)
    app.add_exception_handler(Exception, generic_exception_handler)
    
    # ============================================
    # ROUTERS
    # ============================================
    
    app.include_router(api_router, prefix=settings.API_V1_PREFIX)
    
    return app


# Create the application instance
app = create_application()

Step 15: Create Management Scripts

Create manage.py in the fastapi directory:

"""
Database management script

Provides CLI commands for database operations
"""

import asyncio
import sys
from app.db.utils import (
    init_database,
    drop_database,
    reset_database,
    check_database_connection,
    get_database_info
)
from app.db.seed import seed_database


async def main():
    """Main CLI handler"""
    if len(sys.argv) < 2:
        print("""
Usage: python manage.py <command>

Commands:
  init       - Initialize database (create tables)
  drop       - Drop all tables (WARNING: deletes all data)
  reset      - Drop and recreate all tables (WARNING: deletes all data)
  seed       - Seed database with demo data
  check      - Check database connection
  info       - Show database information
        """)
        return
    
    command = sys.argv[1]
    
    if command == "init":
        await init_database()
    
    elif command == "drop":
        confirm = input("⚠️  This will delete ALL data. Continue? (yes/no): ")
        if confirm.lower() == "yes":
            await drop_database()
        else:
            print("Operation cancelled")
    
    elif command == "reset":
        confirm = input("⚠️  This will delete ALL data and recreate tables. Continue? (yes/no): ")
        if confirm.lower() == "yes":
            await reset_database()
        else:
            print("Operation cancelled")
    
    elif command == "seed":
        await seed_database()
    
    elif command == "check":
        connected = await check_database_connection()
        if connected:
            print("✅ Database connection successful")
        else:
            print("❌ Database connection failed")
    
    elif command == "info":
        info = await get_database_info()
        print("\n📊 Database Information:")
        for key, value in info.items():
            print(f"   {key}: {value}")
    
    else:
        print(f"Unknown command: {command}")
        print("Run 'python manage.py' for help")


if __name__ == "__main__":
    asyncio.run(main())

Make it executable (add #!/usr/bin/env python3 as the first line of manage.py so ./manage.py runs directly):

chmod +x manage.py

Step 16: Testing Database Operations

Create test_database.py:

"""
Test script for database operations

Tests all database functionality
"""

import requests
import json
from typing import Dict, Any

BASE_URL = "http://127.0.0.1:8000/api/v1"


def print_test(title: str, response: requests.Response):
    """Print test results"""
    print(f"\n{'='*70}")
    print(f"{title}")
    print(f"{'='*70}")
    print(f"Status: {response.status_code}")
    try:
        data = response.json()
        print(f"Response:\n{json.dumps(data, indent=2, default=str)}")
    except ValueError:
        print(f"Response: {response.text[:500]}")


def test_create_user():
    """Test creating a user"""
    user_data = {
        "username": "test_user",
        "email": "test@example.com",
        "password": "SecurePass123",
        "full_name": "Test User",
        "role": "user"
    }
    
    response = requests.post(f"{BASE_URL}/users", json=user_data)
    print_test("✅ CREATE USER", response)
    
    if response.status_code == 201:
        return response.json()
    return None


def test_get_users():
    """Test getting all users"""
    response = requests.get(f"{BASE_URL}/users")
    print_test("📋 GET ALL USERS", response)
    return response.json() if response.status_code == 200 else []


def test_get_user(user_id: int):
    """Test getting specific user"""
    response = requests.get(f"{BASE_URL}/users/{user_id}")
    print_test(f"👤 GET USER {user_id}", response)


def test_update_user(user_id: int):
    """Test updating user"""
    update_data = {
        "full_name": "Updated Test User"
    }
    
    response = requests.patch(f"{BASE_URL}/users/{user_id}", json=update_data)
    print_test(f"✏️  UPDATE USER {user_id}", response)


def test_create_conversation(user_id: int):
    """Test creating conversation"""
    conv_data = {
        "user_id": user_id,
        "title": "Test Conversation",
        "model_name": "llama2"
    }
    
    response = requests.post(f"{BASE_URL}/conversations", json=conv_data)
    print_test("💬 CREATE CONVERSATION", response)
    
    if response.status_code == 201:
        return response.json()
    return None


def test_add_message(conversation_id: int):
    """Test adding message to conversation"""
    message_data = {
        "role": "user",
        "content": "Hello, this is a test message!",
        "total_tokens": 10
    }
    
    response = requests.post(
        f"{BASE_URL}/conversations/{conversation_id}/messages",
        json=message_data
    )
    print_test("📝 ADD MESSAGE", response)


def test_get_conversation(conversation_id: int):
    """Test getting conversation with messages"""
    response = requests.get(f"{BASE_URL}/conversations/{conversation_id}")
    print_test(f"💬 GET CONVERSATION {conversation_id}", response)


def test_get_user_conversations(user_id: int):
    """Test getting user's conversations"""
    response = requests.get(f"{BASE_URL}/conversations/user/{user_id}")
    print_test(f"📋 GET USER {user_id} CONVERSATIONS", response)


def test_delete_conversation(conversation_id: int):
    """Test deleting conversation"""
    response = requests.delete(f"{BASE_URL}/conversations/{conversation_id}")
    print_test(f"🗑️  DELETE CONVERSATION {conversation_id}", response)


def test_delete_user(user_id: int):
    """Test deleting user"""
    response = requests.delete(f"{BASE_URL}/users/{user_id}")
    print_test(f"🗑️  DELETE USER {user_id}", response)


def run_all_tests():
    """Run complete test suite"""
    print("\n" + "🧪"*35)
    print("DATABASE INTEGRATION TEST SUITE")
    print("🧪"*35)
    
    # Test user operations
    print("\n" + "="*70)
    print("USER OPERATIONS")
    print("="*70)
    
    created_user = test_create_user()
    if not created_user:
        print("❌ Failed to create user, stopping tests")
        return
    
    user_id = created_user['id']
    
    test_get_users()
    test_get_user(user_id)
    test_update_user(user_id)
    
    # Test conversation operations
    print("\n" + "="*70)
    print("CONVERSATION OPERATIONS")
    print("="*70)
    
    created_conv = test_create_conversation(user_id)
    if not created_conv:
        print("❌ Failed to create conversation")
        return
    
    conv_id = created_conv['id']
    
    test_add_message(conv_id)
    test_add_message(conv_id)  # Add second message
    test_get_conversation(conv_id)
    test_get_user_conversations(user_id)
    
    # Test cascade delete
    print("\n" + "="*70)
    print("CASCADE DELETE TEST")
    print("="*70)
    
    test_delete_user(user_id)  # Should also delete conversations
    
    print("\n" + "✅"*35)
    print("ALL TESTS COMPLETED!")
    print("✅"*35 + "\n")


if __name__ == "__main__":
    print("""
    ╔════════════════════════════════════════════════════════╗
    ║  Database Integration Test Suite                       ║
    ║                                                        ║
    ║  Prerequisites:                                        ║
    ║  1. PostgreSQL running (docker-compose up -d)          ║
    ║  2. Migrations applied (alembic upgrade head)          ║
    ║  3. FastAPI server running (python main.py)            ║
    ╚════════════════════════════════════════════════════════╝
    """)
    
    try:
        response = requests.get(f"{BASE_URL}/health")
        if response.status_code == 200:
            run_all_tests()
        else:
            print("❌ Server returned unexpected status")
    except requests.exceptions.ConnectionError:
        print("❌ ERROR: Cannot connect to server!")
        print("   Please start the server with: python main.py")
    except Exception as e:
        print(f"❌ ERROR: {e}")

Step 17: Complete Workflow & Testing

Complete Setup Workflow

Create a comprehensive setup guide SETUP_DATABASE.md:

# Database Setup Guide

## Prerequisites

- Python 3.10+
- PostgreSQL 15+ (or Docker)
- Virtual environment activated

## Step-by-Step Setup

### 1. Install Dependencies

```bash
cd fastapi
source venv/bin/activate  # Windows: venv\Scripts\activate

# Install all packages including database
pip install -r requirements.txt
```

### 2. Start PostgreSQL

**Option A: Docker (Recommended)**
```bash
# From project root
docker-compose up -d

# Verify it's running
docker ps
docker logs aiverse_postgres
```

**Option B: Local Installation**
```bash
# macOS
brew services start postgresql@15

# Linux
sudo systemctl start postgresql

# Windows
# Start from Services or pgAdmin
```

### 3. Configure Environment

```bash
# Copy example environment file
cp .env.example .env

# Update database URL in .env
# DATABASE_URL=postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db
```

### 4. Run Migrations

```bash
# Create initial migration (if not exists)
alembic revision --autogenerate -m "Initial migration"

# Apply migrations
alembic upgrade head

# Verify tables created
docker exec -it aiverse_postgres psql -U aiverse_user -d aiverse_db -c "\dt"
```

### 5. Seed Database (Optional)

```bash
# Seed with demo data
python manage.py seed

# Or run directly
python -m app.db.seed
```

### 6. Start Application

```bash
python main.py
```

### 7. Test Database

```bash
# Check database connection
python manage.py check

# Run database tests
python test_database.py
```

## Common Commands

### Database Management

```bash
# Initialize database
python manage.py init

# Reset database (WARNING: deletes all data)
python manage.py reset

# Seed with demo data
python manage.py seed

# Check connection
python manage.py check

# Show database info
python manage.py info
```

### Alembic Migrations

```bash
# Create new migration
alembic revision --autogenerate -m "Description"

# Apply migrations
alembic upgrade head

# Rollback last migration
alembic downgrade -1

# Show migration history
alembic history

# Show current version
alembic current
```

### Docker Commands

```bash
# Start PostgreSQL
docker-compose up -d

# Stop PostgreSQL
docker-compose down

# View logs
docker logs aiverse_postgres

# Access PostgreSQL shell
docker exec -it aiverse_postgres psql -U aiverse_user -d aiverse_db

# Backup database
docker exec aiverse_postgres pg_dump -U aiverse_user aiverse_db > backup.sql

# Restore database
docker exec -i aiverse_postgres psql -U aiverse_user aiverse_db < backup.sql
```

## Troubleshooting

### Connection Refused

```bash
# Check if PostgreSQL is running
docker ps

# Check logs
docker logs aiverse_postgres

# Restart container
docker-compose restart
```

### Migration Errors

```bash
# Rollback and retry
alembic downgrade -1
alembic upgrade head

# Force revision
alembic stamp head
```

### Password Issues

```bash
# Reset PostgreSQL password
docker exec -it aiverse_postgres psql -U postgres
ALTER USER aiverse_user WITH PASSWORD 'new_password';
```

## Production Deployment

### Environment Variables

```bash
# Use strong passwords
DATABASE_URL=postgresql+asyncpg://user:STRONG_PASSWORD@host:5432/db

# Disable echo in production
DATABASE_ECHO=False

# Adjust pool size based on load
DB_POOL_SIZE=20
DB_MAX_OVERFLOW=40
```

### Security Checklist

- [ ] Use strong database passwords
- [ ] Enable SSL/TLS for database connections
- [ ] Restrict database access to application servers only
- [ ] Regular backups scheduled
- [ ] Monitor connection pool usage
- [ ] Set up database monitoring
- [ ] Configure proper firewall rules

### Performance Optimization

```bash
# Increase pool size for high traffic
DB_POOL_SIZE=50
DB_MAX_OVERFLOW=100

# Enable connection recycling
DB_POOL_RECYCLE=3600

# Use read replicas for scaling
# Add read replica URL to settings
```
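The read-replica comment above can be sketched as two engines behind a tiny router. The URLs and the get_engine helper are hypothetical, and a synchronous engine with SQLite is used only so the snippet runs standalone; the async version is analogous.

```python
from sqlalchemy import create_engine, text

# Hypothetical URLs: in production these would be the primary
# and read-replica PostgreSQL endpoints
primary = create_engine("sqlite:///:memory:")
replica = create_engine("sqlite:///:memory:")  # stand-in for a replica


def get_engine(readonly: bool = False):
    """Route read-only work to the replica, writes to the primary."""
    return replica if readonly else primary


# A read-only query goes to the replica
with get_engine(readonly=True).connect() as conn:
    result = conn.execute(text("SELECT 1")).scalar()
print(result)  # 1
```

In a real setup, a session dependency would pick the engine the same way, based on whether the request only reads.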

Step 18: Update requirements.txt

Update requirements.txt with all database dependencies:

# FastAPI and core dependencies
fastapi==0.115.0
uvicorn[standard]==0.32.1
pydantic[email]==2.10.3
pydantic-settings==2.7.0

# Email validation
email-validator==2.3.0
dnspython==2.8.0

# Phone number validation
phonenumbers==9.0.28

# Environment configuration
python-dotenv==1.2.2

# Structured logging
python-json-logger==4.1.0

# HTTP client
requests==2.33.1

# Database - SQLAlchemy
sqlalchemy[asyncio]==2.0.23
asyncpg==0.29.0
psycopg2-binary==2.9.9

# Database migrations
alembic==1.13.1

# Password hashing
passlib[bcrypt]==1.7.4

# ASGI toolkit: installed automatically by FastAPI.
# Do not pin starlette separately; 0.45.0 conflicts with fastapi==0.115.0.

Step 19: Create Comprehensive Testing Script

Create test_complete_database.py:

"""
Comprehensive database integration test

Tests all database functionality end-to-end
"""

import asyncio
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.session import async_session_maker
from app.db.repositories.user_repository import UserRepository
from app.db.repositories.conversation_repository import ConversationRepository
from app.db.repositories.message_repository import MessageRepository
from app.db.models.user import UserRole
from app.db.models.message import MessageRole
from passlib.context import CryptContext

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


async def test_user_operations():
    """Test user CRUD operations"""
    print("\n" + "="*70)
    print("TESTING USER OPERATIONS")
    print("="*70)
    
    async with async_session_maker() as session:
        repo = UserRepository(session)
        
        # Create user
        print("\n1. Creating user...")
        user = await repo.create(
            username="test_db_user",
            email="testdb@example.com",
            hashed_password=pwd_context.hash("password123"),
            full_name="Test DB User",
            role=UserRole.USER,
            is_active=True
        )
        print(f"   ✅ Created: {user}")
        
        # Get user by ID
        print("\n2. Getting user by ID...")
        found_user = await repo.get_by_id(user.id)
        print(f"   ✅ Found: {found_user}")
        
        # Get user by username
        print("\n3. Getting user by username...")
        username_user = await repo.get_by_username("test_db_user")
        print(f"   ✅ Found: {username_user}")
        
        # Update user
        print("\n4. Updating user...")
        updated = await repo.update(user.id, full_name="Updated Name")
        print(f"   ✅ Updated: {updated}")
        
        # Count users
        print("\n5. Counting users...")
        count = await repo.count()
        print(f"   ✅ Total users: {count}")
        
        await session.commit()
        return user


async def test_conversation_operations(user_id: int):
    """Test conversation CRUD operations"""
    print("\n" + "="*70)
    print("TESTING CONVERSATION OPERATIONS")
    print("="*70)
    
    async with async_session_maker() as session:
        repo = ConversationRepository(session)
        
        # Create conversation
        print("\n1. Creating conversation...")
        conv = await repo.create(
            user_id=user_id,
            title="Test Conversation",
            model_name="llama2"
        )
        print(f"   ✅ Created: {conv}")
        
        # Get user conversations
        print("\n2. Getting user conversations...")
        user_convs = await repo.get_by_user(user_id)
        print(f"   ✅ Found {len(user_convs)} conversations")
        
        # Count conversations
        print("\n3. Counting user conversations...")
        count = await repo.get_user_conversation_count(user_id)
        print(f"   ✅ User has {count} conversations")
        
        await session.commit()
        return conv


async def test_message_operations(conversation_id: int):
    """Test message CRUD operations"""
    print("\n" + "="*70)
    print("TESTING MESSAGE OPERATIONS")
    print("="*70)
    
    async with async_session_maker() as session:
        repo = MessageRepository(session)
        
        # Create messages
        print("\n1. Creating messages...")
        msg1 = await repo.create(
            conversation_id=conversation_id,
            role=MessageRole.USER,
            content="Hello, how are you?",
            total_tokens=5
        )
        print(f"   ✅ Created message 1: {msg1}")
        
        msg2 = await repo.create(
            conversation_id=conversation_id,
            role=MessageRole.ASSISTANT,
            content="I'm doing great, thank you!",
            total_tokens=7
        )
        print(f"   ✅ Created message 2: {msg2}")
        
        # Get conversation messages
        print("\n2. Getting conversation messages...")
        messages = await repo.get_by_conversation(conversation_id)
        print(f"   ✅ Found {len(messages)} messages")
        for msg in messages:
            print(f"      - [{msg.role}]: {msg.content[:50]}...")
        
        # Get last message
        print("\n3. Getting last message...")
        last = await repo.get_last_message(conversation_id)
        print(f"   ✅ Last message: {last.content}")
        
        # Count by role
        print("\n4. Counting messages by role...")
        user_count = await repo.count_by_role(conversation_id, MessageRole.USER)
        ai_count = await repo.count_by_role(conversation_id, MessageRole.ASSISTANT)
        print(f"   ✅ User messages: {user_count}")
        print(f"   ✅ Assistant messages: {ai_count}")
        
        await session.commit()


async def test_cascade_delete(user_id: int):
    """Test cascade delete (deleting user deletes conversations and messages)"""
    print("\n" + "="*70)
    print("TESTING CASCADE DELETE")
    print("="*70)
    
    async with async_session_maker() as session:
        user_repo = UserRepository(session)
        conv_repo = ConversationRepository(session)
        msg_repo = MessageRepository(session)
        
        # Count before delete
        print("\n1. Counting before delete...")
        user_convs_before = await conv_repo.get_user_conversation_count(user_id)
        print(f"   Conversations before: {user_convs_before}")
        
        # Delete user
        print("\n2. Deleting user...")
        deleted = await user_repo.delete(user_id)
        print(f"   ✅ User deleted: {deleted}")
        
        await session.commit()
        
        # Verify conversations deleted
        print("\n3. Verifying cascade delete...")
        user_convs_after = await conv_repo.get_user_conversation_count(user_id)
        print(f"   Conversations after: {user_convs_after}")
        print(f"   ✅ Cascade delete successful!")


async def test_relationships():
    """Test SQLAlchemy relationships"""
    print("\n" + "="*70)
    print("TESTING RELATIONSHIPS")
    print("="*70)
    
    async with async_session_maker() as session:
        conv_repo = ConversationRepository(session)
        user_repo = UserRepository(session)
        
        # Create user
        print("\n1. Creating user with conversations...")
        user = await user_repo.create(
            username="relationship_test",
            email="relationship@example.com",
            hashed_password=pwd_context.hash("password123"),
            role=UserRole.USER,
            is_active=True
        )
        
        # Create conversations
        conv1 = await conv_repo.create(
            user_id=user.id,
            title="First Conversation",
            model_name="llama2"
        )
        conv2 = await conv_repo.create(
            user_id=user.id,
            title="Second Conversation",
            model_name="mistral"
        )
        
        await session.commit()
        
        # Test eager loading
        print("\n2. Testing eager loading...")
        loaded_user = await user_repo.get_by_id(user.id)
        await session.refresh(loaded_user, ["conversations"])
        print(f"   ✅ User has {len(loaded_user.conversations)} conversations")
        
        # Test conversation with messages
        print("\n3. Testing conversation with messages...")
        conv_with_msgs = await conv_repo.get_with_messages(conv1.id)
        print(f"   ✅ Conversation loaded with {len(conv_with_msgs.messages)} messages")
        
        # Cleanup
        await user_repo.delete(user.id)
        await session.commit()


async def run_all_tests():
    """Run all database tests"""
    print("\n" + "🧪"*35)
    print("COMPREHENSIVE DATABASE TEST SUITE")
    print("🧪"*35)
    
    try:
        # Test user operations
        user = await test_user_operations()
        
        # Test conversation operations
        conv = await test_conversation_operations(user.id)
        
        # Test message operations
        await test_message_operations(conv.id)
        
        # Test relationships
        await test_relationships()
        
        # Test cascade delete (cleanup)
        await test_cascade_delete(user.id)
        
        print("\n" + "✅"*35)
        print("ALL DATABASE TESTS PASSED!")
        print("✅"*35 + "\n")
        
    except Exception as e:
        print(f"\n❌ TEST FAILED: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    print("""
    ╔════════════════════════════════════════════════════════╗
    ║  Comprehensive Database Test Suite                     ║
    ║                                                        ║
    ║  This tests:                                           ║
    ║  - User CRUD operations                                ║
    ║  - Conversation CRUD operations                        ║
    ║  - Message CRUD operations                             ║
    ║  - SQLAlchemy relationships                            ║
    ║  - Cascade deletes                                     ║
    ║                                                        ║
    ║  Prerequisites:                                        ║
    ║  1. PostgreSQL running                                 ║
    ║  2. Migrations applied                                 ║
    ╚════════════════════════════════════════════════════════╝
    """)
    
    asyncio.run(run_all_tests())

Step 20: Create Production Deployment Guide

Create DEPLOYMENT.md:

# Database Deployment Guide

## Production PostgreSQL Setup

### 1. Managed Database Services (Recommended)

**AWS RDS:**
```bash
# Create RDS PostgreSQL instance
aws rds create-db-instance \
    --db-instance-identifier aiverse-prod \
    --db-instance-class db.t3.medium \
    --engine postgres \
    --engine-version 15.4 \
    --master-username admin \
    --master-user-password  \
    --allocated-storage 100 \
    --storage-encrypted \
    --backup-retention-period 7

# Update .env with RDS endpoint
DATABASE_URL=postgresql+asyncpg://admin:password@aiverse-prod.xxx.rds.amazonaws.com:5432/aiverse
```

**Google Cloud SQL:**
```bash
# Create Cloud SQL instance
gcloud sql instances create aiverse-prod \
    --database-version=POSTGRES_15 \
    --tier=db-n1-standard-2 \
    --region=us-central1

# Update connection string
DATABASE_URL=postgresql+asyncpg://user:pass@:5432/aiverse
```

**DigitalOcean Managed Database:**
```bash
# Create via web interface or API
# Connection string format:
DATABASE_URL=postgresql+asyncpg://user:pass@db-postgresql-xxx.ondigitalocean.com:25060/aiverse?sslmode=require
```

### 2. SSL/TLS Configuration

**Enable SSL in production:**

```python
# app/core/config.py
DATABASE_URL: str = "postgresql+asyncpg://user:pass@host:5432/db?ssl=require"

# For custom SSL certificates
from ssl import create_default_context

from sqlalchemy.engine.url import make_url
from sqlalchemy.ext.asyncio import create_async_engine

url = make_url(settings.DATABASE_URL)
ssl_context = create_default_context()
ssl_context.load_cert_chain('/path/to/cert.pem', '/path/to/key.pem')

engine = create_async_engine(
    url,
    connect_args={"ssl": ssl_context}
)
```

### 3. Connection Pool Optimization

**High Traffic Settings:**

```bash
# .env.production
DB_POOL_SIZE=50
DB_MAX_OVERFLOW=100
DB_POOL_TIMEOUT=30
DB_POOL_RECYCLE=3600
```

**Connection Pool Monitoring:**

```python
# Log pool stats during startup, inside the lifespan handler in
# app/main.py (on_event hooks are ignored once a lifespan is set)
from app.db.session import engine

async def lifespan(app: FastAPI):
    pool = engine.pool
    logger.info(f"Pool size: {pool.size()}")
    logger.info(f"Checked out: {pool.checkedout()}")
    yield
```

### 4. Database Migrations in Production

**Automated Migration Workflow:**

```bash
# 1. Backup database before migration
pg_dump -h host -U user dbname > backup_$(date +%Y%m%d).sql

# 2. Run migrations
alembic upgrade head

# 3. Verify migration
alembic current

# 4. If issues, rollback
alembic downgrade -1
```

**CI/CD Integration:**

```yaml
# .github/workflows/deploy.yml
- name: Run Database Migrations
  run: |
    alembic upgrade head
  env:
    DATABASE_URL: ${{ secrets.DATABASE_URL }}
```

### 5. Backup Strategy

**Automated Backups:**

```bash
# Daily backup script
#!/bin/bash
BACKUP_DIR="/backups"
DATE=$(date +%Y%m%d_%H%M%S)
DB_NAME="aiverse_db"

pg_dump -h localhost -U aiverse_user $DB_NAME | \
    gzip > $BACKUP_DIR/backup_$DATE.sql.gz

# Keep last 7 days
find $BACKUP_DIR -name "backup_*.sql.gz" -mtime +7 -delete
```

**Cron Schedule:**

```bash
# Add to crontab
0 2 * * * /path/to/backup.sh
```

### 6. Monitoring & Alerts

**Query Performance Monitoring:**

```python
# Enable query logging
import logging
logging.getLogger('sqlalchemy.engine').setLevel(logging.INFO)
```
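Beyond raising the log level, per-query timing can be captured with SQLAlchemy's cursor-execute events. This is a sketch, not the tutorial's code: a synchronous engine and SQLite are used only so it runs standalone; for an async engine the same listeners attach to engine.sync_engine.

```python
import logging
import time

from sqlalchemy import create_engine, event, text

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("query_timer")

engine = create_engine("sqlite:///:memory:")


@event.listens_for(engine, "before_cursor_execute")
def _start_timer(conn, cursor, statement, parameters, context, executemany):
    # Keep a stack of start times so nested statements are handled
    conn.info.setdefault("query_start", []).append(time.perf_counter())


@event.listens_for(engine, "after_cursor_execute")
def _log_duration(conn, cursor, statement, parameters, context, executemany):
    elapsed = time.perf_counter() - conn.info["query_start"].pop()
    logger.info("query took %.4fs: %s", elapsed, statement)


with engine.connect() as conn:
    value = conn.execute(text("SELECT 42")).scalar()
print(value)  # 42
```

From here, slow queries can be flagged by comparing elapsed against a threshold and logging at WARNING level.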

**Database Health Checks:**

```python
# app/api/v1/endpoints/health.py
@router.get("/health/database")
async def database_health():
    from app.db.utils import check_database_connection, get_database_info
    
    is_connected = await check_database_connection()
    info = await get_database_info()
    
    return {
        "status": "healthy" if is_connected else "unhealthy",
        "database": info
    }
```

### 7. Security Best Practices

**Environment Variables:**

```bash
# Never commit these to git
DATABASE_URL=postgresql+asyncpg://user:STRONG_RANDOM_PASSWORD@host:5432/db
SECRET_KEY=
```

**Network Security:**

```bash
# PostgreSQL pg_hba.conf
# Only allow app servers
host    aiverse_db    aiverse_user    10.0.1.0/24    scram-sha-256

# Firewall rules
# Only allow port 5432 from application servers
```

**Role-Based Access:**

```sql
-- Create read-only user for analytics
CREATE ROLE analytics_user WITH LOGIN PASSWORD 'password';
GRANT CONNECT ON DATABASE aiverse_db TO analytics_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO analytics_user;
```

### 8. Performance Tuning

**PostgreSQL Configuration:**

```conf
# postgresql.conf
shared_buffers = 256MB
effective_cache_size = 1GB
maintenance_work_mem = 64MB
checkpoint_completion_target = 0.9
wal_buffers = 16MB
default_statistics_target = 100
random_page_cost = 1.1
effective_io_concurrency = 200
work_mem = 4MB
min_wal_size = 1GB
max_wal_size = 4GB
```

**Index Optimization:**

```sql
-- Create indexes for common queries
CREATE INDEX CONCURRENTLY idx_users_email ON users(email);
CREATE INDEX CONCURRENTLY idx_conversations_user_updated 
    ON conversations(user_id, updated_at DESC);
CREATE INDEX CONCURRENTLY idx_messages_conversation_created 
    ON messages(conversation_id, created_at);

-- Analyze tables
ANALYZE users;
ANALYZE conversations;
ANALYZE messages;
```
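The same indexes can also be declared on the SQLAlchemy models, so the ORM and the database schema stay in one place and Alembic's autogenerate picks them up (`CONCURRENTLY` still belongs in the migration itself, since `CREATE TABLE` builds indexes on an empty table anyway). A sketch with simplified stand-in models:

```python
from sqlalchemy import Column, DateTime, ForeignKey, Index, Integer, String
from sqlalchemy.orm import declarative_base

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    # index=True creates a single-column index on email
    email = Column(String, index=True)

class Conversation(Base):
    __tablename__ = "conversations"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    updated_at = Column(DateTime)
    # Composite index matching the "recent conversations per user" query,
    # mirroring the SQL above
    __table_args__ = (
        Index("idx_conversations_user_updated", user_id, updated_at.desc()),
    )
```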

### 9. Disaster Recovery

**Point-in-Time Recovery:**

```bash
# Enable WAL archiving
# postgresql.conf
archive_mode = on
archive_command = 'cp %p /archive/%f'

# Restore: start from a base backup (pg_basebackup), then replay WAL
# to a target time -- on PostgreSQL 12+ set these in postgresql.conf:
#   restore_command = 'cp /archive/%f %p'
#   recovery_target_time = '2024-01-15 10:00:00'
# then create an empty recovery.signal file and start the server
```

**High Availability Setup:**

```bash
# Primary-Replica configuration
# Primary: postgresql.conf
wal_level = replica
max_wal_senders = 3
wal_keep_size = 1GB

# Replica (PostgreSQL 12+): set in postgresql.conf, plus an empty
# standby.signal file (PostgreSQL 11 and earlier used recovery.conf)
primary_conninfo = 'host=primary port=5432 user=replicator'
```

### 10. Migration Checklist

Production deployment checklist:

- [ ] Database backups scheduled
- [ ] SSL/TLS enabled
- [ ] Connection pool configured
- [ ] Monitoring set up
- [ ] Health checks implemented
- [ ] Migration scripts tested
- [ ] Rollback plan documented
- [ ] Access controls configured
- [ ] Performance indexes created
- [ ] Disaster recovery plan in place

## 📚 FAQ Section

**Why use PostgreSQL instead of MySQL or SQLite?**

PostgreSQL offers superior features for production applications including ACID compliance, advanced data types (JSON, arrays), full-text search, concurrent write handling, and excellent performance with complex queries. SQLite is great for development but not production-ready for multi-user applications. MySQL is good, but PostgreSQL has better standards compliance and more advanced features like CTEs, window functions, and better JSON support.
**What is the Repository Pattern and why should I use it?**

The Repository Pattern is a design pattern that creates an abstraction layer between your business logic and data access. It centralizes data access logic, making your code more testable (you can mock repositories), maintainable (database changes don’t affect business logic), and reusable. Instead of writing SQLAlchemy queries directly in your service layer, you call repository methods like `repo.create()` or `repo.get_by_id()`, which can be easily tested or swapped with different implementations.
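The testability claim is easy to demonstrate: if the service function depends only on a repository interface, an in-memory fake can stand in for the SQLAlchemy-backed implementation in tests. A sketch — the names `UserRepository`, `InMemoryUserRepository`, and `register_user` are illustrative, not from the tutorial's codebase:

```python
import asyncio
from typing import Optional, Protocol

class UserRepository(Protocol):
    """The interface the service layer depends on."""
    async def create(self, data: dict) -> dict: ...
    async def get_by_id(self, user_id: int) -> Optional[dict]: ...

class InMemoryUserRepository:
    """Test double -- swaps in for the real SQLAlchemy-backed repository."""
    def __init__(self) -> None:
        self._users: dict[int, dict] = {}
        self._next_id = 1

    async def create(self, data: dict) -> dict:
        user = {"id": self._next_id, **data}
        self._users[self._next_id] = user
        self._next_id += 1
        return user

    async def get_by_id(self, user_id: int) -> Optional[dict]:
        return self._users.get(user_id)

async def register_user(repo: UserRepository, email: str) -> dict:
    # Service code talks only to the repository interface,
    # never to SQLAlchemy directly
    return await repo.create({"email": email})
```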
**How do I run database migrations in production without downtime?**

Use backwards-compatible migrations: (1) Add new columns as nullable first, populate them, then make them required. (2) Never drop columns immediately—deprecate them first. (3) Use `CREATE INDEX CONCURRENTLY` to avoid locking tables. (4) Test migrations on staging with production-like data. (5) Always have a rollback plan with `alembic downgrade`. (6) For large tables, batch updates instead of one big transaction. (7) Use blue-green deployment or rolling updates so old code keeps working during the migration.
**When should I use async vs. sync SQLAlchemy?**

Async SQLAlchemy uses asyncpg and async/await for non-blocking database operations, allowing your application to handle thousands of concurrent requests efficiently. Use async when building high-concurrency applications like chat systems, real-time APIs, or when you have many I/O-bound operations. Use sync (traditional SQLAlchemy) for batch jobs, admin scripts, or when you’re working with synchronous libraries. For FastAPI, async is recommended because FastAPI is async-first.
**How do I optimize database query performance?**

Key optimizations include: (1) Create indexes on frequently queried columns (`user_id`, `email`, `created_at`). (2) Use connection pooling (we configured this with `DB_POOL_SIZE`). (3) Use `select(Model).options(selectinload(...))` to avoid N+1 queries. (4) Paginate large result sets with `LIMIT` and `OFFSET`. (5) Monitor slow queries with logging and use `EXPLAIN ANALYZE`. (6) Cache frequently accessed data with Redis. (7) Use database read replicas for scaling reads. (8) Regularly run `VACUUM` and `ANALYZE` on PostgreSQL.
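To make the N+1 point concrete, here is a self-contained sketch using a sync in-memory SQLite database for brevity; the same `options(selectinload(...))` call works unchanged with an `AsyncSession`, and the toy `User`/`Conversation` models stand in for the tutorial's real ones:

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, select
from sqlalchemy.orm import Session, declarative_base, relationship, selectinload

Base = declarative_base()

class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    email = Column(String)
    conversations = relationship("Conversation", back_populates="user")

class Conversation(Base):
    __tablename__ = "conversations"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    user = relationship("User", back_populates="conversations")

engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(User(email="a@example.com",
                     conversations=[Conversation(), Conversation()]))
    session.commit()

with Session(engine) as session:
    # selectinload fetches ALL related conversations in one extra query,
    # instead of one lazy query per user (the N+1 problem)
    users = session.execute(
        select(User).options(selectinload(User.conversations))
    ).scalars().all()
```

Because the relationship is eagerly loaded, `users[0].conversations` is usable even after the session closes, with no hidden extra queries.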

## 🎯 Summary

Congratulations! You’ve successfully integrated PostgreSQL with your FastAPI application. Here’s what you’ve accomplished:

✅ What You Built:

  • Complete database integration with PostgreSQL
  • SQLAlchemy ORM models with relationships
  • Repository pattern for clean data access
  • Alembic migrations for schema management
  • Async database operations for performance
  • Connection pooling for scalability
  • User and conversation management
  • Password hashing for security

✅ Production-Ready Features:

  • Proper error handling
  • Database health checks
  • Migration management
  • Seed data scripts
  • Comprehensive testing
  • Security best practices

📊 Architecture Achievement:

FastAPI Endpoints
    ↓
Services (Business Logic)
    ↓
Repositories (Data Access)
    ↓
SQLAlchemy Models
    ↓
PostgreSQL Database

Ready for Blog Post 9: JWT Authentication & Authorization! 🚀
