Ep.09 FastAPI JWT Authentication: Complete Security Guide

Views: 1

To implement JWT authentication in FastAPI, install python-jose for token generation, create utility functions to encode/decode JWT tokens with expiration, implement OAuth2PasswordBearer for token extraction, create a login endpoint that validates credentials and returns access tokens, and use dependency injection to protect routes by verifying tokens and extracting user information. Add refresh tokens for better security, implement role-based access control with custom dependencies, and store tokens securely on the client side. This architecture provides stateless, scalable authentication suitable for modern web applications and mobile apps.

🎓 What You’ll Learn

By the end of this tutorial, you’ll be able to:

  • Implement JWT token generation and validation
  • Create secure login and registration endpoints
  • Protect routes with authentication dependencies
  • Implement refresh token rotation
  • Add role-based access control (RBAC)
  • Handle token expiration and blacklisting
  • Secure password reset flows
  • Implement email verification
  • Add OAuth2 password flow
  • Deploy authentication in production

📖 Understanding JWT Authentication

What is JWT?

JWT (JSON Web Token) is a compact, URL-safe token format for securely transmitting information between parties.

Structure:

header.payload.signature
eyJhbGc.eyJzdWI.SflKxwRJ

Parts:

  1. Header: Algorithm and token type
  2. Payload: Claims (user data)
  3. Signature: Verification hash

Why JWT?

FeatureJWTSession Cookies
Stateless✅ Yes❌ No (needs DB)
Scalable✅ Easy⚠️ Requires sticky sessions
Mobile-Friendly✅ Perfect⚠️ Complex
Microservices✅ Ideal❌ Difficult
Revocation⚠️ Needs blacklist✅ Easy

Authentication Flow

1. User → POST /login {username, password}
2. Server → Validates credentials
3. Server → Generates JWT token
4. Server → Returns {access_token, refresh_token}
5. Client → Stores tokens
6. Client → Sends Authorization: Bearer <token>
7. Server → Validates token
8. Server → Returns protected resource

🛠️ Step-by-Step Implementation

Step 1: Install Dependencies

# Activate virtual environment
cd fastapi
source venv/bin/activate  # Windows: venv\Scripts\activate

# Install JWT and password hashing
pip install python-jose[cryptography]==3.3.0
pip install pwdlib==0.2.1

# Update requirements
pip freeze > requirements.txt

What we installed:

  • python-jose: JWT token generation/validation
  • pwdlib: Modern password hashing (replaces passlib)

Step 2: Update Configuration

Update app/core/config.py:

"""
Core configuration module
Manages all application settings using Pydantic Settings
"""

from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List
from functools import lru_cache


class Settings(BaseSettings):
    """
    Application settings
    
    These values are loaded from environment variables or .env file
    Pydantic validates the types automatically
    """
    
    # Application
    APP_NAME: str = "AIVerse Backend"
    APP_VERSION: str = "0.5.0"
    DEBUG: bool = False
    ENVIRONMENT: str = "production"
    
    # API
    API_V1_PREFIX: str = "/api/v1"
    HOST: str = "0.0.0.0"
    PORT: int = 8000
    
    # CORS
    ALLOWED_ORIGINS: str = "http://localhost:3000"
    
    @property
    def allowed_origins_list(self) -> List[str]:
        """Convert comma-separated string to list"""
        return [origin.strip() for origin in self.ALLOWED_ORIGINS.split(",")]
    
    # Database Configuration
    DATABASE_URL: str = "postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db"
    DATABASE_ECHO: bool = False
    DB_POOL_SIZE: int = 5
    DB_MAX_OVERFLOW: int = 10
    DB_POOL_TIMEOUT: int = 30
    DB_POOL_RECYCLE: int = 3600
    
    @property
    def async_database_url(self) -> str:
        """Get async database URL"""
        return self.DATABASE_URL
    
    @property
    def sync_database_url(self) -> str:
        """Get sync database URL (for Alembic migrations)"""
        return self.DATABASE_URL.replace("+asyncpg", "").replace("postgresql+asyncpg", "postgresql")
    
    # AI Settings
    OLLAMA_BASE_URL: str = "http://localhost:11434"
    DEFAULT_AI_MODEL: str = "llama2"
    
    # JWT Authentication Settings
    SECRET_KEY: str = "change-this-to-a-random-secret-key-in-production"
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7
    
    # Password Reset Settings
    PASSWORD_RESET_TOKEN_EXPIRE_HOURS: int = 1
    EMAIL_VERIFICATION_TOKEN_EXPIRE_HOURS: int = 24
    
    # Email Settings (for future use)
    SMTP_HOST: str = "smtp.gmail.com"
    SMTP_PORT: int = 587
    SMTP_USER: str = ""
    SMTP_PASSWORD: str = ""
    EMAILS_FROM_EMAIL: str = "noreply@aiverse.com"
    EMAILS_FROM_NAME: str = "AIVerse"
    
    # Pydantic Settings Configuration
    model_config = SettingsConfigDict(
        env_file=".env",
        env_file_encoding="utf-8",
        case_sensitive=True,
        extra="ignore"
    )


@lru_cache()
def get_settings() -> Settings:
    """
    Create settings instance (cached)
    
    @lru_cache ensures we only create one Settings instance
    and reuse it throughout the application
    
    Returns:
        Settings: Application settings
    """
    return Settings()


# Convenience: Get settings instance
settings = get_settings()

Update .env:

# Application Configuration
APP_NAME=AIVerse Backend
APP_VERSION=0.5.0
DEBUG=True
ENVIRONMENT=development

# Server Configuration
API_V1_PREFIX=/api/v1
HOST=0.0.0.0
PORT=8000

# CORS Configuration
ALLOWED_ORIGINS=http://localhost:3000,http://localhost:8000

# Database Configuration
DATABASE_URL=postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db
DATABASE_ECHO=False
DB_POOL_SIZE=5
DB_MAX_OVERFLOW=10

# AI Configuration
OLLAMA_BASE_URL=http://localhost:11434
DEFAULT_AI_MODEL=llama2

# JWT Security Configuration
# Generate with: openssl rand -hex 32
SECRET_KEY=09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7

# Password Reset
PASSWORD_RESET_TOKEN_EXPIRE_HOURS=1
EMAIL_VERIFICATION_TOKEN_EXPIRE_HOURS=24

# Email Configuration (optional)
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USER=
SMTP_PASSWORD=
EMAILS_FROM_EMAIL=noreply@aiverse.com
EMAILS_FROM_NAME=AIVerse

Step 3: Create Security Utilities

Create app/core/security.py:

"""
Security utilities

JWT token generation, password hashing, and authentication helpers
"""

from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from jose import JWTError, jwt
from pwdlib import PasswordHash

from app.core.config import settings

# Password hashing using pwdlib (modern bcrypt)
password_hash = PasswordHash.recommended()


def hash_password(password: str) -> str:
    """
    Hash password using bcrypt
    
    Args:
        password: Plain text password
    
    Returns:
        Hashed password
    """
    return password_hash.hash(password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Verify password against hash
    
    Args:
        plain_password: Plain text password
        hashed_password: Hashed password
    
    Returns:
        True if password matches, False otherwise
    """
    return password_hash.verify(plain_password, hashed_password)


def create_access_token(
    data: Dict[str, Any],
    expires_delta: Optional[timedelta] = None
) -> str:
    """
    Create JWT access token
    
    Args:
        data: Data to encode in token (usually {"sub": user_id})
        expires_delta: Token expiration time
    
    Returns:
        Encoded JWT token
    """
    to_encode = data.copy()
    
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(
            minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
        )
    
    to_encode.update({
        "exp": expire,
        "iat": datetime.utcnow(),
        "type": "access"
    })
    
    encoded_jwt = jwt.encode(
        to_encode,
        settings.SECRET_KEY,
        algorithm=settings.ALGORITHM
    )
    
    return encoded_jwt


def create_refresh_token(
    data: Dict[str, Any],
    expires_delta: Optional[timedelta] = None
) -> str:
    """
    Create JWT refresh token
    
    Args:
        data: Data to encode in token
        expires_delta: Token expiration time
    
    Returns:
        Encoded JWT refresh token
    """
    to_encode = data.copy()
    
    if expires_delta:
        expire = datetime.utcnow() + expires_delta
    else:
        expire = datetime.utcnow() + timedelta(
            days=settings.REFRESH_TOKEN_EXPIRE_DAYS
        )
    
    to_encode.update({
        "exp": expire,
        "iat": datetime.utcnow(),
        "type": "refresh"
    })
    
    encoded_jwt = jwt.encode(
        to_encode,
        settings.SECRET_KEY,
        algorithm=settings.ALGORITHM
    )
    
    return encoded_jwt


def decode_token(token: str) -> Optional[Dict[str, Any]]:
    """
    Decode and validate JWT token
    
    Args:
        token: JWT token to decode
    
    Returns:
        Decoded token payload if valid, None otherwise
    """
    try:
        payload = jwt.decode(
            token,
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM]
        )
        return payload
    except JWTError:
        return None


def create_password_reset_token(email: str) -> str:
    """
    Create password reset token
    
    Args:
        email: User email
    
    Returns:
        Password reset token
    """
    delta = timedelta(hours=settings.PASSWORD_RESET_TOKEN_EXPIRE_HOURS)
    return create_access_token(
        data={"sub": email, "type": "password_reset"},
        expires_delta=delta
    )


def create_email_verification_token(email: str) -> str:
    """
    Create email verification token
    
    Args:
        email: User email
    
    Returns:
        Email verification token
    """
    delta = timedelta(hours=settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_HOURS)
    return create_access_token(
        data={"sub": email, "type": "email_verification"},
        expires_delta=delta
    )


def verify_token_type(payload: Dict[str, Any], expected_type: str) -> bool:
    """
    Verify token type
    
    Args:
        payload: Decoded token payload
        expected_type: Expected token type
    
    Returns:
        True if token type matches, False otherwise
    """
    return payload.get("type") == expected_type

Step 4: Create Authentication Models

Create app/models/auth.py:

"""
Authentication models

Pydantic models for authentication requests and responses
"""

from pydantic import BaseModel, EmailStr, Field
from typing import Optional


class Token(BaseModel):
    """
    Token response model
    
    Returned after successful authentication
    """
    access_token: str = Field(..., description="JWT access token")
    refresh_token: str = Field(..., description="JWT refresh token")
    token_type: str = Field(default="bearer", description="Token type")
    expires_in: int = Field(..., description="Token expiration in seconds")
    
    model_config = {
        "json_schema_extra": {
            "example": {
                "access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
                "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
                "token_type": "bearer",
                "expires_in": 1800
            }
        }
    }


class TokenPayload(BaseModel):
    """
    Token payload model
    
    Represents decoded JWT token data
    """
    sub: str | None = None  # Subject (user ID or email)
    exp: int | None = None  # Expiration time
    iat: int | None = None  # Issued at
    type: str | None = None  # Token type


class LoginRequest(BaseModel):
    """
    Login request model
    """
    username: str = Field(..., min_length=3, description="Username or email")
    password: str = Field(..., min_length=8, description="Password")
    
    model_config = {
        "json_schema_extra": {
            "example": {
                "username": "john_doe",
                "password": "SecurePass123"
            }
        }
    }


class RefreshTokenRequest(BaseModel):
    """
    Refresh token request model
    """
    refresh_token: str = Field(..., description="Refresh token")
    
    model_config = {
        "json_schema_extra": {
            "example": {
                "refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
            }
        }
    }


class PasswordResetRequest(BaseModel):
    """
    Password reset request model
    """
    email: EmailStr = Field(..., description="User email")
    
    model_config = {
        "json_schema_extra": {
            "example": {
                "email": "user@example.com"
            }
        }
    }


class PasswordResetConfirm(BaseModel):
    """
    Password reset confirmation model
    """
    token: str = Field(..., description="Reset token")
    new_password: str = Field(..., min_length=8, description="New password")
    
    model_config = {
        "json_schema_extra": {
            "example": {
                "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
                "new_password": "NewSecurePass123"
            }
        }
    }


class EmailVerificationRequest(BaseModel):
    """
    Email verification request model
    """
    token: str = Field(..., description="Verification token")
    
    model_config = {
        "json_schema_extra": {
            "example": {
                "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
            }
        }
    }


class ChangePasswordRequest(BaseModel):
    """
    Change password request model
    """
    current_password: str = Field(..., description="Current password")
    new_password: str = Field(..., min_length=8, description="New password")
    
    model_config = {
        "json_schema_extra": {
            "example": {
                "current_password": "OldPass123",
                "new_password": "NewSecurePass123"
            }
        }
    }

Step 5: Create Authentication Dependencies

Create app/core/auth_dependencies.py:

"""
Authentication dependencies

FastAPI dependencies for protecting routes
"""

from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated, Optional

from app.core.security import decode_token, verify_token_type
from app.db.session import get_db
from app.db.repositories.user_repository import UserRepository
from app.db.models.user import User, UserRole
from app.models.auth import TokenPayload

# OAuth2 scheme for token extraction
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"/api/v1/auth/login")
http_bearer = HTTPBearer()


async def get_current_user(
    token: Annotated[str, Depends(oauth2_scheme)],
    db: AsyncSession = Depends(get_db)
) -> User:
    """
    Get current authenticated user from token
    
    Args:
        token: JWT access token
        db: Database session
    
    Returns:
        Current user
    
    Raises:
        HTTPException: If token is invalid or user not found
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    
    # Decode token
    payload = decode_token(token)
    if payload is None:
        raise credentials_exception
    
    # Verify token type
    if not verify_token_type(payload, "access"):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token type",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # Extract user ID
    user_id: str = payload.get("sub")
    if user_id is None:
        raise credentials_exception
    
    # Get user from database
    user_repo = UserRepository(db)
    try:
        user = await user_repo.get_by_id(int(user_id))
    except ValueError:
        # If user_id is email instead of ID
        user = await user_repo.get_by_email(user_id)
    
    if user is None:
        raise credentials_exception
    
    # Check if user is active
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user"
        )
    
    return user


async def get_current_active_user(
    current_user: Annotated[User, Depends(get_current_user)]
) -> User:
    """
    Get current active user
    
    Args:
        current_user: Current user from token
    
    Returns:
        Current user if active
    
    Raises:
        HTTPException: If user is inactive
    """
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user"
        )
    return current_user


async def get_current_verified_user(
    current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
    """
    Get current verified user
    
    Args:
        current_user: Current active user
    
    Returns:
        Current user if verified
    
    Raises:
        HTTPException: If user email is not verified
    """
    if not current_user.is_verified:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Email not verified"
        )
    return current_user


class RoleChecker:
    """
    Role-based access control dependency
    
    Usage:
        @router.get("/admin")
        async def admin_only(user: User = Depends(RoleChecker([UserRole.ADMIN]))):
            return {"message": "Admin access granted"}
    """
    
    def __init__(self, allowed_roles: list[UserRole]):
        self.allowed_roles = allowed_roles
    
    async def __call__(
        self,
        current_user: Annotated[User, Depends(get_current_active_user)]
    ) -> User:
        """
        Check if user has required role
        
        Args:
            current_user: Current authenticated user
        
        Returns:
            Current user if role is allowed
        
        Raises:
            HTTPException: If user doesn't have required role
        """
        if current_user.role not in self.allowed_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Permission denied. Required roles: {[r.value for r in self.allowed_roles]}"
            )
        return current_user


# Convenience role checkers
require_admin = RoleChecker([UserRole.ADMIN])
require_user = RoleChecker([UserRole.USER, UserRole.ADMIN])

Step 6: Update User Service

Update app/services/user_service.py to use the new security module:

"""
User service - Business logic for user operations

Updated with new security module
"""

from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.repositories.user_repository import UserRepository
from app.db.models.user import User, UserRole
from app.models.user import UserCreate, UserUpdate
from app.core.exceptions import UserNotFoundException, UserAlreadyExistsException
from app.core.security import hash_password, verify_password  # Updated import


class UserService:
    """
    User service class
    
    Handles all user-related business logic with database operations
    """
    
    def __init__(self, db: AsyncSession):
        """
        Initialize service with database session
        
        Args:
            db: Database session
        """
        self.repository = UserRepository(db)
    
    async def create_user(self, user_data: UserCreate) -> User:
        """
        Create a new user
        
        Args:
            user_data: User creation data
        
        Returns:
            Created user
        
        Raises:
            UserAlreadyExistsException: If username or email already exists
        """
        # Check for duplicate username
        existing_user = await self.repository.get_by_username(user_data.username)
        if existing_user:
            raise UserAlreadyExistsException("username", user_data.username)
        
        # Check for duplicate email
        existing_email = await self.repository.get_by_email(user_data.email)
        if existing_email:
            raise UserAlreadyExistsException("email", user_data.email)
        
        # Hash password using new security module
        hashed_password = hash_password(user_data.password)
        
        # Create user
        user = await self.repository.create(
            username=user_data.username,
            email=user_data.email,
            hashed_password=hashed_password,
            full_name=user_data.full_name,
            role=user_data.role,
            is_active=True,
            is_verified=False
        )
        
        return user
    
    async def get_all_users(
        self,
        skip: int = 0,
        limit: int = 10,
        role: Optional[UserRole] = None
    ) -> list[User]:
        """Get all users with optional filtering"""
        if role:
            return await self.repository.get_by_role(role, skip, limit)
        return await self.repository.get_all(skip, limit)
    
    async def get_user_by_id(self, user_id: int) -> User:
        """Get user by ID"""
        user = await self.repository.get_by_id(user_id)
        if not user:
            raise UserNotFoundException(user_id)
        return user
    
    async def get_user_by_username(self, username: str) -> Optional[User]:
        """Get user by username"""
        return await self.repository.get_by_username(username)
    
    async def get_user_by_email(self, email: str) -> Optional[User]:
        """Get user by email"""
        return await self.repository.get_by_email(email)
    
    async def update_user(self, user_id: int, user_update: UserUpdate) -> User:
        """Update user information"""
        user = await self.get_user_by_id(user_id)
        
        update_data = user_update.model_dump(exclude_unset=True)
        
        # Check for username conflict
        if "username" in update_data:
            existing = await self.repository.get_by_username(update_data["username"])
            if existing and existing.id != user_id:
                raise UserAlreadyExistsException("username", update_data["username"])
        
        # Check for email conflict
        if "email" in update_data:
            existing = await self.repository.get_by_email(update_data["email"])
            if existing and existing.id != user_id:
                raise UserAlreadyExistsException("email", update_data["email"])
        
        updated_user = await self.repository.update(user_id, **update_data)
        if not updated_user:
            raise UserNotFoundException(user_id)
        
        return updated_user
    
    async def delete_user(self, user_id: int) -> None:
        """Delete a user"""
        deleted = await self.repository.delete(user_id)
        if not deleted:
            raise UserNotFoundException(user_id)
    
    async def get_user_count(self) -> int:
        """Get total number of users"""
        return await self.repository.count()
    
    async def authenticate_user(
        self,
        username: str,
        password: str
    ) -> Optional[User]:
        """
        Authenticate user with username and password
        
        Args:
            username: Username or email
            password: Plain text password
        
        Returns:
            User if authentication successful, None otherwise
        """
        # Try username first
        user = await self.repository.get_by_username(username)
        
        # If not found, try email
        if not user:
            user = await self.repository.get_by_email(username)
        
        if not user:
            return None
        
        # Verify password using new security module
        if not verify_password(password, user.hashed_password):
            return None
        
        # Update last login
        await self.repository.update_last_login(user.id)
        
        return user
    
    async def change_password(
        self,
        user_id: int,
        current_password: str,
        new_password: str
    ) -> bool:
        """
        Change user password
        
        Args:
            user_id: User ID
            current_password: Current password
            new_password: New password
        
        Returns:
            True if password changed successfully
        
        Raises:
            UserNotFoundException: If user not found
            ValueError: If current password is incorrect
        """
        user = await self.get_user_by_id(user_id)
        
        # Verify current password
        if not verify_password(current_password, user.hashed_password):
            raise ValueError("Current password is incorrect")
        
        # Hash new password
        hashed_password = hash_password(new_password)
        
        # Update password
        await self.repository.update(user_id, hashed_password=hashed_password)
        
        return True
    
    async def reset_password(
        self,
        email: str,
        new_password: str
    ) -> bool:
        """
        Reset user password (for password reset flow)
        
        Args:
            email: User email
            new_password: New password
        
        Returns:
            True if password reset successfully
        
        Raises:
            UserNotFoundException: If user not found
        """
        user = await self.repository.get_by_email(email)
        if not user:
            raise UserNotFoundException(email)
        
        # Hash new password
        hashed_password = hash_password(new_password)
        
        # Update password
        await self.repository.update(user.id, hashed_password=hashed_password)
        
        return True
    
    async def verify_email(self, email: str) -> bool:
        """
        Mark user email as verified
        
        Args:
            email: User email
        
        Returns:
            True if email verified successfully
        
        Raises:
            UserNotFoundException: If user not found
        """
        user = await self.repository.get_by_email(email)
        if not user:
            raise UserNotFoundException(email)
        
        await self.repository.update(user.id, is_verified=True)
        
        return True

Step 7: Create Authentication Endpoints

Create app/api/v1/endpoints/auth.py:

"""
Authentication endpoints

Handles login, registration, token refresh, and password management
"""

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated
from datetime import timedelta

from app.db.session import get_db
from app.models.auth import (
    Token,
    LoginRequest,
    RefreshTokenRequest,
    PasswordResetRequest,
    PasswordResetConfirm,
    EmailVerificationRequest,
    ChangePasswordRequest
)
from app.models.user import UserCreate, UserResponse
from app.models.common import MessageResponse
from app.services.user_service import UserService
from app.core.security import (
    create_access_token,
    create_refresh_token,
    decode_token,
    verify_token_type,
    create_password_reset_token,
    create_email_verification_token
)
from app.core.auth_dependencies import get_current_user, get_current_active_user
from app.core.config import settings
from app.db.models.user import User
from app.utils.logger import logger

router = APIRouter(prefix="/auth", tags=["Authentication"])


def get_user_service(db: AsyncSession = Depends(get_db)) -> UserService:
    """Get user service with database session"""
    return UserService(db)


# ============================================
# REGISTRATION
# ============================================


@router.post(
    "/register",
    response_model=UserResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Register new user"
)
async def register(
    user_data: UserCreate,
    service: Annotated[UserService, Depends(get_user_service)]
):
    """
    Register a new user
    
    Creates a new user account with the provided information.
    Email verification is required before full access.
    
    - **username**: Unique username (3-50 characters)
    - **email**: Valid email address
    - **password**: Strong password (min 8 characters)
    - **full_name**: Optional full name
    - **role**: User role (defaults to 'user')
    """
    user = await service.create_user(user_data)
    
    logger.info(
        "User registered",
        extra={"user_id": user.id, "username": user.username, "email": user.email}
    )
    
    return UserResponse.model_validate(user)


# ============================================
# LOGIN
# ============================================


@router.post(
    "/login",
    response_model=Token,
    summary="Login user"
)
async def login(
    credentials: LoginRequest,
    service: Annotated[UserService, Depends(get_user_service)]
):
    """
    Login with username/email and password
    
    Returns access and refresh tokens for authentication.
    
    **Request Body:**
    - **username**: Username or email
    - **password**: User password
    
    **Response:**
    - **access_token**: JWT access token (expires in 30 minutes)
    - **refresh_token**: JWT refresh token (expires in 7 days)
    - **token_type**: Bearer
    - **expires_in**: Token expiration in seconds
    """
    # Authenticate user
    user = await service.authenticate_user(
        credentials.username,
        credentials.password
    )
    
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # Create tokens
    access_token = create_access_token(data={"sub": str(user.id)})
    refresh_token = create_refresh_token(data={"sub": str(user.id)})
    
    logger.info(
        "User logged in",
        extra={"user_id": user.id, "username": user.username}
    )
    
    return Token(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
        expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
    )


@router.post(
    "/login/form",
    response_model=Token,
    summary="Login user (OAuth2 form)"
)
async def login_form(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    service: Annotated[UserService, Depends(get_user_service)]
):
    """
    OAuth2 compatible login endpoint
    
    This endpoint follows OAuth2 password flow specification.
    Used by Swagger UI and OAuth2 clients.
    
    **Form Data:**
    - **username**: Username or email
    - **password**: User password
    """
    # Authenticate user
    user = await service.authenticate_user(
        form_data.username,
        form_data.password
    )
    
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # Create tokens
    access_token = create_access_token(data={"sub": str(user.id)})
    refresh_token = create_refresh_token(data={"sub": str(user.id)})
    
    logger.info(
        "User logged in (OAuth2)",
        extra={"user_id": user.id, "username": user.username}
    )
    
    return Token(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
        expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
    )


# ============================================
# TOKEN REFRESH
# ============================================


@router.post(
    "/refresh",
    response_model=Token,
    summary="Refresh access token"
)
async def refresh_token(
    refresh_request: RefreshTokenRequest,
    service: Annotated[UserService, Depends(get_user_service)]
):
    """
    Refresh access token using refresh token
    
    When the access token expires, use the refresh token
    to obtain a new access token without re-authenticating.
    
    **Request Body:**
    - **refresh_token**: Valid refresh token
    
    **Response:**
    - New access and refresh tokens
    """
    # Decode refresh token
    payload = decode_token(refresh_request.refresh_token)
    
    if payload is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # Verify token type
    if not verify_token_type(payload, "refresh"):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token type",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # Get user ID
    user_id = payload.get("sub")
    if user_id is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token payload",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # Verify user exists and is active
    try:
        user = await service.get_user_by_id(int(user_id))
    except:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Inactive user"
        )
    
    # Create new tokens (token rotation)
    new_access_token = create_access_token(data={"sub": str(user.id)})
    new_refresh_token = create_refresh_token(data={"sub": str(user.id)})
    
    logger.info(
        "Token refreshed",
        extra={"user_id": user.id}
    )
    
    return Token(
        access_token=new_access_token,
        refresh_token=new_refresh_token,
        token_type="bearer",
        expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
    )


# ============================================
# USER INFO
# ============================================


@router.get(
    "/me",
    response_model=UserResponse,
    summary="Get current user"
)
async def get_current_user_info(
    current_user: Annotated[User, Depends(get_current_user)]
):
    """
    Get current authenticated user information
    
    Returns the profile of the currently authenticated user.
    Requires valid access token.
    """
    return UserResponse.model_validate(current_user)


# ============================================
# PASSWORD MANAGEMENT
# ============================================


@router.post(
    "/password/change",
    response_model=MessageResponse,
    summary="Change password"
)
async def change_password(
    password_data: ChangePasswordRequest,
    current_user: Annotated[User, Depends(get_current_user)],
    service: Annotated[UserService, Depends(get_user_service)]
):
    """
    Change user password
    
    Allows authenticated users to change their password.
    Requires current password for verification.
    
    **Request Body:**
    - **current_password**: Current password
    - **new_password**: New password (min 8 characters)
    """
    try:
        await service.change_password(
            current_user.id,
            password_data.current_password,
            password_data.new_password
        )
        
        logger.info(
            "Password changed",
            extra={"user_id": current_user.id}
        )
        
        return MessageResponse(
            message="Password changed successfully",
            success=True
        )
    except ValueError as e:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e)
        )


@router.post(
    "/password/reset/request",
    response_model=MessageResponse,
    summary="Request password reset"
)
async def request_password_reset(
    reset_request: PasswordResetRequest,
    service: Annotated[UserService, Depends(get_user_service)]
):
    """
    Request password reset email
    
    Sends a password reset token to the user's email.
    Token expires in 1 hour.
    
    **Request Body:**
    - **email**: User email address
    
    **Note:** Always returns success to prevent email enumeration
    """
    # Get user by email
    user = await service.get_user_by_email(reset_request.email)
    
    if user:
        # Create password reset token
        reset_token = create_password_reset_token(user.email)
        
        # TODO: Send email with reset token
        # For now, we'll log it (in production, send email)
        logger.info(
            "Password reset requested",
            extra={
                "user_id": user.id,
                "email": user.email,
                "reset_token": reset_token  # Remove in production!
            }
        )
        
        # In development, return token in response
        # In production, only send via email
        if settings.ENVIRONMENT == "development":
            return MessageResponse(
                message=f"Password reset token (DEV ONLY): {reset_token}",
                success=True
            )
    
    # Always return success to prevent email enumeration
    return MessageResponse(
        message="If the email exists, a password reset link has been sent",
        success=True
    )


@router.post(
    "/password/reset/confirm",
    response_model=MessageResponse,
    summary="Confirm password reset"
)
async def confirm_password_reset(
    reset_data: PasswordResetConfirm,
    service: Annotated[UserService, Depends(get_user_service)]
):
    """
    Reset password with token
    
    Uses the password reset token to set a new password.
    
    **Request Body:**
    - **token**: Password reset token (from email)
    - **new_password**: New password (min 8 characters)
    """
    # Decode token
    payload = decode_token(reset_data.token)
    
    if payload is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid or expired token"
        )
    
    # Verify token type
    if not verify_token_type(payload, "password_reset"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid token type"
        )
    
    # Get email from token
    email = payload.get("sub")
    if email is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid token payload"
        )
    
    # Reset password
    try:
        await service.reset_password(email, reset_data.new_password)
        
        logger.info(
            "Password reset completed",
            extra={"email": email}
        )
        
        return MessageResponse(
            message="Password reset successful",
            success=True
        )
    except Exception as e:
        logger.error(f"Password reset failed: {e}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Password reset failed"
        )


# ============================================
# EMAIL VERIFICATION
# ============================================


@router.post(
    "/email/verification/request",
    response_model=MessageResponse,
    summary="Request email verification"
)
async def request_email_verification(
    current_user: Annotated[User, Depends(get_current_user)],
):
    """
    Request email verification
    
    Sends a verification email to the user's email address.
    Token expires in 24 hours.
    
    **Note:** User must be authenticated
    """
    if current_user.is_verified:
        return MessageResponse(
            message="Email already verified",
            success=True
        )
    
    # Create verification token
    verification_token = create_email_verification_token(current_user.email)
    
    # TODO: Send email with verification token
    # For now, we'll log it (in production, send email)
    logger.info(
        "Email verification requested",
        extra={
            "user_id": current_user.id,
            "email": current_user.email,
            "verification_token": verification_token  # Remove in production!
        }
    )
    
    # In development, return token in response
    if settings.ENVIRONMENT == "development":
        return MessageResponse(
            message=f"Verification token (DEV ONLY): {verification_token}",
            success=True
        )
    
    return MessageResponse(
        message="Verification email sent",
        success=True
    )


@router.post(
    "/email/verification/confirm",
    response_model=MessageResponse,
    summary="Confirm email verification"
)
async def confirm_email_verification(
    verification_data: EmailVerificationRequest,
    service: Annotated[UserService, Depends(get_user_service)]
):
    """
    Verify email with token
    
    Uses the email verification token to mark email as verified.
    
    **Request Body:**
    - **token**: Email verification token (from email)
    """
    # Decode token
    payload = decode_token(verification_data.token)
    
    if payload is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid or expired token"
        )
    
    # Verify token type
    if not verify_token_type(payload, "email_verification"):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid token type"
        )
    
    # Get email from token
    email = payload.get("sub")
    if email is None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid token payload"
        )
    
    # Verify email
    try:
        await service.verify_email(email)
        
        logger.info(
            "Email verified",
            extra={"email": email}
        )
        
        return MessageResponse(
            message="Email verified successfully",
            success=True
        )
    except Exception as e:
        logger.error(f"Email verification failed: {e}")
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Email verification failed"
        )


# ============================================
# LOGOUT
# ============================================


@router.post(
    "/logout",
    response_model=MessageResponse,
    summary="Logout user"
)
async def logout(
    current_user: Annotated[User, Depends(get_current_user)]
):
    """
    Logout current user
    
    Since JWT tokens are stateless, logout is handled client-side
    by removing tokens from storage.
    
    For enhanced security, implement token blacklisting.
    
    **Note:** Client should delete stored tokens after this call
    """
    logger.info(
        "User logged out",
        extra={"user_id": current_user.id}
    )
    
    return MessageResponse(
        message="Logged out successfully. Please delete tokens from client storage.",
        success=True
    )

Step 8: Update API Router

Update app/api/v1/api.py:

"""
API v1 router aggregator

Combines all v1 endpoint routers
"""

from fastapi import APIRouter
from app.api.v1.endpoints import (
    users,
    health,
    users_advanced,
    dependencies_demo,
    conversations,
    auth  # New!
)

# Create main v1 router
api_router = APIRouter()

# Include all endpoint routers
api_router.include_router(health.router)
api_router.include_router(auth.router)  # Auth first for Swagger UI
api_router.include_router(users.router)
api_router.include_router(users_advanced.router)
api_router.include_router(dependencies_demo.router)
api_router.include_router(conversations.router)

Step 9: Protect Existing Endpoints

Update app/api/v1/endpoints/users.py to protect routes:

"""
User management endpoints

Updated with authentication protection
"""

from fastapi import APIRouter, Depends, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated

from app.models.user import User, UserCreate, UserUpdate, UserResponse, UserRole
from app.services.user_service import UserService
from app.db.session import get_db
from app.core.auth_dependencies import (
    get_current_user,
    get_current_active_user,
    require_admin,
    RoleChecker
)
from app.db.models.user import User as DBUser

router = APIRouter(prefix="/users", tags=["Users"])


def get_user_service(db: AsyncSession = Depends(get_db)) -> UserService:
    """Dependency to get user service with database session"""
    return UserService(db)


@router.post(
    "",
    response_model=UserResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Create a new user",
    dependencies=[Depends(require_admin)]  # Only admins can create users
)
async def create_user(
    user_data: UserCreate,
    service: Annotated[UserService, Depends(get_user_service)],
    current_user: Annotated[DBUser, Depends(require_admin)]  # Verify admin
):
    """
    Create a new user (Admin only)
    
    Only users with admin role can create new users.
    
    - **username**: Unique username (3-50 characters)
    - **email**: Valid email address
    - **full_name**: Optional full name
    - **password**: Password (min 8 characters)
    - **role**: User role (admin, user, guest)
    """
    user = await service.create_user(user_data)
    return UserResponse.model_validate(user)


@router.get(
    "",
    response_model=list[UserResponse],
    summary="Get all users",
    dependencies=[Depends(get_current_active_user)]  # Requires authentication
)
async def get_users(
    skip: int = 0,
    limit: int = 10,
    role: UserRole | None = None,
    service: Annotated[UserService, Depends(get_user_service)] = None,
    current_user: Annotated[DBUser, Depends(get_current_active_user)] = None
):
    """
    Retrieve users with optional filtering (Authenticated users only)
    
    - **skip**: Number of records to skip (for pagination)
    - **limit**: Maximum number of records to return
    - **role**: Filter by user role
    """
    users = await service.get_all_users(skip=skip, limit=limit, role=role)
    return [UserResponse.model_validate(user) for user in users]


@router.get(
    "/me",
    response_model=UserResponse,
    summary="Get current user profile"
)
async def get_my_profile(
    current_user: Annotated[DBUser, Depends(get_current_active_user)]
):
    """
    Get current user's profile
    
    Returns the authenticated user's profile information.
    """
    return UserResponse.model_validate(current_user)


@router.get(
    "/{user_id}",
    response_model=UserResponse,
    summary="Get user by ID",
    dependencies=[Depends(get_current_active_user)]
)
async def get_user(
    user_id: int,
    service: Annotated[UserService, Depends(get_user_service)],
    current_user: Annotated[DBUser, Depends(get_current_active_user)]
):
    """
    Get a specific user by their ID (Authenticated users only)
    """
    user = await service.get_user_by_id(user_id)
    return UserResponse.model_validate(user)


@router.patch(
    "/{user_id}",
    response_model=UserResponse,
    summary="Update user"
)
async def update_user(
    user_id: int,
    user_update: UserUpdate,
    service: Annotated[UserService, Depends(get_user_service)],
    current_user: Annotated[DBUser, Depends(get_current_active_user)]
):
    """
    Update user information
    
    Users can only update their own profile unless they are admin.
    Only provided fields will be updated.
    """
    # Users can only update themselves unless admin
    if current_user.id != user_id and current_user.role != UserRole.ADMIN:
        from fastapi import HTTPException
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Not authorized to update this user"
        )
    
    user = await service.update_user(user_id, user_update)
    return UserResponse.model_validate(user)


@router.delete(
    "/{user_id}",
    status_code=status.HTTP_204_NO_CONTENT,
    summary="Delete user",
    dependencies=[Depends(require_admin)]  # Only admins can delete
)
async def delete_user(
    user_id: int,
    service: Annotated[UserService, Depends(get_user_service)],
    current_user: Annotated[DBUser, Depends(require_admin)]
):
    """
    Delete a user by ID (Admin only)
    
    Only administrators can delete users.
    """
    await service.delete_user(user_id)
    return None


@router.get(
    "/stats/count",
    summary="Get user statistics",
    dependencies=[Depends(require_admin)]  # Only admins can see stats
)
async def get_user_stats(
    service: Annotated[UserService, Depends(get_user_service)],
    current_user: Annotated[DBUser, Depends(require_admin)]
):
    """
    Get user statistics (Admin only)
    
    Returns total user count and other statistics.
    """
    total = await service.get_user_count()
    return {"total_users": total}

Step 10: Update Main Application

Update app/main.py to show authentication in docs:

"""
FastAPI AI Backend - Main Application

Updated with JWT authentication
"""

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from contextlib import asynccontextmanager

from app.core.config import settings
from app.api.v1.api import api_router
from app.core.exceptions import AppException
from app.core.error_handlers import (
    app_exception_handler,
    validation_exception_handler,
    generic_exception_handler,
)
from app.middleware.logging_middleware import LoggingMiddleware
from app.middleware.performance_middleware import PerformanceMiddleware
from app.utils.logger import logger
from app.db.session import close_db
from app.db.utils import check_database_connection, get_database_info


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan events"""
    # Startup
    logger.info(
        "Application startup",
        extra={
            "app_name": settings.APP_NAME,
            "version": settings.APP_VERSION,
            "environment": settings.ENVIRONMENT,
        },
    )
    
    print(f"🚀 Starting {settings.APP_NAME} v{settings.APP_VERSION}")
    print(f"📝 Environment: {settings.ENVIRONMENT}")
    print(f"🔧 Debug mode: {settings.DEBUG}")
    
    # Check database connection
    db_connected = await check_database_connection()
    if db_connected:
        print("✅ Database connection successful")
        db_info = await get_database_info()
        print(f"📊 PostgreSQL version: {db_info.get('version', 'unknown')}")
    else:
        print("❌ Database connection failed!")
    
    print(f"🔐 JWT Authentication enabled")
    print(f"📚 API Docs: http://{settings.HOST}:{settings.PORT}/docs")
    print(f"🔑 Use /api/v1/auth/login to authenticate")
    
    yield
    
    # Shutdown
    logger.info("Application shutdown", extra={"app_name": settings.APP_NAME})
    print(f"👋 Shutting down {settings.APP_NAME}")
    await close_db()


def create_application() -> FastAPI:
    """Application factory pattern"""
    
    app = FastAPI(
        title=settings.APP_NAME,
        version=settings.APP_VERSION,
        description="""
        A production-ready FastAPI backend for AI applications.
        
        ## Features
        * **JWT Authentication** with access and refresh tokens
        * **Role-Based Access Control** (RBAC)
        * **Password Management** (change, reset, verification)
        * **Email Verification** workflow
        * **OAuth2 Password Flow** for Swagger UI
        * **Database Integration** with PostgreSQL
        * **Async Operations** for high performance
        * **Repository Pattern** for clean data access
        * **Advanced Validation** with Pydantic
        * **Comprehensive Error Handling**
        * **Structured Logging**
        
        ## Authentication
        
        1. **Register**: POST /api/v1/auth/register
        2. **Login**: POST /api/v1/auth/login
        3. **Use Token**: Add `Authorization: Bearer <token>` header
        4. **Refresh**: POST /api/v1/auth/refresh when token expires
        
        ## Quick Start
        
```bash
        # Register a user
        curl -X POST "http://localhost:8000/api/v1/auth/register" \\
          -H "Content-Type: application/json" \\
          -d '{"username":"user","email":"user@example.com","password":"password123"}'
        
        # Login
        curl -X POST "http://localhost:8000/api/v1/auth/login" \\
          -H "Content-Type: application/json" \\
          -d '{"username":"user","password":"password123"}'
        
        # Use the access_token in subsequent requests
        curl -X GET "http://localhost:8000/api/v1/users/me" \\
          -H "Authorization: Bearer <your_access_token>"
```
        
        ## Security
        
        * Passwords hashed with bcrypt
        * JWT tokens with expiration
        * Refresh token rotation
        * Role-based permissions
        * Password reset via email tokens
        * Email verification
        """,
        debug=settings.DEBUG,
        docs_url="/docs",
        redoc_url="/redoc",
        openapi_url="/openapi.json",
        lifespan=lifespan,
    )
    
    # Middleware
    app.add_middleware(PerformanceMiddleware, slow_request_threshold=1.0)
    app.add_middleware(LoggingMiddleware)
    app.add_middleware(
        CORSMiddleware,
        allow_origins=settings.allowed_origins_list,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
        expose_headers=["X-Request-ID", "X-Process-Time"],
    )
    
    # Exception Handlers
    app.add_exception_handler(AppException, app_exception_handler)
    app.add_exception_handler(RequestValidationError, validation_exception_handler)
    app.add_exception_handler(ValidationError, validation_exception_handler)
    app.add_exception_handler(Exception, generic_exception_handler)
    
    # Routers
    app.include_router(api_router, prefix=settings.API_V1_PREFIX)
    
    return app


app = create_application()

Step 11: Create Authentication Testing Script

Create test_authentication.py:

"""
Test script for authentication endpoints

Tests all JWT authentication functionality
"""

import requests
import json
from typing import Dict, Any

BASE_URL = "http://127.0.0.1:8000/api/v1"


def print_test(title: str, response: requests.Response):
    """Print test results"""
    print(f"\n{'='*70}")
    print(f"{title}")
    print(f"{'='*70}")
    print(f"Status: {response.status_code}")
    try:
        data = response.json()
        print(f"Response:\n{json.dumps(data, indent=2, default=str)}")
    except:
        print(f"Response: {response.text[:500]}")


def test_register():
    """Test user registration"""
    user_data = {
        "username": "auth_test_user",
        "email": "authtest@example.com",
        "password": "SecurePass123",
        "full_name": "Auth Test User"
    }
    
    response = requests.post(f"{BASE_URL}/auth/register", json=user_data)
    print_test("✅ REGISTER USER", response)
    
    if response.status_code == 201:
        return response.json()
    return None


def test_login(username: str, password: str):
    """Test user login"""
    credentials = {
        "username": username,
        "password": password
    }
    
    response = requests.post(f"{BASE_URL}/auth/login", json=credentials)
    print_test("🔐 LOGIN", response)
    
    if response.status_code == 200:
        return response.json()
    return None


def test_login_invalid():
    """Test login with invalid credentials"""
    credentials = {
        "username": "wrong_user",
        "password": "wrong_password"
    }
    
    response = requests.post(f"{BASE_URL}/auth/login", json=credentials)
    print_test("❌ LOGIN (INVALID CREDENTIALS)", response)


def test_get_current_user(access_token: str):
    """Test getting current user info"""
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    
    response = requests.get(f"{BASE_URL}/auth/me", headers=headers)
    print_test("👤 GET CURRENT USER", response)


def test_protected_route_without_token():
    """Test accessing protected route without token"""
    response = requests.get(f"{BASE_URL}/users")
    print_test("🔒 PROTECTED ROUTE (NO TOKEN)", response)


def test_protected_route_with_token(access_token: str):
    """Test accessing protected route with valid token"""
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    
    response = requests.get(f"{BASE_URL}/users", headers=headers)
    print_test("🔓 PROTECTED ROUTE (WITH TOKEN)", response)


def test_refresh_token(refresh_token: str):
    """Test token refresh"""
    refresh_data = {
        "refresh_token": refresh_token
    }
    
    response = requests.post(f"{BASE_URL}/auth/refresh", json=refresh_data)
    print_test("🔄 REFRESH TOKEN", response)
    
    if response.status_code == 200:
        return response.json()
    return None


def test_change_password(access_token: str):
    """Test password change"""
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    
    password_data = {
        "current_password": "SecurePass123",
        "new_password": "NewSecurePass123"
    }
    
    response = requests.post(
        f"{BASE_URL}/auth/password/change",
        json=password_data,
        headers=headers
    )
    print_test("🔑 CHANGE PASSWORD", response)


def test_password_reset_request():
    """Test password reset request"""
    reset_data = {
        "email": "authtest@example.com"
    }
    
    response = requests.post(
        f"{BASE_URL}/auth/password/reset/request",
        json=reset_data
    )
    print_test("📧 PASSWORD RESET REQUEST", response)
    
    # Extract token from development response
    if response.status_code == 200:
        data = response.json()
        message = data.get("message", "")
        if "DEV ONLY" in message:
            # Extract token from message
            token = message.split(": ")[1] if ": " in message else None
            return token
    return None


def test_password_reset_confirm(reset_token: str):
    """Test password reset confirmation"""
    if not reset_token:
        print("\n⚠️  No reset token available, skipping password reset confirm")
        return
    
    reset_data = {
        "token": reset_token,
        "new_password": "ResetPass123"
    }
    
    response = requests.post(
        f"{BASE_URL}/auth/password/reset/confirm",
        json=reset_data
    )
    print_test("✅ PASSWORD RESET CONFIRM", response)


def test_email_verification_request(access_token: str):
    """Test email verification request"""
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    
    response = requests.post(
        f"{BASE_URL}/auth/email/verification/request",
        headers=headers
    )
    print_test("📧 EMAIL VERIFICATION REQUEST", response)
    
    # Extract token from development response
    if response.status_code == 200:
        data = response.json()
        message = data.get("message", "")
        if "DEV ONLY" in message:
            token = message.split(": ")[1] if ": " in message else None
            return token
    return None


def test_email_verification_confirm(verification_token: str):
    """Test email verification confirmation"""
    if not verification_token:
        print("\n⚠️  No verification token available, skipping")
        return
    
    verification_data = {
        "token": verification_token
    }
    
    response = requests.post(
        f"{BASE_URL}/auth/email/verification/confirm",
        json=verification_data
    )
    print_test("✅ EMAIL VERIFICATION CONFIRM", response)


def test_role_based_access(access_token: str):
    """Test role-based access control"""
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    
    # Try to access admin-only endpoint (should fail for regular user)
    user_data = {
        "username": "new_user",
        "email": "newuser@example.com",
        "password": "Password123"
    }
    
    response = requests.post(
        f"{BASE_URL}/users",
        json=user_data,
        headers=headers
    )
    print_test("🚫 ADMIN ENDPOINT (REGULAR USER)", response)


def test_logout(access_token: str):
    """Test logout"""
    headers = {
        "Authorization": f"Bearer {access_token}"
    }
    
    response = requests.post(f"{BASE_URL}/auth/logout", headers=headers)
    print_test("👋 LOGOUT", response)


def test_expired_token():
    """Test using expired token"""
    # Use a token that's clearly expired (or malformed)
    expired_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZXhwIjoxNjAwMDAwMDAwfQ.invalid"
    
    headers = {
        "Authorization": f"Bearer {expired_token}"
    }
    
    response = requests.get(f"{BASE_URL}/auth/me", headers=headers)
    print_test("⏰ EXPIRED/INVALID TOKEN", response)


def run_all_tests():
    """Run complete authentication test suite"""
    print("\n" + "🧪"*35)
    print("AUTHENTICATION TEST SUITE")
    print("🧪"*35)
    
    # Registration
    print("\n" + "="*70)
    print("REGISTRATION")
    print("="*70)
    user = test_register()
    
    # Login
    print("\n" + "="*70)
    print("LOGIN & AUTHENTICATION")
    print("="*70)
    test_login_invalid()
    
    if not user:
        print("\n❌ User registration failed, cannot continue tests")
        return
    
    tokens = test_login("auth_test_user", "SecurePass123")
    
    if not tokens:
        print("\n❌ Login failed, cannot continue tests")
        return
    
    access_token = tokens['access_token']
    refresh_token = tokens['refresh_token']
    
    # Protected Routes
    print("\n" + "="*70)
    print("PROTECTED ROUTES")
    print("="*70)
    test_protected_route_without_token()
    test_protected_route_with_token(access_token)
    test_get_current_user(access_token)
    
    # Token Refresh
    print("\n" + "="*70)
    print("TOKEN REFRESH")
    print("="*70)
    new_tokens = test_refresh_token(refresh_token)
    if new_tokens:
        access_token = new_tokens['access_token']
    
    # Password Management
    print("\n" + "="*70)
    print("PASSWORD MANAGEMENT")
    print("="*70)
    test_change_password(access_token)
    
    # Re-login with new password
    print("\n🔄 Re-logging in with new password...")
    tokens = test_login("auth_test_user", "NewSecurePass123")
    if tokens:
        access_token = tokens['access_token']
    
    # Password Reset Flow
    print("\n" + "="*70)
    print("PASSWORD RESET FLOW")
    print("="*70)
    reset_token = test_password_reset_request()
    test_password_reset_confirm(reset_token)
    
    # Re-login with reset password
    if reset_token:
        print("\n🔄 Re-logging in with reset password...")
        tokens = test_login("auth_test_user", "ResetPass123")
        if tokens:
            access_token = tokens['access_token']
    
    # Email Verification
    print("\n" + "="*70)
    print("EMAIL VERIFICATION")
    print("="*70)
    verification_token = test_email_verification_request(access_token)
    test_email_verification_confirm(verification_token)
    
    # Role-Based Access Control
    print("\n" + "="*70)
    print("ROLE-BASED ACCESS CONTROL")
    print("="*70)
    test_role_based_access(access_token)
    
    # Token Expiration
    print("\n" + "="*70)
    print("TOKEN VALIDATION")
    print("="*70)
    test_expired_token()
    
    # Logout
    print("\n" + "="*70)
    print("LOGOUT")
    print("="*70)
    test_logout(access_token)
    
    print("\n" + "✅"*35)
    print("ALL AUTHENTICATION TESTS COMPLETED!")
    print("✅"*35)
    print("\n💡 Key Features Tested:")
    print("   ✅ User registration")
    print("   ✅ Login with JWT tokens")
    print("   ✅ Token refresh")
    print("   ✅ Protected routes")
    print("   ✅ Password change")
    print("   ✅ Password reset flow")
    print("   ✅ Email verification")
    print("   ✅ Role-based access control")
    print("   ✅ Token validation")
    print("   ✅ Logout\n")


if __name__ == "__main__":
    print("""
    ╔════════════════════════════════════════════════════════╗
    ║  Authentication Test Suite                            ║
    ║                                                        ║
    ║  Tests:                                                ║
    ║  - User registration                                   ║
    ║  - Login & token generation                            ║
    ║  - Protected route access                              ║
    ║  - Token refresh                                       ║
    ║  - Password management                                 ║
    ║  - Email verification                                  ║
    ║  - Role-based access control                           ║
    ║                                                        ║
    ║  Prerequisites:                                        ║
    ║  1. PostgreSQL running                                 ║
    ║  2. FastAPI server running                             ║
    ║  3. Database migrations applied                        ║
    ╚════════════════════════════════════════════════════════╝
    """)
    
    try:
        response = requests.get(f"{BASE_URL}/health")
        if response.status_code == 200:
            run_all_tests()
        else:
            print("❌ Server returned unexpected status")
    except requests.exceptions.ConnectionError:
        print("❌ ERROR: Cannot connect to server!")
        print("   Please start the server with: python main.py")
    except Exception as e:
        print(f"❌ ERROR: {e}")

Step 12: Create Token Blacklist (Optional Advanced Feature)

Create app/db/models/token_blacklist.py:

"""
Token blacklist model

For advanced security: blacklist revoked tokens
"""

from sqlalchemy import String, DateTime, Integer
from sqlalchemy.orm import Mapped, mapped_column
from datetime import datetime

from app.db.base import Base


class TokenBlacklist(Base):
    """
    Token blacklist model
    
    Stores revoked tokens to prevent their use after logout
    """
    __tablename__ = "token_blacklist"
    
    id: Mapped[int] = mapped_column(primary_key=True, index=True)
    
    # Token identifier (jti claim from JWT)
    jti: Mapped[str] = mapped_column(
        String(255),
        unique=True,
        index=True,
        nullable=False
    )
    
    # Token type (access or refresh)
    token_type: Mapped[str] = mapped_column(
        String(50),
        nullable=False
    )
    
    # User ID who owned the token
    user_id: Mapped[int] = mapped_column(
        Integer,
        nullable=False,
        index=True
    )
    
    # When token was blacklisted
    blacklisted_at: Mapped[datetime] = mapped_column(
        DateTime,
        default=datetime.utcnow,
        nullable=False
    )
    
    # Token expiration (for cleanup)
    expires_at: Mapped[datetime] = mapped_column(
        DateTime,
        nullable=False,
        index=True
    )
    
    def __repr__(self) -> str:
        return f"<TokenBlacklist(jti='{self.jti}', type='{self.token_type}')>"

Create migration for token blacklist:

alembic revision --autogenerate -m "Add token blacklist table"
alembic upgrade head

Step 13: Update Requirements

Update requirements.txt:

# FastAPI and core dependencies
fastapi==0.115.0
uvicorn[standard]==0.32.1
pydantic==2.10.3
pydantic-settings==2.7.0
pydantic[email]==2.10.3

# Email validation
email-validator==2.3.0
dnspython==2.8.0

# Phone number validation
phonenumbers==9.0.28

# Environment configuration
python-dotenv==1.2.2

# Structured logging
python-json-logger==4.1.0

# HTTP client
requests==2.33.1

# Database - SQLAlchemy
sqlalchemy[asyncio]==2.0.23
asyncpg==0.29.0
psycopg2-binary==2.9.9

# Database migrations
alembic==1.13.1

# Password hashing (modern bcrypt)
pwdlib==0.2.1

# JWT tokens
python-jose[cryptography]==3.3.0

# ASGI server
starlette==0.45.0

Step 14: Production Security Checklist

Create SECURITY.md:

# Security Best Practices

## JWT Security

### 1. Secret Key Management

**NEVER commit secret keys to version control!**

```bash
# Generate strong secret key
openssl rand -hex 32

# Store in environment variables
SECRET_KEY=your-generated-secret-key-here

# Use different keys for different environments
# Development: one key
# Staging: different key
# Production: different key
```

### 2. Token Expiration

**Configure appropriate expiration times:**

```bash
# Short-lived access tokens (15-30 minutes)
ACCESS_TOKEN_EXPIRE_MINUTES=30

# Longer refresh tokens (7 days)
REFRESH_TOKEN_EXPIRE_DAYS=7

# Password reset tokens (1 hour)
PASSWORD_RESET_TOKEN_EXPIRE_HOURS=1
```

### 3. HTTPS Only in Production

```python
# Force HTTPS in production
if settings.ENVIRONMENT == "production":
    app.add_middleware(HTTPSRedirectMiddleware)
```

### 4. Token Storage (Frontend)

**Client-Side Best Practices:**

- ✅ Store tokens in httpOnly cookies (most secure)
- ⚠️ If using localStorage:
  - Vulnerable to XSS attacks
  - Never store sensitive data
  - Clear on logout
- ❌ Never store in sessionStorage for persistent login

**Example (React):**

```javascript
// Good: httpOnly cookie (set by server)
// Server sets: Set-Cookie: access_token=...; HttpOnly; Secure; SameSite=Strict

// Acceptable: localStorage with precautions
localStorage.setItem('access_token', token);

// On logout
localStorage.removeItem('access_token');
localStorage.removeItem('refresh_token');
```

### 5. Rate Limiting

**Implement rate limiting on auth endpoints:**

```python
from slowapi import Limiter
from slowapi.util import get_remote_address

limiter = Limiter(key_func=get_remote_address)

@router.post("/login")
@limiter.limit("5/minute")  # 5 login attempts per minute
async def login(...):
    ...
```

### 6. Password Security

**Enforce strong passwords:**

```python
# Minimum requirements
- Min 8 characters
- At least 1 uppercase
- At least 1 lowercase
- At least 1 number
- At least 1 special character

# Use pwdlib (modern bcrypt)
from pwdlib import PasswordHash
password_hash = PasswordHash.recommended()
```

### 7. Token Blacklisting

**For critical applications, implement token blacklisting:**

```python
# On logout
await blacklist_token(token_jti, user_id, expires_at)

# On token validation
if await is_token_blacklisted(token_jti):
    raise HTTPException(401, "Token revoked")
```

### 8. CORS Configuration

**Restrict CORS to known origins:**

```python
# ❌ Never in production
allow_origins=["*"]

# ✅ Production
allow_origins=[
    "https://yourdomain.com",
    "https://app.yourdomain.com"
]
```

### 9. SQL Injection Prevention

**SQLAlchemy protects against SQL injection, but:**

```python
# ✅ Good (parameterized)
await session.execute(
    select(User).where(User.username == username)
)

# ❌ Bad (string concatenation)
query = f"SELECT * FROM users WHERE username = '{username}'"
```

### 10. Input Validation

**Always validate input with Pydantic:**

```python
class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: EmailStr
    password: str = Field(..., min_length=8)
```

## Deployment Security

### Environment Variables

```bash
# .env (NEVER commit)
SECRET_KEY=production-secret-key
DATABASE_URL=postgresql://user:pass@prod-db:5432/db

# Use secrets management
# AWS: AWS Secrets Manager
# GCP: Secret Manager
# Azure: Key Vault
# Kubernetes: Secrets
```

### Database Security

```bash
# Use SSL for database connections
DATABASE_URL=postgresql://user:pass@host:5432/db?sslmode=require

# Restrict database access
# Only allow application servers
# Use firewall rules
# Enable audit logging
```

### Monitoring & Alerts

```python
# Log authentication events
logger.info("Failed login attempt", extra={
    "username": username,
    "ip": client_ip,
    "user_agent": user_agent
})

# Alert on suspicious activity
# - Multiple failed logins
# - Password reset requests
# - Access from new locations
```

### Regular Security Updates

```bash
# Update dependencies regularly
pip list --outdated
pip install --upgrade package-name

# Check for vulnerabilities
pip install safety
safety check

# Monitor CVE databases
```

## Common Vulnerabilities

### Prevent Timing Attacks

```python
# ❌ Bad (timing attack vulnerable)
if user.password == password:
    return True

# ✅ Good (constant time comparison)
from pwdlib import PasswordHash
password_hash.verify(password, user.hashed_password)
```

### Prevent Email Enumeration

```python
# ❌ Bad
if not user:
    raise HTTPException(404, "User not found")

# ✅ Good (same response for existing/non-existing)
return MessageResponse(
    message="If the email exists, a reset link has been sent"
)
```

### Prevent Brute Force

```python
# Implement rate limiting
# Add CAPTCHA after failed attempts
# Lock account after N failed attempts
# Implement exponential backoff
```

## Incident Response

### If Secret Key Compromised

1. Generate new secret key
2. Invalidate all existing tokens
3. Force users to re-authenticate
4. Notify users
5. Review access logs

### If Database Compromised

1. Rotate database credentials
2. Force password reset for all users
3. Review audit logs
4. Notify affected users
5. Report to authorities if required

## Compliance

### GDPR

- Allow users to download their data
- Allow users to delete their data
- Log data access
- Implement data retention policies

### HIPAA (if applicable)

- Encrypt data at rest and in transit
- Implement audit trails
- Access controls
- Regular security assessments

## Security Checklist

- [ ] Strong secret keys generated and stored securely
- [ ] HTTPS enabled in production
- [ ] Token expiration configured appropriately
- [ ] Rate limiting on auth endpoints
- [ ] Password strength requirements enforced
- [ ] CORS properly configured
- [ ] SQL injection protection verified
- [ ] Input validation comprehensive
- [ ] Logging and monitoring set up
- [ ] Regular security updates scheduled
- [ ] Incident response plan documented
- [ ] Backup and recovery tested

Step 15: Create Deployment Guide

Create AUTHENTICATION_DEPLOYMENT.md:

# Authentication Deployment Guide

## Pre-Deployment Checklist

### 1. Environment Configuration

```bash
# Generate production secret key
openssl rand -hex 32

# Update .env.production
SECRET_KEY=
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
ENVIRONMENT=production
DEBUG=False
```

### 2. Database Setup

```bash
# Run migrations
alembic upgrade head

# Verify tables
psql -h host -U user -d db -c "\dt"

# Should include: users, conversations, messages, token_blacklist
```

### 3. Create Admin User

```python
# create_admin.py
import asyncio
from app.db.session import async_session_maker
from app.db.repositories.user_repository import UserRepository
from app.core.security import hash_password
from app.db.models.user import UserRole

async def create_admin():
    async with async_session_maker() as session:
        repo = UserRepository(session)
        
        admin = await repo.create(
            username="admin",
            email="admin@yourdomain.com",
            hashed_password=hash_password("CHANGE_THIS_PASSWORD"),
            full_name="Administrator",
            role=UserRole.ADMIN,
            is_active=True,
            is_verified=True
        )
        
        await session.commit()
        print(f"Admin created: {admin.username}")

asyncio.run(create_admin())
```

### 4. SSL/TLS Configuration

```nginx
# nginx.conf
server {
    listen 443 ssl http2;
    server_name api.yourdomain.com;
    
    ssl_certificate /etc/ssl/certs/cert.pem;
    ssl_certificate_key /etc/ssl/private/key.pem;
    
    location / {
        proxy_pass http://localhost:8000;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}

# Redirect HTTP to HTTPS
server {
    listen 80;
    server_name api.yourdomain.com;
    return 301 https://$server_name$request_uri;
}
```

### 5. Email Configuration (Optional)

```bash
# SMTP settings
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USER=your-email@gmail.com
SMTP_PASSWORD=your-app-password
EMAILS_FROM_EMAIL=noreply@yourdomain.com
```

## Deployment Options

### Option 1: Docker

```dockerfile
# Dockerfile
FROM python:3.11-slim

WORKDIR /app

COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt

COPY . .

CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
```

```yaml
# docker-compose.yml
version: '3.8'

services:
  api:
    build: .
    ports:
      - "8000:8000"
    environment:
      - DATABASE_URL=${DATABASE_URL}
      - SECRET_KEY=${SECRET_KEY}
    depends_on:
      - postgres
  
  postgres:
    image: postgres:15-alpine
    environment:
      POSTGRES_DB: aiverse_db
      POSTGRES_USER: aiverse_user
      POSTGRES_PASSWORD: ${DB_PASSWORD}
    volumes:
      - postgres_data:/var/lib/postgresql/data

volumes:
  postgres_data:
```

### Option 2: Systemd Service

```ini
# /etc/systemd/system/aiverse-api.service
[Unit]
Description=AIVerse FastAPI Application
After=network.target

[Service]
Type=notify
User=www-data
Group=www-data
WorkingDirectory=/var/www/aiverse-api
Environment="PATH=/var/www/aiverse-api/venv/bin"
EnvironmentFile=/var/www/aiverse-api/.env
ExecStart=/var/www/aiverse-api/venv/bin/uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
Restart=always

[Install]
WantedBy=multi-user.target
```

```bash
# Enable and start
sudo systemctl enable aiverse-api
sudo systemctl start aiverse-api
sudo systemctl status aiverse-api
```

### Option 3: Cloud Platforms

**AWS (Elastic Beanstalk):**

```yaml
# .ebextensions/python.config
option_settings:
  aws:elasticbeanstalk:application:environment:
    SECRET_KEY: your-secret-key
    DATABASE_URL: postgresql://...
```

**Google Cloud Run:**

```bash
# Build and deploy
gcloud builds submit --tag gcr.io/PROJECT_ID/aiverse-api
gcloud run deploy aiverse-api \
  --image gcr.io/PROJECT_ID/aiverse-api \
  --platform managed \
  --region us-central1 \
  --set-env-vars SECRET_KEY=xxx,DATABASE_URL=xxx
```

**Heroku:**

```bash
# Create app
heroku create aiverse-api

# Set environment variables
heroku config:set SECRET_KEY=xxx
heroku config:set DATABASE_URL=xxx

# Deploy
git push heroku main

# Run migrations
heroku run alembic upgrade head
```

## Post-Deployment

### 1. Health Checks

```bash
# API health
curl https://api.yourdomain.com/api/v1/health

# Database health
curl https://api.yourdomain.com/api/v1/health/database

# Authentication test
curl -X POST https://api.yourdomain.com/api/v1/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username":"admin","password":"password"}'
```

### 2. Monitoring Setup

```python
# Add monitoring endpoints
from prometheus_client import make_asgi_app

metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
```

### 3. Backup Strategy

```bash
# Database backup script
#!/bin/bash
pg_dump $DATABASE_URL | gzip > backup_$(date +%Y%m%d).sql.gz
aws s3 cp backup_$(date +%Y%m%d).sql.gz s3://backups/
```

### 4. Log Aggregation

```bash
# Send logs to CloudWatch/Datadog/etc.
# Configure in your logging setup
```

## Troubleshooting

### Token Issues

```bash
# Check token expiration
# Decode JWT at https://jwt.io

# Verify secret key is correct
echo $SECRET_KEY

# Check system time (JWT uses UTC)
date -u
```

### Database Connection

```bash
# Test connection
psql $DATABASE_URL

# Check pool status
# Add monitoring endpoint
```

### CORS Issues

```bash
# Verify ALLOWED_ORIGINS
echo $ALLOWED_ORIGINS

# Check preflight requests
curl -X OPTIONS https://api.yourdomain.com/api/v1/auth/login \
  -H "Origin: https://app.yourdomain.com" \
  -H "Access-Control-Request-Method: POST"
```

What’s the difference between access tokens and refresh tokens?

Access tokens are short-lived (15-30 minutes) and used for API requests, while refresh tokens are long-lived (7 days) and used only to obtain new access tokens. This dual-token system enhances security: if an access token is stolen, it expires quickly, and refresh tokens are only sent to the token refresh endpoint, reducing exposure. When an access token expires, the client uses the refresh token to get a new one without re-authenticating. This allows persistent login while maintaining security through token rotation.
Token blacklisting requires storing revoked tokens in a database or Redis. When a user logs out, add their token’s JTI (JWT ID claim) to the blacklist with expiration time. On every request, check if the token’s JTI is blacklisted before accepting it. For stateless JWT, this adds minimal overhead. Use Redis for performance: SETEX token_jti expiration_seconds 1. In your authentication dependency, query the blacklist: if await redis.exists(token_jti): raise Unauthorized. Clean up expired entries automatically with TTL.
Cookies are more secure when configured properly: use httpOnly (prevents JavaScript access), Secure (HTTPS only), and SameSite=Strict (CSRF protection). LocalStorage is vulnerable to XSS attacks—any JavaScript can access it. However, localStorage is simpler for SPAs and mobile apps. Best practice: Use httpOnly cookies for web apps, localStorage with proper XSS protection for SPAs, and secure storage APIs for mobile. Never store refresh tokens in localStorage—only in httpOnly cookies or secure mobile storage.
Generate time-limited tokens (1 hour) with email encoded in the JWT. Send token via email only—never return it in API responses except in development. Use the token once: mark as used in database or include a nonce. Verify token type matches “password_reset”. Rate limit reset requests (3 per hour per email). Don’t reveal if email exists—always return success. After reset, invalidate all user’s tokens and require re-login. Log all reset attempts for security monitoring.
Install authlib: pip install authlib. Configure OAuth2 client with provider credentials. Create callback endpoint to receive authorization code. Exchange code for access token from provider. Fetch user info from provider’s API. Create or update user in your database. Generate your own JWT tokens for the user. Example flow: User clicks “Login with Google” → Redirect to Google → User approves → Google redirects to callback → Exchange code for token → Create user → Return JWT. Use providers’ official SDKs for better security.

🎯 Complete Authentication Flow Diagram

┌─────────────┐
│   Client    │
└──────┬──────┘
       │
       │ 1. POST /auth/register
       │    {username, email, password}
       ▼
┌─────────────────────────────────────┐
│  FastAPI Backend                    │
│  ┌────────────────────────────┐    │
│  │ Create user                │    │
│  │ Hash password (bcrypt)     │    │
│  │ Store in PostgreSQL        │    │
│  │ Return user info           │    │
│  └────────────────────────────┘    │
└──────┬──────────────────────────────┘
       │
       │ 2. POST /auth/login
       │    {username, password}
       ▼
┌─────────────────────────────────────┐
│  Validate credentials               │
│  ┌────────────────────────────┐    │
│  │ Find user by username      │    │
│  │ Verify password hash       │    │
│  │ Generate access token      │    │
│  │ Generate refresh token     │    │
│  │ Return tokens              │    │
│  └────────────────────────────┘    │
└──────┬──────────────────────────────┘
       │
       │ {access_token, refresh_token}
       │
       │ 3. GET /users/me
       │    Authorization: Bearer <access_token>
       ▼
┌─────────────────────────────────────┐
│  Validate token                     │
│  ┌────────────────────────────┐    │
│  │ Decode JWT                 │    │
│  │ Verify signature           │    │
│  │ Check expiration           │    │
│  │ Get user from database     │    │
│  │ Return user info           │    │
│  └────────────────────────────┘    │
└─────────────────────────────────────┘

🎉 Summary

What You’ve Accomplished:

Complete JWT Authentication System

  • User registration and login
  • Access and refresh tokens
  • Token rotation for security
  • OAuth2 password flow

Password Management

  • Secure password hashing (bcrypt)
  • Password change functionality
  • Password reset via email tokens
  • Strong password validation

Email Verification

  • Email verification tokens
  • Verification workflow
  • Resend verification emails

Authorization

  • Role-based access control (RBAC)
  • Protected routes
  • Custom permission dependencies
  • User/admin role separation

Security Features

  • Token expiration
  • Token type validation
  • Rate limiting ready
  • Production security guidelines

Production Ready

  • Comprehensive error handling
  • Structured logging
  • Environment-based configuration
  • Deployment documentation

📊 Project Status

Episodes Completed:
├── ✅ Ep. 1-5: Core FastAPI Setup
├── ✅ Ep. 6: AI Integration (Ollama)
├── ✅ Ep. 7: React Frontend
├── ✅ Ep. 8: Database Integration
└── ✅ Ep. 9: JWT Authentication

Current Version: 0.5.0
Status: Production-Ready with Authentication

🔗 Quick Reference Commands

# Register user
curl -X POST http://localhost:8000/api/v1/auth/register \
  -H "Content-Type: application/json" \
  -d '{"username":"user","email":"user@example.com","password":"Pass123"}'

# Login
curl -X POST http://localhost:8000/api/v1/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username":"user","password":"Pass123"}'

# Use protected endpoint
curl -X GET http://localhost:8000/api/v1/users/me \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN"

# Refresh token
curl -X POST http://localhost:8000/api/v1/auth/refresh \
  -H "Content-Type: application/json" \
  -d '{"refresh_token":"YOUR_REFRESH_TOKEN"}'

# Change password
curl -X POST http://localhost:8000/api/v1/auth/password/change \
  -H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
  -H "Content-Type: application/json" \
  -d '{"current_password":"Pass123","new_password":"NewPass123"}'

🚀 What’s Next: Ep.10

Ready to deploy your complete application to production!

Ep.10 will cover:

  • Docker containerization
  • Docker Compose for local development
  • Kubernetes deployment
  • CI/CD with GitHub Actions
  • AWS/GCP/Azure deployment
  • Monitoring and logging
  • Performance optimization
  • Production best practices

📝 Files Created/Updated Summary

New Files:

  • app/core/security.py – JWT and password utilities
  • app/core/auth_dependencies.py – Authentication dependencies
  • app/models/auth.py – Authentication Pydantic models
  • app/api/v1/endpoints/auth.py – Authentication endpoints
  • test_authentication.py – Authentication test suite
  • SECURITY.md – Security best practices
  • AUTHENTICATION_DEPLOYMENT.md – Deployment guide

Updated Files:

  • app/core/config.py – Added JWT settings
  • app/services/user_service.py – Updated to use new security module
  • app/api/v1/endpoints/users.py – Added authentication protection
  • app/api/v1/api.py – Added auth router
  • app/main.py – Updated description
  • .env – Added JWT configuration
  • requirements.txt – Added python-jose, pwdlib

Congratulations! Your FastAPI application now has production-grade authentication! 🎉

Ready to proceed with Ep.10: Production Deployment?

Leave a Reply

Your email address will not be published. Required fields are marked *

Search