Views: 1
To implement JWT authentication in FastAPI, install python-jose for token generation, create utility functions to encode/decode JWT tokens with expiration, implement OAuth2PasswordBearer for token extraction, create a login endpoint that validates credentials and returns access tokens, and use dependency injection to protect routes by verifying tokens and extracting user information. Add refresh tokens for better security, implement role-based access control with custom dependencies, and store tokens securely on the client side. This architecture provides stateless, scalable authentication suitable for modern web applications and mobile apps.
🎓 What You’ll Learn
By the end of this tutorial, you’ll be able to:
- Implement JWT token generation and validation
- Create secure login and registration endpoints
- Protect routes with authentication dependencies
- Implement refresh token rotation
- Add role-based access control (RBAC)
- Handle token expiration and blacklisting
- Secure password reset flows
- Implement email verification
- Add OAuth2 password flow
- Deploy authentication in production
📖 Understanding JWT Authentication
What is JWT?
JWT (JSON Web Token) is a compact, URL-safe token format for securely transmitting information between parties.
Structure:
header.payload.signature
eyJhbGc.eyJzdWI.SflKxwRJ
Parts:
- Header: Algorithm and token type
- Payload: Claims (user data)
- Signature: Verification hash
Why JWT?
| Feature | JWT | Session Cookies |
|---|---|---|
| Stateless | ✅ Yes | ❌ No (needs DB) |
| Scalable | ✅ Easy | ⚠️ Requires sticky sessions |
| Mobile-Friendly | ✅ Perfect | ⚠️ Complex |
| Microservices | ✅ Ideal | ❌ Difficult |
| Revocation | ⚠️ Needs blacklist | ✅ Easy |
Authentication Flow
1. User → POST /login {username, password}
2. Server → Validates credentials
3. Server → Generates JWT token
4. Server → Returns {access_token, refresh_token}
5. Client → Stores tokens
6. Client → Sends Authorization: Bearer <token>
7. Server → Validates token
8. Server → Returns protected resource
🛠️ Step-by-Step Implementation
Step 1: Install Dependencies
# Activate virtual environment
cd fastapi
source venv/bin/activate # Windows: venv\Scripts\activate
# Install JWT and password hashing
pip install python-jose[cryptography]==3.3.0
pip install pwdlib==0.2.1
# Update requirements
pip freeze > requirements.txt
What we installed:
python-jose: JWT token generation/validationpwdlib: Modern password hashing (replaces passlib)
Step 2: Update Configuration
Update app/core/config.py:
"""
Core configuration module
Manages all application settings using Pydantic Settings
"""
from pydantic_settings import BaseSettings, SettingsConfigDict
from typing import List
from functools import lru_cache
class Settings(BaseSettings):
"""
Application settings
These values are loaded from environment variables or .env file
Pydantic validates the types automatically
"""
# Application
APP_NAME: str = "AIVerse Backend"
APP_VERSION: str = "0.5.0"
DEBUG: bool = False
ENVIRONMENT: str = "production"
# API
API_V1_PREFIX: str = "/api/v1"
HOST: str = "0.0.0.0"
PORT: int = 8000
# CORS
ALLOWED_ORIGINS: str = "http://localhost:3000"
@property
def allowed_origins_list(self) -> List[str]:
"""Convert comma-separated string to list"""
return [origin.strip() for origin in self.ALLOWED_ORIGINS.split(",")]
# Database Configuration
DATABASE_URL: str = "postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db"
DATABASE_ECHO: bool = False
DB_POOL_SIZE: int = 5
DB_MAX_OVERFLOW: int = 10
DB_POOL_TIMEOUT: int = 30
DB_POOL_RECYCLE: int = 3600
@property
def async_database_url(self) -> str:
"""Get async database URL"""
return self.DATABASE_URL
@property
def sync_database_url(self) -> str:
"""Get sync database URL (for Alembic migrations)"""
return self.DATABASE_URL.replace("+asyncpg", "").replace("postgresql+asyncpg", "postgresql")
# AI Settings
OLLAMA_BASE_URL: str = "http://localhost:11434"
DEFAULT_AI_MODEL: str = "llama2"
# JWT Authentication Settings
SECRET_KEY: str = "change-this-to-a-random-secret-key-in-production"
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
# Password Reset Settings
PASSWORD_RESET_TOKEN_EXPIRE_HOURS: int = 1
EMAIL_VERIFICATION_TOKEN_EXPIRE_HOURS: int = 24
# Email Settings (for future use)
SMTP_HOST: str = "smtp.gmail.com"
SMTP_PORT: int = 587
SMTP_USER: str = ""
SMTP_PASSWORD: str = ""
EMAILS_FROM_EMAIL: str = "noreply@aiverse.com"
EMAILS_FROM_NAME: str = "AIVerse"
# Pydantic Settings Configuration
model_config = SettingsConfigDict(
env_file=".env",
env_file_encoding="utf-8",
case_sensitive=True,
extra="ignore"
)
@lru_cache()
def get_settings() -> Settings:
"""
Create settings instance (cached)
@lru_cache ensures we only create one Settings instance
and reuse it throughout the application
Returns:
Settings: Application settings
"""
return Settings()
# Convenience: Get settings instance
settings = get_settings()
Update .env:
# Application Configuration
APP_NAME=AIVerse Backend
APP_VERSION=0.5.0
DEBUG=True
ENVIRONMENT=development
# Server Configuration
API_V1_PREFIX=/api/v1
HOST=0.0.0.0
PORT=8000
# CORS Configuration
ALLOWED_ORIGINS=http://localhost:3000,http://localhost:8000
# Database Configuration
DATABASE_URL=postgresql+asyncpg://aiverse_user:aiverse_pass@localhost:5432/aiverse_db
DATABASE_ECHO=False
DB_POOL_SIZE=5
DB_MAX_OVERFLOW=10
# AI Configuration
OLLAMA_BASE_URL=http://localhost:11434
DEFAULT_AI_MODEL=llama2
# JWT Security Configuration
# Generate with: openssl rand -hex 32
SECRET_KEY=09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
# Password Reset
PASSWORD_RESET_TOKEN_EXPIRE_HOURS=1
EMAIL_VERIFICATION_TOKEN_EXPIRE_HOURS=24
# Email Configuration (optional)
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USER=
SMTP_PASSWORD=
EMAILS_FROM_EMAIL=noreply@aiverse.com
EMAILS_FROM_NAME=AIVerse
Step 3: Create Security Utilities
Create app/core/security.py:
"""
Security utilities
JWT token generation, password hashing, and authentication helpers
"""
from datetime import datetime, timedelta
from typing import Optional, Dict, Any
from jose import JWTError, jwt
from pwdlib import PasswordHash
from app.core.config import settings
# Password hashing using pwdlib (modern bcrypt)
password_hash = PasswordHash.recommended()
def hash_password(password: str) -> str:
"""
Hash password using bcrypt
Args:
password: Plain text password
Returns:
Hashed password
"""
return password_hash.hash(password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Verify password against hash
Args:
plain_password: Plain text password
hashed_password: Hashed password
Returns:
True if password matches, False otherwise
"""
return password_hash.verify(plain_password, hashed_password)
def create_access_token(
data: Dict[str, Any],
expires_delta: Optional[timedelta] = None
) -> str:
"""
Create JWT access token
Args:
data: Data to encode in token (usually {"sub": user_id})
expires_delta: Token expiration time
Returns:
Encoded JWT token
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES
)
to_encode.update({
"exp": expire,
"iat": datetime.utcnow(),
"type": "access"
})
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM
)
return encoded_jwt
def create_refresh_token(
data: Dict[str, Any],
expires_delta: Optional[timedelta] = None
) -> str:
"""
Create JWT refresh token
Args:
data: Data to encode in token
expires_delta: Token expiration time
Returns:
Encoded JWT refresh token
"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(
days=settings.REFRESH_TOKEN_EXPIRE_DAYS
)
to_encode.update({
"exp": expire,
"iat": datetime.utcnow(),
"type": "refresh"
})
encoded_jwt = jwt.encode(
to_encode,
settings.SECRET_KEY,
algorithm=settings.ALGORITHM
)
return encoded_jwt
def decode_token(token: str) -> Optional[Dict[str, Any]]:
"""
Decode and validate JWT token
Args:
token: JWT token to decode
Returns:
Decoded token payload if valid, None otherwise
"""
try:
payload = jwt.decode(
token,
settings.SECRET_KEY,
algorithms=[settings.ALGORITHM]
)
return payload
except JWTError:
return None
def create_password_reset_token(email: str) -> str:
"""
Create password reset token
Args:
email: User email
Returns:
Password reset token
"""
delta = timedelta(hours=settings.PASSWORD_RESET_TOKEN_EXPIRE_HOURS)
return create_access_token(
data={"sub": email, "type": "password_reset"},
expires_delta=delta
)
def create_email_verification_token(email: str) -> str:
"""
Create email verification token
Args:
email: User email
Returns:
Email verification token
"""
delta = timedelta(hours=settings.EMAIL_VERIFICATION_TOKEN_EXPIRE_HOURS)
return create_access_token(
data={"sub": email, "type": "email_verification"},
expires_delta=delta
)
def verify_token_type(payload: Dict[str, Any], expected_type: str) -> bool:
"""
Verify token type
Args:
payload: Decoded token payload
expected_type: Expected token type
Returns:
True if token type matches, False otherwise
"""
return payload.get("type") == expected_type
Step 4: Create Authentication Models
Create app/models/auth.py:
"""
Authentication models
Pydantic models for authentication requests and responses
"""
from pydantic import BaseModel, EmailStr, Field
from typing import Optional
class Token(BaseModel):
"""
Token response model
Returned after successful authentication
"""
access_token: str = Field(..., description="JWT access token")
refresh_token: str = Field(..., description="JWT refresh token")
token_type: str = Field(default="bearer", description="Token type")
expires_in: int = Field(..., description="Token expiration in seconds")
model_config = {
"json_schema_extra": {
"example": {
"access_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"token_type": "bearer",
"expires_in": 1800
}
}
}
class TokenPayload(BaseModel):
"""
Token payload model
Represents decoded JWT token data
"""
sub: str | None = None # Subject (user ID or email)
exp: int | None = None # Expiration time
iat: int | None = None # Issued at
type: str | None = None # Token type
class LoginRequest(BaseModel):
"""
Login request model
"""
username: str = Field(..., min_length=3, description="Username or email")
password: str = Field(..., min_length=8, description="Password")
model_config = {
"json_schema_extra": {
"example": {
"username": "john_doe",
"password": "SecurePass123"
}
}
}
class RefreshTokenRequest(BaseModel):
"""
Refresh token request model
"""
refresh_token: str = Field(..., description="Refresh token")
model_config = {
"json_schema_extra": {
"example": {
"refresh_token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
}
}
}
class PasswordResetRequest(BaseModel):
"""
Password reset request model
"""
email: EmailStr = Field(..., description="User email")
model_config = {
"json_schema_extra": {
"example": {
"email": "user@example.com"
}
}
}
class PasswordResetConfirm(BaseModel):
"""
Password reset confirmation model
"""
token: str = Field(..., description="Reset token")
new_password: str = Field(..., min_length=8, description="New password")
model_config = {
"json_schema_extra": {
"example": {
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...",
"new_password": "NewSecurePass123"
}
}
}
class EmailVerificationRequest(BaseModel):
"""
Email verification request model
"""
token: str = Field(..., description="Verification token")
model_config = {
"json_schema_extra": {
"example": {
"token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
}
}
}
class ChangePasswordRequest(BaseModel):
"""
Change password request model
"""
current_password: str = Field(..., description="Current password")
new_password: str = Field(..., min_length=8, description="New password")
model_config = {
"json_schema_extra": {
"example": {
"current_password": "OldPass123",
"new_password": "NewSecurePass123"
}
}
}
Step 5: Create Authentication Dependencies
Create app/core/auth_dependencies.py:
"""
Authentication dependencies
FastAPI dependencies for protecting routes
"""
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated, Optional
from app.core.security import decode_token, verify_token_type
from app.db.session import get_db
from app.db.repositories.user_repository import UserRepository
from app.db.models.user import User, UserRole
from app.models.auth import TokenPayload
# OAuth2 scheme for token extraction
oauth2_scheme = OAuth2PasswordBearer(tokenUrl=f"/api/v1/auth/login")
http_bearer = HTTPBearer()
async def get_current_user(
token: Annotated[str, Depends(oauth2_scheme)],
db: AsyncSession = Depends(get_db)
) -> User:
"""
Get current authenticated user from token
Args:
token: JWT access token
db: Database session
Returns:
Current user
Raises:
HTTPException: If token is invalid or user not found
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
# Decode token
payload = decode_token(token)
if payload is None:
raise credentials_exception
# Verify token type
if not verify_token_type(payload, "access"):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token type",
headers={"WWW-Authenticate": "Bearer"},
)
# Extract user ID
user_id: str = payload.get("sub")
if user_id is None:
raise credentials_exception
# Get user from database
user_repo = UserRepository(db)
try:
user = await user_repo.get_by_id(int(user_id))
except ValueError:
# If user_id is email instead of ID
user = await user_repo.get_by_email(user_id)
if user is None:
raise credentials_exception
# Check if user is active
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user"
)
return user
async def get_current_active_user(
current_user: Annotated[User, Depends(get_current_user)]
) -> User:
"""
Get current active user
Args:
current_user: Current user from token
Returns:
Current user if active
Raises:
HTTPException: If user is inactive
"""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user"
)
return current_user
async def get_current_verified_user(
current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
"""
Get current verified user
Args:
current_user: Current active user
Returns:
Current user if verified
Raises:
HTTPException: If user email is not verified
"""
if not current_user.is_verified:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Email not verified"
)
return current_user
class RoleChecker:
"""
Role-based access control dependency
Usage:
@router.get("/admin")
async def admin_only(user: User = Depends(RoleChecker([UserRole.ADMIN]))):
return {"message": "Admin access granted"}
"""
def __init__(self, allowed_roles: list[UserRole]):
self.allowed_roles = allowed_roles
async def __call__(
self,
current_user: Annotated[User, Depends(get_current_active_user)]
) -> User:
"""
Check if user has required role
Args:
current_user: Current authenticated user
Returns:
Current user if role is allowed
Raises:
HTTPException: If user doesn't have required role
"""
if current_user.role not in self.allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Permission denied. Required roles: {[r.value for r in self.allowed_roles]}"
)
return current_user
# Convenience role checkers
require_admin = RoleChecker([UserRole.ADMIN])
require_user = RoleChecker([UserRole.USER, UserRole.ADMIN])
Step 6: Update User Service
Update app/services/user_service.py to use the new security module:
"""
User service - Business logic for user operations
Updated with new security module
"""
from typing import Optional
from sqlalchemy.ext.asyncio import AsyncSession
from app.db.repositories.user_repository import UserRepository
from app.db.models.user import User, UserRole
from app.models.user import UserCreate, UserUpdate
from app.core.exceptions import UserNotFoundException, UserAlreadyExistsException
from app.core.security import hash_password, verify_password # Updated import
class UserService:
"""
User service class
Handles all user-related business logic with database operations
"""
def __init__(self, db: AsyncSession):
"""
Initialize service with database session
Args:
db: Database session
"""
self.repository = UserRepository(db)
async def create_user(self, user_data: UserCreate) -> User:
"""
Create a new user
Args:
user_data: User creation data
Returns:
Created user
Raises:
UserAlreadyExistsException: If username or email already exists
"""
# Check for duplicate username
existing_user = await self.repository.get_by_username(user_data.username)
if existing_user:
raise UserAlreadyExistsException("username", user_data.username)
# Check for duplicate email
existing_email = await self.repository.get_by_email(user_data.email)
if existing_email:
raise UserAlreadyExistsException("email", user_data.email)
# Hash password using new security module
hashed_password = hash_password(user_data.password)
# Create user
user = await self.repository.create(
username=user_data.username,
email=user_data.email,
hashed_password=hashed_password,
full_name=user_data.full_name,
role=user_data.role,
is_active=True,
is_verified=False
)
return user
async def get_all_users(
self,
skip: int = 0,
limit: int = 10,
role: Optional[UserRole] = None
) -> list[User]:
"""Get all users with optional filtering"""
if role:
return await self.repository.get_by_role(role, skip, limit)
return await self.repository.get_all(skip, limit)
async def get_user_by_id(self, user_id: int) -> User:
"""Get user by ID"""
user = await self.repository.get_by_id(user_id)
if not user:
raise UserNotFoundException(user_id)
return user
async def get_user_by_username(self, username: str) -> Optional[User]:
"""Get user by username"""
return await self.repository.get_by_username(username)
async def get_user_by_email(self, email: str) -> Optional[User]:
"""Get user by email"""
return await self.repository.get_by_email(email)
async def update_user(self, user_id: int, user_update: UserUpdate) -> User:
"""Update user information"""
user = await self.get_user_by_id(user_id)
update_data = user_update.model_dump(exclude_unset=True)
# Check for username conflict
if "username" in update_data:
existing = await self.repository.get_by_username(update_data["username"])
if existing and existing.id != user_id:
raise UserAlreadyExistsException("username", update_data["username"])
# Check for email conflict
if "email" in update_data:
existing = await self.repository.get_by_email(update_data["email"])
if existing and existing.id != user_id:
raise UserAlreadyExistsException("email", update_data["email"])
updated_user = await self.repository.update(user_id, **update_data)
if not updated_user:
raise UserNotFoundException(user_id)
return updated_user
async def delete_user(self, user_id: int) -> None:
"""Delete a user"""
deleted = await self.repository.delete(user_id)
if not deleted:
raise UserNotFoundException(user_id)
async def get_user_count(self) -> int:
"""Get total number of users"""
return await self.repository.count()
async def authenticate_user(
self,
username: str,
password: str
) -> Optional[User]:
"""
Authenticate user with username and password
Args:
username: Username or email
password: Plain text password
Returns:
User if authentication successful, None otherwise
"""
# Try username first
user = await self.repository.get_by_username(username)
# If not found, try email
if not user:
user = await self.repository.get_by_email(username)
if not user:
return None
# Verify password using new security module
if not verify_password(password, user.hashed_password):
return None
# Update last login
await self.repository.update_last_login(user.id)
return user
async def change_password(
self,
user_id: int,
current_password: str,
new_password: str
) -> bool:
"""
Change user password
Args:
user_id: User ID
current_password: Current password
new_password: New password
Returns:
True if password changed successfully
Raises:
UserNotFoundException: If user not found
ValueError: If current password is incorrect
"""
user = await self.get_user_by_id(user_id)
# Verify current password
if not verify_password(current_password, user.hashed_password):
raise ValueError("Current password is incorrect")
# Hash new password
hashed_password = hash_password(new_password)
# Update password
await self.repository.update(user_id, hashed_password=hashed_password)
return True
async def reset_password(
self,
email: str,
new_password: str
) -> bool:
"""
Reset user password (for password reset flow)
Args:
email: User email
new_password: New password
Returns:
True if password reset successfully
Raises:
UserNotFoundException: If user not found
"""
user = await self.repository.get_by_email(email)
if not user:
raise UserNotFoundException(email)
# Hash new password
hashed_password = hash_password(new_password)
# Update password
await self.repository.update(user.id, hashed_password=hashed_password)
return True
async def verify_email(self, email: str) -> bool:
"""
Mark user email as verified
Args:
email: User email
Returns:
True if email verified successfully
Raises:
UserNotFoundException: If user not found
"""
user = await self.repository.get_by_email(email)
if not user:
raise UserNotFoundException(email)
await self.repository.update(user.id, is_verified=True)
return True
Step 7: Create Authentication Endpoints
Create app/api/v1/endpoints/auth.py:
"""
Authentication endpoints
Handles login, registration, token refresh, and password management
"""
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated
from datetime import timedelta
from app.db.session import get_db
from app.models.auth import (
Token,
LoginRequest,
RefreshTokenRequest,
PasswordResetRequest,
PasswordResetConfirm,
EmailVerificationRequest,
ChangePasswordRequest
)
from app.models.user import UserCreate, UserResponse
from app.models.common import MessageResponse
from app.services.user_service import UserService
from app.core.security import (
create_access_token,
create_refresh_token,
decode_token,
verify_token_type,
create_password_reset_token,
create_email_verification_token
)
from app.core.auth_dependencies import get_current_user, get_current_active_user
from app.core.config import settings
from app.db.models.user import User
from app.utils.logger import logger
router = APIRouter(prefix="/auth", tags=["Authentication"])
def get_user_service(db: AsyncSession = Depends(get_db)) -> UserService:
"""Get user service with database session"""
return UserService(db)
# ============================================
# REGISTRATION
# ============================================
@router.post(
"/register",
response_model=UserResponse,
status_code=status.HTTP_201_CREATED,
summary="Register new user"
)
async def register(
user_data: UserCreate,
service: Annotated[UserService, Depends(get_user_service)]
):
"""
Register a new user
Creates a new user account with the provided information.
Email verification is required before full access.
- **username**: Unique username (3-50 characters)
- **email**: Valid email address
- **password**: Strong password (min 8 characters)
- **full_name**: Optional full name
- **role**: User role (defaults to 'user')
"""
user = await service.create_user(user_data)
logger.info(
"User registered",
extra={"user_id": user.id, "username": user.username, "email": user.email}
)
return UserResponse.model_validate(user)
# ============================================
# LOGIN
# ============================================
@router.post(
"/login",
response_model=Token,
summary="Login user"
)
async def login(
credentials: LoginRequest,
service: Annotated[UserService, Depends(get_user_service)]
):
"""
Login with username/email and password
Returns access and refresh tokens for authentication.
**Request Body:**
- **username**: Username or email
- **password**: User password
**Response:**
- **access_token**: JWT access token (expires in 30 minutes)
- **refresh_token**: JWT refresh token (expires in 7 days)
- **token_type**: Bearer
- **expires_in**: Token expiration in seconds
"""
# Authenticate user
user = await service.authenticate_user(
credentials.username,
credentials.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Create tokens
access_token = create_access_token(data={"sub": str(user.id)})
refresh_token = create_refresh_token(data={"sub": str(user.id)})
logger.info(
"User logged in",
extra={"user_id": user.id, "username": user.username}
)
return Token(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer",
expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
)
@router.post(
"/login/form",
response_model=Token,
summary="Login user (OAuth2 form)"
)
async def login_form(
form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
service: Annotated[UserService, Depends(get_user_service)]
):
"""
OAuth2 compatible login endpoint
This endpoint follows OAuth2 password flow specification.
Used by Swagger UI and OAuth2 clients.
**Form Data:**
- **username**: Username or email
- **password**: User password
"""
# Authenticate user
user = await service.authenticate_user(
form_data.username,
form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Create tokens
access_token = create_access_token(data={"sub": str(user.id)})
refresh_token = create_refresh_token(data={"sub": str(user.id)})
logger.info(
"User logged in (OAuth2)",
extra={"user_id": user.id, "username": user.username}
)
return Token(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer",
expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
)
# ============================================
# TOKEN REFRESH
# ============================================
@router.post(
"/refresh",
response_model=Token,
summary="Refresh access token"
)
async def refresh_token(
refresh_request: RefreshTokenRequest,
service: Annotated[UserService, Depends(get_user_service)]
):
"""
Refresh access token using refresh token
When the access token expires, use the refresh token
to obtain a new access token without re-authenticating.
**Request Body:**
- **refresh_token**: Valid refresh token
**Response:**
- New access and refresh tokens
"""
# Decode refresh token
payload = decode_token(refresh_request.refresh_token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token",
headers={"WWW-Authenticate": "Bearer"},
)
# Verify token type
if not verify_token_type(payload, "refresh"):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token type",
headers={"WWW-Authenticate": "Bearer"},
)
# Get user ID
user_id = payload.get("sub")
if user_id is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token payload",
headers={"WWW-Authenticate": "Bearer"},
)
# Verify user exists and is active
try:
user = await service.get_user_by_id(int(user_id))
except:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found",
headers={"WWW-Authenticate": "Bearer"},
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Inactive user"
)
# Create new tokens (token rotation)
new_access_token = create_access_token(data={"sub": str(user.id)})
new_refresh_token = create_refresh_token(data={"sub": str(user.id)})
logger.info(
"Token refreshed",
extra={"user_id": user.id}
)
return Token(
access_token=new_access_token,
refresh_token=new_refresh_token,
token_type="bearer",
expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60
)
# ============================================
# USER INFO
# ============================================
@router.get(
"/me",
response_model=UserResponse,
summary="Get current user"
)
async def get_current_user_info(
current_user: Annotated[User, Depends(get_current_user)]
):
"""
Get current authenticated user information
Returns the profile of the currently authenticated user.
Requires valid access token.
"""
return UserResponse.model_validate(current_user)
# ============================================
# PASSWORD MANAGEMENT
# ============================================
@router.post(
"/password/change",
response_model=MessageResponse,
summary="Change password"
)
async def change_password(
password_data: ChangePasswordRequest,
current_user: Annotated[User, Depends(get_current_user)],
service: Annotated[UserService, Depends(get_user_service)]
):
"""
Change user password
Allows authenticated users to change their password.
Requires current password for verification.
**Request Body:**
- **current_password**: Current password
- **new_password**: New password (min 8 characters)
"""
try:
await service.change_password(
current_user.id,
password_data.current_password,
password_data.new_password
)
logger.info(
"Password changed",
extra={"user_id": current_user.id}
)
return MessageResponse(
message="Password changed successfully",
success=True
)
except ValueError as e:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e)
)
@router.post(
"/password/reset/request",
response_model=MessageResponse,
summary="Request password reset"
)
async def request_password_reset(
reset_request: PasswordResetRequest,
service: Annotated[UserService, Depends(get_user_service)]
):
"""
Request password reset email
Sends a password reset token to the user's email.
Token expires in 1 hour.
**Request Body:**
- **email**: User email address
**Note:** Always returns success to prevent email enumeration
"""
# Get user by email
user = await service.get_user_by_email(reset_request.email)
if user:
# Create password reset token
reset_token = create_password_reset_token(user.email)
# TODO: Send email with reset token
# For now, we'll log it (in production, send email)
logger.info(
"Password reset requested",
extra={
"user_id": user.id,
"email": user.email,
"reset_token": reset_token # Remove in production!
}
)
# In development, return token in response
# In production, only send via email
if settings.ENVIRONMENT == "development":
return MessageResponse(
message=f"Password reset token (DEV ONLY): {reset_token}",
success=True
)
# Always return success to prevent email enumeration
return MessageResponse(
message="If the email exists, a password reset link has been sent",
success=True
)
@router.post(
"/password/reset/confirm",
response_model=MessageResponse,
summary="Confirm password reset"
)
async def confirm_password_reset(
reset_data: PasswordResetConfirm,
service: Annotated[UserService, Depends(get_user_service)]
):
"""
Reset password with token
Uses the password reset token to set a new password.
**Request Body:**
- **token**: Password reset token (from email)
- **new_password**: New password (min 8 characters)
"""
# Decode token
payload = decode_token(reset_data.token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or expired token"
)
# Verify token type
if not verify_token_type(payload, "password_reset"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid token type"
)
# Get email from token
email = payload.get("sub")
if email is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid token payload"
)
# Reset password
try:
await service.reset_password(email, reset_data.new_password)
logger.info(
"Password reset completed",
extra={"email": email}
)
return MessageResponse(
message="Password reset successful",
success=True
)
except Exception as e:
logger.error(f"Password reset failed: {e}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Password reset failed"
)
# ============================================
# EMAIL VERIFICATION
# ============================================
@router.post(
"/email/verification/request",
response_model=MessageResponse,
summary="Request email verification"
)
async def request_email_verification(
current_user: Annotated[User, Depends(get_current_user)],
):
"""
Request email verification
Sends a verification email to the user's email address.
Token expires in 24 hours.
**Note:** User must be authenticated
"""
if current_user.is_verified:
return MessageResponse(
message="Email already verified",
success=True
)
# Create verification token
verification_token = create_email_verification_token(current_user.email)
# TODO: Send email with verification token
# For now, we'll log it (in production, send email)
logger.info(
"Email verification requested",
extra={
"user_id": current_user.id,
"email": current_user.email,
"verification_token": verification_token # Remove in production!
}
)
# In development, return token in response
if settings.ENVIRONMENT == "development":
return MessageResponse(
message=f"Verification token (DEV ONLY): {verification_token}",
success=True
)
return MessageResponse(
message="Verification email sent",
success=True
)
@router.post(
"/email/verification/confirm",
response_model=MessageResponse,
summary="Confirm email verification"
)
async def confirm_email_verification(
verification_data: EmailVerificationRequest,
service: Annotated[UserService, Depends(get_user_service)]
):
"""
Verify email with token
Uses the email verification token to mark email as verified.
**Request Body:**
- **token**: Email verification token (from email)
"""
# Decode token
payload = decode_token(verification_data.token)
if payload is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid or expired token"
)
# Verify token type
if not verify_token_type(payload, "email_verification"):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid token type"
)
# Get email from token
email = payload.get("sub")
if email is None:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid token payload"
)
# Verify email
try:
await service.verify_email(email)
logger.info(
"Email verified",
extra={"email": email}
)
return MessageResponse(
message="Email verified successfully",
success=True
)
except Exception as e:
logger.error(f"Email verification failed: {e}")
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Email verification failed"
)
# ============================================
# LOGOUT
# ============================================
@router.post(
"/logout",
response_model=MessageResponse,
summary="Logout user"
)
async def logout(
current_user: Annotated[User, Depends(get_current_user)]
):
"""
Logout current user
Since JWT tokens are stateless, logout is handled client-side
by removing tokens from storage.
For enhanced security, implement token blacklisting.
**Note:** Client should delete stored tokens after this call
"""
logger.info(
"User logged out",
extra={"user_id": current_user.id}
)
return MessageResponse(
message="Logged out successfully. Please delete tokens from client storage.",
success=True
)
Step 8: Update API Router
Update app/api/v1/api.py:
"""
API v1 router aggregator
Combines all v1 endpoint routers
"""
from fastapi import APIRouter
from app.api.v1.endpoints import (
users,
health,
users_advanced,
dependencies_demo,
conversations,
auth # New!
)
# Create main v1 router
api_router = APIRouter()
# Include all endpoint routers
api_router.include_router(health.router)
api_router.include_router(auth.router) # Auth first for Swagger UI
api_router.include_router(users.router)
api_router.include_router(users_advanced.router)
api_router.include_router(dependencies_demo.router)
api_router.include_router(conversations.router)
Step 9: Protect Existing Endpoints
Update app/api/v1/endpoints/users.py to protect routes:
"""
User management endpoints
Updated with authentication protection
"""
from fastapi import APIRouter, Depends, status
from sqlalchemy.ext.asyncio import AsyncSession
from typing import Annotated
from app.models.user import User, UserCreate, UserUpdate, UserResponse, UserRole
from app.services.user_service import UserService
from app.db.session import get_db
from app.core.auth_dependencies import (
get_current_user,
get_current_active_user,
require_admin,
RoleChecker
)
from app.db.models.user import User as DBUser
router = APIRouter(prefix="/users", tags=["Users"])
def get_user_service(db: AsyncSession = Depends(get_db)) -> UserService:
"""Dependency to get user service with database session"""
return UserService(db)
@router.post(
"",
response_model=UserResponse,
status_code=status.HTTP_201_CREATED,
summary="Create a new user",
dependencies=[Depends(require_admin)] # Only admins can create users
)
async def create_user(
user_data: UserCreate,
service: Annotated[UserService, Depends(get_user_service)],
current_user: Annotated[DBUser, Depends(require_admin)] # Verify admin
):
"""
Create a new user (Admin only)
Only users with admin role can create new users.
- **username**: Unique username (3-50 characters)
- **email**: Valid email address
- **full_name**: Optional full name
- **password**: Password (min 8 characters)
- **role**: User role (admin, user, guest)
"""
user = await service.create_user(user_data)
return UserResponse.model_validate(user)
@router.get(
"",
response_model=list[UserResponse],
summary="Get all users",
dependencies=[Depends(get_current_active_user)] # Requires authentication
)
async def get_users(
skip: int = 0,
limit: int = 10,
role: UserRole | None = None,
service: Annotated[UserService, Depends(get_user_service)] = None,
current_user: Annotated[DBUser, Depends(get_current_active_user)] = None
):
"""
Retrieve users with optional filtering (Authenticated users only)
- **skip**: Number of records to skip (for pagination)
- **limit**: Maximum number of records to return
- **role**: Filter by user role
"""
users = await service.get_all_users(skip=skip, limit=limit, role=role)
return [UserResponse.model_validate(user) for user in users]
@router.get(
"/me",
response_model=UserResponse,
summary="Get current user profile"
)
async def get_my_profile(
current_user: Annotated[DBUser, Depends(get_current_active_user)]
):
"""
Get current user's profile
Returns the authenticated user's profile information.
"""
return UserResponse.model_validate(current_user)
@router.get(
"/{user_id}",
response_model=UserResponse,
summary="Get user by ID",
dependencies=[Depends(get_current_active_user)]
)
async def get_user(
user_id: int,
service: Annotated[UserService, Depends(get_user_service)],
current_user: Annotated[DBUser, Depends(get_current_active_user)]
):
"""
Get a specific user by their ID (Authenticated users only)
"""
user = await service.get_user_by_id(user_id)
return UserResponse.model_validate(user)
@router.patch(
"/{user_id}",
response_model=UserResponse,
summary="Update user"
)
async def update_user(
user_id: int,
user_update: UserUpdate,
service: Annotated[UserService, Depends(get_user_service)],
current_user: Annotated[DBUser, Depends(get_current_active_user)]
):
"""
Update user information
Users can only update their own profile unless they are admin.
Only provided fields will be updated.
"""
# Users can only update themselves unless admin
if current_user.id != user_id and current_user.role != UserRole.ADMIN:
from fastapi import HTTPException
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Not authorized to update this user"
)
user = await service.update_user(user_id, user_update)
return UserResponse.model_validate(user)
@router.delete(
"/{user_id}",
status_code=status.HTTP_204_NO_CONTENT,
summary="Delete user",
dependencies=[Depends(require_admin)] # Only admins can delete
)
async def delete_user(
user_id: int,
service: Annotated[UserService, Depends(get_user_service)],
current_user: Annotated[DBUser, Depends(require_admin)]
):
"""
Delete a user by ID (Admin only)
Only administrators can delete users.
"""
await service.delete_user(user_id)
return None
@router.get(
"/stats/count",
summary="Get user statistics",
dependencies=[Depends(require_admin)] # Only admins can see stats
)
async def get_user_stats(
service: Annotated[UserService, Depends(get_user_service)],
current_user: Annotated[DBUser, Depends(require_admin)]
):
"""
Get user statistics (Admin only)
Returns total user count and other statistics.
"""
total = await service.get_user_count()
return {"total_users": total}
Step 10: Update Main Application
Update app/main.py to show authentication in docs:
"""
FastAPI AI Backend - Main Application
Updated with JWT authentication
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from contextlib import asynccontextmanager
from app.core.config import settings
from app.api.v1.api import api_router
from app.core.exceptions import AppException
from app.core.error_handlers import (
app_exception_handler,
validation_exception_handler,
generic_exception_handler,
)
from app.middleware.logging_middleware import LoggingMiddleware
from app.middleware.performance_middleware import PerformanceMiddleware
from app.utils.logger import logger
from app.db.session import close_db
from app.db.utils import check_database_connection, get_database_info
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan events"""
# Startup
logger.info(
"Application startup",
extra={
"app_name": settings.APP_NAME,
"version": settings.APP_VERSION,
"environment": settings.ENVIRONMENT,
},
)
print(f"🚀 Starting {settings.APP_NAME} v{settings.APP_VERSION}")
print(f"📝 Environment: {settings.ENVIRONMENT}")
print(f"🔧 Debug mode: {settings.DEBUG}")
# Check database connection
db_connected = await check_database_connection()
if db_connected:
print("✅ Database connection successful")
db_info = await get_database_info()
print(f"📊 PostgreSQL version: {db_info.get('version', 'unknown')}")
else:
print("❌ Database connection failed!")
print(f"🔐 JWT Authentication enabled")
print(f"📚 API Docs: http://{settings.HOST}:{settings.PORT}/docs")
print(f"🔑 Use /api/v1/auth/login to authenticate")
yield
# Shutdown
logger.info("Application shutdown", extra={"app_name": settings.APP_NAME})
print(f"👋 Shutting down {settings.APP_NAME}")
await close_db()
def create_application() -> FastAPI:
"""Application factory pattern"""
app = FastAPI(
title=settings.APP_NAME,
version=settings.APP_VERSION,
description="""
A production-ready FastAPI backend for AI applications.
## Features
* **JWT Authentication** with access and refresh tokens
* **Role-Based Access Control** (RBAC)
* **Password Management** (change, reset, verification)
* **Email Verification** workflow
* **OAuth2 Password Flow** for Swagger UI
* **Database Integration** with PostgreSQL
* **Async Operations** for high performance
* **Repository Pattern** for clean data access
* **Advanced Validation** with Pydantic
* **Comprehensive Error Handling**
* **Structured Logging**
## Authentication
1. **Register**: POST /api/v1/auth/register
2. **Login**: POST /api/v1/auth/login
3. **Use Token**: Add `Authorization: Bearer <token>` header
4. **Refresh**: POST /api/v1/auth/refresh when token expires
## Quick Start
```bash
# Register a user
curl -X POST "http://localhost:8000/api/v1/auth/register" \\
-H "Content-Type: application/json" \\
-d '{"username":"user","email":"user@example.com","password":"password123"}'
# Login
curl -X POST "http://localhost:8000/api/v1/auth/login" \\
-H "Content-Type: application/json" \\
-d '{"username":"user","password":"password123"}'
# Use the access_token in subsequent requests
curl -X GET "http://localhost:8000/api/v1/users/me" \\
-H "Authorization: Bearer <your_access_token>"
```
## Security
* Passwords hashed with bcrypt
* JWT tokens with expiration
* Refresh token rotation
* Role-based permissions
* Password reset via email tokens
* Email verification
""",
debug=settings.DEBUG,
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json",
lifespan=lifespan,
)
# Middleware
app.add_middleware(PerformanceMiddleware, slow_request_threshold=1.0)
app.add_middleware(LoggingMiddleware)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.allowed_origins_list,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
expose_headers=["X-Request-ID", "X-Process-Time"],
)
# Exception Handlers
app.add_exception_handler(AppException, app_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(ValidationError, validation_exception_handler)
app.add_exception_handler(Exception, generic_exception_handler)
# Routers
app.include_router(api_router, prefix=settings.API_V1_PREFIX)
return app
app = create_application()
Step 11: Create Authentication Testing Script
Create test_authentication.py:
"""
Test script for authentication endpoints
Tests all JWT authentication functionality
"""
import requests
import json
from typing import Dict, Any
BASE_URL = "http://127.0.0.1:8000/api/v1"
def print_test(title: str, response: requests.Response):
"""Print test results"""
print(f"\n{'='*70}")
print(f"{title}")
print(f"{'='*70}")
print(f"Status: {response.status_code}")
try:
data = response.json()
print(f"Response:\n{json.dumps(data, indent=2, default=str)}")
except:
print(f"Response: {response.text[:500]}")
def test_register():
"""Test user registration"""
user_data = {
"username": "auth_test_user",
"email": "authtest@example.com",
"password": "SecurePass123",
"full_name": "Auth Test User"
}
response = requests.post(f"{BASE_URL}/auth/register", json=user_data)
print_test("✅ REGISTER USER", response)
if response.status_code == 201:
return response.json()
return None
def test_login(username: str, password: str):
"""Test user login"""
credentials = {
"username": username,
"password": password
}
response = requests.post(f"{BASE_URL}/auth/login", json=credentials)
print_test("🔐 LOGIN", response)
if response.status_code == 200:
return response.json()
return None
def test_login_invalid():
"""Test login with invalid credentials"""
credentials = {
"username": "wrong_user",
"password": "wrong_password"
}
response = requests.post(f"{BASE_URL}/auth/login", json=credentials)
print_test("❌ LOGIN (INVALID CREDENTIALS)", response)
def test_get_current_user(access_token: str):
"""Test getting current user info"""
headers = {
"Authorization": f"Bearer {access_token}"
}
response = requests.get(f"{BASE_URL}/auth/me", headers=headers)
print_test("👤 GET CURRENT USER", response)
def test_protected_route_without_token():
"""Test accessing protected route without token"""
response = requests.get(f"{BASE_URL}/users")
print_test("🔒 PROTECTED ROUTE (NO TOKEN)", response)
def test_protected_route_with_token(access_token: str):
"""Test accessing protected route with valid token"""
headers = {
"Authorization": f"Bearer {access_token}"
}
response = requests.get(f"{BASE_URL}/users", headers=headers)
print_test("🔓 PROTECTED ROUTE (WITH TOKEN)", response)
def test_refresh_token(refresh_token: str):
"""Test token refresh"""
refresh_data = {
"refresh_token": refresh_token
}
response = requests.post(f"{BASE_URL}/auth/refresh", json=refresh_data)
print_test("🔄 REFRESH TOKEN", response)
if response.status_code == 200:
return response.json()
return None
def test_change_password(access_token: str):
"""Test password change"""
headers = {
"Authorization": f"Bearer {access_token}"
}
password_data = {
"current_password": "SecurePass123",
"new_password": "NewSecurePass123"
}
response = requests.post(
f"{BASE_URL}/auth/password/change",
json=password_data,
headers=headers
)
print_test("🔑 CHANGE PASSWORD", response)
def test_password_reset_request():
"""Test password reset request"""
reset_data = {
"email": "authtest@example.com"
}
response = requests.post(
f"{BASE_URL}/auth/password/reset/request",
json=reset_data
)
print_test("📧 PASSWORD RESET REQUEST", response)
# Extract token from development response
if response.status_code == 200:
data = response.json()
message = data.get("message", "")
if "DEV ONLY" in message:
# Extract token from message
token = message.split(": ")[1] if ": " in message else None
return token
return None
def test_password_reset_confirm(reset_token: str):
"""Test password reset confirmation"""
if not reset_token:
print("\n⚠️ No reset token available, skipping password reset confirm")
return
reset_data = {
"token": reset_token,
"new_password": "ResetPass123"
}
response = requests.post(
f"{BASE_URL}/auth/password/reset/confirm",
json=reset_data
)
print_test("✅ PASSWORD RESET CONFIRM", response)
def test_email_verification_request(access_token: str):
"""Test email verification request"""
headers = {
"Authorization": f"Bearer {access_token}"
}
response = requests.post(
f"{BASE_URL}/auth/email/verification/request",
headers=headers
)
print_test("📧 EMAIL VERIFICATION REQUEST", response)
# Extract token from development response
if response.status_code == 200:
data = response.json()
message = data.get("message", "")
if "DEV ONLY" in message:
token = message.split(": ")[1] if ": " in message else None
return token
return None
def test_email_verification_confirm(verification_token: str):
"""Test email verification confirmation"""
if not verification_token:
print("\n⚠️ No verification token available, skipping")
return
verification_data = {
"token": verification_token
}
response = requests.post(
f"{BASE_URL}/auth/email/verification/confirm",
json=verification_data
)
print_test("✅ EMAIL VERIFICATION CONFIRM", response)
def test_role_based_access(access_token: str):
"""Test role-based access control"""
headers = {
"Authorization": f"Bearer {access_token}"
}
# Try to access admin-only endpoint (should fail for regular user)
user_data = {
"username": "new_user",
"email": "newuser@example.com",
"password": "Password123"
}
response = requests.post(
f"{BASE_URL}/users",
json=user_data,
headers=headers
)
print_test("🚫 ADMIN ENDPOINT (REGULAR USER)", response)
def test_logout(access_token: str):
"""Test logout"""
headers = {
"Authorization": f"Bearer {access_token}"
}
response = requests.post(f"{BASE_URL}/auth/logout", headers=headers)
print_test("👋 LOGOUT", response)
def test_expired_token():
"""Test using expired token"""
# Use a token that's clearly expired (or malformed)
expired_token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxIiwiZXhwIjoxNjAwMDAwMDAwfQ.invalid"
headers = {
"Authorization": f"Bearer {expired_token}"
}
response = requests.get(f"{BASE_URL}/auth/me", headers=headers)
print_test("⏰ EXPIRED/INVALID TOKEN", response)
def run_all_tests():
"""Run complete authentication test suite"""
print("\n" + "🧪"*35)
print("AUTHENTICATION TEST SUITE")
print("🧪"*35)
# Registration
print("\n" + "="*70)
print("REGISTRATION")
print("="*70)
user = test_register()
# Login
print("\n" + "="*70)
print("LOGIN & AUTHENTICATION")
print("="*70)
test_login_invalid()
if not user:
print("\n❌ User registration failed, cannot continue tests")
return
tokens = test_login("auth_test_user", "SecurePass123")
if not tokens:
print("\n❌ Login failed, cannot continue tests")
return
access_token = tokens['access_token']
refresh_token = tokens['refresh_token']
# Protected Routes
print("\n" + "="*70)
print("PROTECTED ROUTES")
print("="*70)
test_protected_route_without_token()
test_protected_route_with_token(access_token)
test_get_current_user(access_token)
# Token Refresh
print("\n" + "="*70)
print("TOKEN REFRESH")
print("="*70)
new_tokens = test_refresh_token(refresh_token)
if new_tokens:
access_token = new_tokens['access_token']
# Password Management
print("\n" + "="*70)
print("PASSWORD MANAGEMENT")
print("="*70)
test_change_password(access_token)
# Re-login with new password
print("\n🔄 Re-logging in with new password...")
tokens = test_login("auth_test_user", "NewSecurePass123")
if tokens:
access_token = tokens['access_token']
# Password Reset Flow
print("\n" + "="*70)
print("PASSWORD RESET FLOW")
print("="*70)
reset_token = test_password_reset_request()
test_password_reset_confirm(reset_token)
# Re-login with reset password
if reset_token:
print("\n🔄 Re-logging in with reset password...")
tokens = test_login("auth_test_user", "ResetPass123")
if tokens:
access_token = tokens['access_token']
# Email Verification
print("\n" + "="*70)
print("EMAIL VERIFICATION")
print("="*70)
verification_token = test_email_verification_request(access_token)
test_email_verification_confirm(verification_token)
# Role-Based Access Control
print("\n" + "="*70)
print("ROLE-BASED ACCESS CONTROL")
print("="*70)
test_role_based_access(access_token)
# Token Expiration
print("\n" + "="*70)
print("TOKEN VALIDATION")
print("="*70)
test_expired_token()
# Logout
print("\n" + "="*70)
print("LOGOUT")
print("="*70)
test_logout(access_token)
print("\n" + "✅"*35)
print("ALL AUTHENTICATION TESTS COMPLETED!")
print("✅"*35)
print("\n💡 Key Features Tested:")
print(" ✅ User registration")
print(" ✅ Login with JWT tokens")
print(" ✅ Token refresh")
print(" ✅ Protected routes")
print(" ✅ Password change")
print(" ✅ Password reset flow")
print(" ✅ Email verification")
print(" ✅ Role-based access control")
print(" ✅ Token validation")
print(" ✅ Logout\n")
if __name__ == "__main__":
print("""
╔════════════════════════════════════════════════════════╗
║ Authentication Test Suite ║
║ ║
║ Tests: ║
║ - User registration ║
║ - Login & token generation ║
║ - Protected route access ║
║ - Token refresh ║
║ - Password management ║
║ - Email verification ║
║ - Role-based access control ║
║ ║
║ Prerequisites: ║
║ 1. PostgreSQL running ║
║ 2. FastAPI server running ║
║ 3. Database migrations applied ║
╚════════════════════════════════════════════════════════╝
""")
try:
response = requests.get(f"{BASE_URL}/health")
if response.status_code == 200:
run_all_tests()
else:
print("❌ Server returned unexpected status")
except requests.exceptions.ConnectionError:
print("❌ ERROR: Cannot connect to server!")
print(" Please start the server with: python main.py")
except Exception as e:
print(f"❌ ERROR: {e}")
Step 12: Create Token Blacklist (Optional Advanced Feature)
Create app/db/models/token_blacklist.py:
"""
Token blacklist model
For advanced security: blacklist revoked tokens
"""
from sqlalchemy import String, DateTime, Integer
from sqlalchemy.orm import Mapped, mapped_column
from datetime import datetime
from app.db.base import Base
class TokenBlacklist(Base):
"""
Token blacklist model
Stores revoked tokens to prevent their use after logout
"""
__tablename__ = "token_blacklist"
id: Mapped[int] = mapped_column(primary_key=True, index=True)
# Token identifier (jti claim from JWT)
jti: Mapped[str] = mapped_column(
String(255),
unique=True,
index=True,
nullable=False
)
# Token type (access or refresh)
token_type: Mapped[str] = mapped_column(
String(50),
nullable=False
)
# User ID who owned the token
user_id: Mapped[int] = mapped_column(
Integer,
nullable=False,
index=True
)
# When token was blacklisted
blacklisted_at: Mapped[datetime] = mapped_column(
DateTime,
default=datetime.utcnow,
nullable=False
)
# Token expiration (for cleanup)
expires_at: Mapped[datetime] = mapped_column(
DateTime,
nullable=False,
index=True
)
def __repr__(self) -> str:
return f"<TokenBlacklist(jti='{self.jti}', type='{self.token_type}')>"
Create migration for token blacklist:
alembic revision --autogenerate -m "Add token blacklist table"
alembic upgrade head
Step 13: Update Requirements
Update requirements.txt:
# FastAPI and core dependencies
fastapi==0.115.0
uvicorn[standard]==0.32.1
pydantic==2.10.3
pydantic-settings==2.7.0
pydantic[email]==2.10.3
# Email validation
email-validator==2.3.0
dnspython==2.8.0
# Phone number validation
phonenumbers==9.0.28
# Environment configuration
python-dotenv==1.2.2
# Structured logging
python-json-logger==4.1.0
# HTTP client
requests==2.33.1
# Database - SQLAlchemy
sqlalchemy[asyncio]==2.0.23
asyncpg==0.29.0
psycopg2-binary==2.9.9
# Database migrations
alembic==1.13.1
# Password hashing (modern bcrypt)
pwdlib==0.2.1
# JWT tokens
python-jose[cryptography]==3.3.0
# ASGI server
starlette==0.45.0
Step 14: Production Security Checklist
Create SECURITY.md:
# Security Best Practices
## JWT Security
### 1. Secret Key Management
**NEVER commit secret keys to version control!**
```bash
# Generate strong secret key
openssl rand -hex 32
# Store in environment variables
SECRET_KEY=your-generated-secret-key-here
# Use different keys for different environments
# Development: one key
# Staging: different key
# Production: different key
```
### 2. Token Expiration
**Configure appropriate expiration times:**
```bash
# Short-lived access tokens (15-30 minutes)
ACCESS_TOKEN_EXPIRE_MINUTES=30
# Longer refresh tokens (7 days)
REFRESH_TOKEN_EXPIRE_DAYS=7
# Password reset tokens (1 hour)
PASSWORD_RESET_TOKEN_EXPIRE_HOURS=1
```
### 3. HTTPS Only in Production
```python
# Force HTTPS in production
if settings.ENVIRONMENT == "production":
app.add_middleware(HTTPSRedirectMiddleware)
```
### 4. Token Storage (Frontend)
**Client-Side Best Practices:**
- ✅ Store tokens in httpOnly cookies (most secure)
- ⚠️ If using localStorage:
- Vulnerable to XSS attacks
- Never store sensitive data
- Clear on logout
- ❌ Never store in sessionStorage for persistent login
**Example (React):**
```javascript
// Good: httpOnly cookie (set by server)
// Server sets: Set-Cookie: access_token=...; HttpOnly; Secure; SameSite=Strict
// Acceptable: localStorage with precautions
localStorage.setItem('access_token', token);
// On logout
localStorage.removeItem('access_token');
localStorage.removeItem('refresh_token');
```
### 5. Rate Limiting
**Implement rate limiting on auth endpoints:**
```python
from slowapi import Limiter
from slowapi.util import get_remote_address
limiter = Limiter(key_func=get_remote_address)
@router.post("/login")
@limiter.limit("5/minute") # 5 login attempts per minute
async def login(...):
...
```
### 6. Password Security
**Enforce strong passwords:**
```python
# Minimum requirements
- Min 8 characters
- At least 1 uppercase
- At least 1 lowercase
- At least 1 number
- At least 1 special character
# Use pwdlib (modern bcrypt)
from pwdlib import PasswordHash
password_hash = PasswordHash.recommended()
```
### 7. Token Blacklisting
**For critical applications, implement token blacklisting:**
```python
# On logout
await blacklist_token(token_jti, user_id, expires_at)
# On token validation
if await is_token_blacklisted(token_jti):
raise HTTPException(401, "Token revoked")
```
### 8. CORS Configuration
**Restrict CORS to known origins:**
```python
# ❌ Never in production
allow_origins=["*"]
# ✅ Production
allow_origins=[
"https://yourdomain.com",
"https://app.yourdomain.com"
]
```
### 9. SQL Injection Prevention
**SQLAlchemy protects against SQL injection, but:**
```python
# ✅ Good (parameterized)
await session.execute(
select(User).where(User.username == username)
)
# ❌ Bad (string concatenation)
query = f"SELECT * FROM users WHERE username = '{username}'"
```
### 10. Input Validation
**Always validate input with Pydantic:**
```python
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: EmailStr
password: str = Field(..., min_length=8)
```
## Deployment Security
### Environment Variables
```bash
# .env (NEVER commit)
SECRET_KEY=production-secret-key
DATABASE_URL=postgresql://user:pass@prod-db:5432/db
# Use secrets management
# AWS: AWS Secrets Manager
# GCP: Secret Manager
# Azure: Key Vault
# Kubernetes: Secrets
```
### Database Security
```bash
# Use SSL for database connections
DATABASE_URL=postgresql://user:pass@host:5432/db?sslmode=require
# Restrict database access
# Only allow application servers
# Use firewall rules
# Enable audit logging
```
### Monitoring & Alerts
```python
# Log authentication events
logger.info("Failed login attempt", extra={
"username": username,
"ip": client_ip,
"user_agent": user_agent
})
# Alert on suspicious activity
# - Multiple failed logins
# - Password reset requests
# - Access from new locations
```
### Regular Security Updates
```bash
# Update dependencies regularly
pip list --outdated
pip install --upgrade package-name
# Check for vulnerabilities
pip install safety
safety check
# Monitor CVE databases
```
## Common Vulnerabilities
### Prevent Timing Attacks
```python
# ❌ Bad (timing attack vulnerable)
if user.password == password:
return True
# ✅ Good (constant time comparison)
from pwdlib import PasswordHash
password_hash.verify(password, user.hashed_password)
```
### Prevent Email Enumeration
```python
# ❌ Bad
if not user:
raise HTTPException(404, "User not found")
# ✅ Good (same response for existing/non-existing)
return MessageResponse(
message="If the email exists, a reset link has been sent"
)
```
### Prevent Brute Force
```python
# Implement rate limiting
# Add CAPTCHA after failed attempts
# Lock account after N failed attempts
# Implement exponential backoff
```
## Incident Response
### If Secret Key Compromised
1. Generate new secret key
2. Invalidate all existing tokens
3. Force users to re-authenticate
4. Notify users
5. Review access logs
### If Database Compromised
1. Rotate database credentials
2. Force password reset for all users
3. Review audit logs
4. Notify affected users
5. Report to authorities if required
## Compliance
### GDPR
- Allow users to download their data
- Allow users to delete their data
- Log data access
- Implement data retention policies
### HIPAA (if applicable)
- Encrypt data at rest and in transit
- Implement audit trails
- Access controls
- Regular security assessments
## Security Checklist
- [ ] Strong secret keys generated and stored securely
- [ ] HTTPS enabled in production
- [ ] Token expiration configured appropriately
- [ ] Rate limiting on auth endpoints
- [ ] Password strength requirements enforced
- [ ] CORS properly configured
- [ ] SQL injection protection verified
- [ ] Input validation comprehensive
- [ ] Logging and monitoring set up
- [ ] Regular security updates scheduled
- [ ] Incident response plan documented
- [ ] Backup and recovery tested
Step 15: Create Deployment Guide
Create AUTHENTICATION_DEPLOYMENT.md:
# Authentication Deployment Guide
## Pre-Deployment Checklist
### 1. Environment Configuration
```bash
# Generate production secret key
openssl rand -hex 32
# Update .env.production
SECRET_KEY=
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
ENVIRONMENT=production
DEBUG=False
```
### 2. Database Setup
```bash
# Run migrations
alembic upgrade head
# Verify tables
psql -h host -U user -d db -c "\dt"
# Should include: users, conversations, messages, token_blacklist
```
### 3. Create Admin User
```python
# create_admin.py
import asyncio
from app.db.session import async_session_maker
from app.db.repositories.user_repository import UserRepository
from app.core.security import hash_password
from app.db.models.user import UserRole
async def create_admin():
async with async_session_maker() as session:
repo = UserRepository(session)
admin = await repo.create(
username="admin",
email="admin@yourdomain.com",
hashed_password=hash_password("CHANGE_THIS_PASSWORD"),
full_name="Administrator",
role=UserRole.ADMIN,
is_active=True,
is_verified=True
)
await session.commit()
print(f"Admin created: {admin.username}")
asyncio.run(create_admin())
```
### 4. SSL/TLS Configuration
```nginx
# nginx.conf
server {
listen 443 ssl http2;
server_name api.yourdomain.com;
ssl_certificate /etc/ssl/certs/cert.pem;
ssl_certificate_key /etc/ssl/private/key.pem;
location / {
proxy_pass http://localhost:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
}
# Redirect HTTP to HTTPS
server {
listen 80;
server_name api.yourdomain.com;
return 301 https://$server_name$request_uri;
}
```
### 5. Email Configuration (Optional)
```bash
# SMTP settings
SMTP_HOST=smtp.gmail.com
SMTP_PORT=587
SMTP_USER=your-email@gmail.com
SMTP_PASSWORD=your-app-password
EMAILS_FROM_EMAIL=noreply@yourdomain.com
```
## Deployment Options
### Option 1: Docker
```dockerfile
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
```
```yaml
# docker-compose.yml
version: '3.8'
services:
api:
build: .
ports:
- "8000:8000"
environment:
- DATABASE_URL=${DATABASE_URL}
- SECRET_KEY=${SECRET_KEY}
depends_on:
- postgres
postgres:
image: postgres:15-alpine
environment:
POSTGRES_DB: aiverse_db
POSTGRES_USER: aiverse_user
POSTGRES_PASSWORD: ${DB_PASSWORD}
volumes:
- postgres_data:/var/lib/postgresql/data
volumes:
postgres_data:
```
### Option 2: Systemd Service
```ini
# /etc/systemd/system/aiverse-api.service
[Unit]
Description=AIVerse FastAPI Application
After=network.target
[Service]
Type=notify
User=www-data
Group=www-data
WorkingDirectory=/var/www/aiverse-api
Environment="PATH=/var/www/aiverse-api/venv/bin"
EnvironmentFile=/var/www/aiverse-api/.env
ExecStart=/var/www/aiverse-api/venv/bin/uvicorn app.main:app --host 0.0.0.0 --port 8000 --workers 4
Restart=always
[Install]
WantedBy=multi-user.target
```
```bash
# Enable and start
sudo systemctl enable aiverse-api
sudo systemctl start aiverse-api
sudo systemctl status aiverse-api
```
### Option 3: Cloud Platforms
**AWS (Elastic Beanstalk):**
```yaml
# .ebextensions/python.config
option_settings:
aws:elasticbeanstalk:application:environment:
SECRET_KEY: your-secret-key
DATABASE_URL: postgresql://...
```
**Google Cloud Run:**
```bash
# Build and deploy
gcloud builds submit --tag gcr.io/PROJECT_ID/aiverse-api
gcloud run deploy aiverse-api \
--image gcr.io/PROJECT_ID/aiverse-api \
--platform managed \
--region us-central1 \
--set-env-vars SECRET_KEY=xxx,DATABASE_URL=xxx
```
**Heroku:**
```bash
# Create app
heroku create aiverse-api
# Set environment variables
heroku config:set SECRET_KEY=xxx
heroku config:set DATABASE_URL=xxx
# Deploy
git push heroku main
# Run migrations
heroku run alembic upgrade head
```
## Post-Deployment
### 1. Health Checks
```bash
# API health
curl https://api.yourdomain.com/api/v1/health
# Database health
curl https://api.yourdomain.com/api/v1/health/database
# Authentication test
curl -X POST https://api.yourdomain.com/api/v1/auth/login \
-H "Content-Type: application/json" \
-d '{"username":"admin","password":"password"}'
```
### 2. Monitoring Setup
```python
# Add monitoring endpoints
from prometheus_client import make_asgi_app
metrics_app = make_asgi_app()
app.mount("/metrics", metrics_app)
```
### 3. Backup Strategy
```bash
# Database backup script
#!/bin/bash
pg_dump $DATABASE_URL | gzip > backup_$(date +%Y%m%d).sql.gz
aws s3 cp backup_$(date +%Y%m%d).sql.gz s3://backups/
```
### 4. Log Aggregation
```bash
# Send logs to CloudWatch/Datadog/etc.
# Configure in your logging setup
```
## Troubleshooting
### Token Issues
```bash
# Check token expiration
# Decode JWT at https://jwt.io
# Verify secret key is correct
echo $SECRET_KEY
# Check system time (JWT uses UTC)
date -u
```
### Database Connection
```bash
# Test connection
psql $DATABASE_URL
# Check pool status
# Add monitoring endpoint
```
### CORS Issues
```bash
# Verify ALLOWED_ORIGINS
echo $ALLOWED_ORIGINS
# Check preflight requests
curl -X OPTIONS https://api.yourdomain.com/api/v1/auth/login \
-H "Origin: https://app.yourdomain.com" \
-H "Access-Control-Request-Method: POST"
```
What’s the difference between access tokens and refresh tokens?
How do I implement token blacklisting for logout?
Should I store JWT tokens in localStorage or cookies?
How do I secure password reset functionality?
How do I implement OAuth2 social login (Google, GitHub)?
🎯 Complete Authentication Flow Diagram
┌─────────────┐
│ Client │
└──────┬──────┘
│
│ 1. POST /auth/register
│ {username, email, password}
▼
┌─────────────────────────────────────┐
│ FastAPI Backend │
│ ┌────────────────────────────┐ │
│ │ Create user │ │
│ │ Hash password (bcrypt) │ │
│ │ Store in PostgreSQL │ │
│ │ Return user info │ │
│ └────────────────────────────┘ │
└──────┬──────────────────────────────┘
│
│ 2. POST /auth/login
│ {username, password}
▼
┌─────────────────────────────────────┐
│ Validate credentials │
│ ┌────────────────────────────┐ │
│ │ Find user by username │ │
│ │ Verify password hash │ │
│ │ Generate access token │ │
│ │ Generate refresh token │ │
│ │ Return tokens │ │
│ └────────────────────────────┘ │
└──────┬──────────────────────────────┘
│
│ {access_token, refresh_token}
│
│ 3. GET /users/me
│ Authorization: Bearer <access_token>
▼
┌─────────────────────────────────────┐
│ Validate token │
│ ┌────────────────────────────┐ │
│ │ Decode JWT │ │
│ │ Verify signature │ │
│ │ Check expiration │ │
│ │ Get user from database │ │
│ │ Return user info │ │
│ └────────────────────────────┘ │
└─────────────────────────────────────┘
🎉 Summary
What You’ve Accomplished:
✅ Complete JWT Authentication System
- User registration and login
- Access and refresh tokens
- Token rotation for security
- OAuth2 password flow
✅ Password Management
- Secure password hashing (bcrypt)
- Password change functionality
- Password reset via email tokens
- Strong password validation
✅ Email Verification
- Email verification tokens
- Verification workflow
- Resend verification emails
✅ Authorization
- Role-based access control (RBAC)
- Protected routes
- Custom permission dependencies
- User/admin role separation
✅ Security Features
- Token expiration
- Token type validation
- Rate limiting ready
- Production security guidelines
✅ Production Ready
- Comprehensive error handling
- Structured logging
- Environment-based configuration
- Deployment documentation
📊 Project Status
Episodes Completed:
├── ✅ Ep. 1-5: Core FastAPI Setup
├── ✅ Ep. 6: AI Integration (Ollama)
├── ✅ Ep. 7: React Frontend
├── ✅ Ep. 8: Database Integration
└── ✅ Ep. 9: JWT Authentication
Current Version: 0.5.0
Status: Production-Ready with Authentication
🔗 Quick Reference Commands
# Register user
curl -X POST http://localhost:8000/api/v1/auth/register \
-H "Content-Type: application/json" \
-d '{"username":"user","email":"user@example.com","password":"Pass123"}'
# Login
curl -X POST http://localhost:8000/api/v1/auth/login \
-H "Content-Type: application/json" \
-d '{"username":"user","password":"Pass123"}'
# Use protected endpoint
curl -X GET http://localhost:8000/api/v1/users/me \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN"
# Refresh token
curl -X POST http://localhost:8000/api/v1/auth/refresh \
-H "Content-Type: application/json" \
-d '{"refresh_token":"YOUR_REFRESH_TOKEN"}'
# Change password
curl -X POST http://localhost:8000/api/v1/auth/password/change \
-H "Authorization: Bearer YOUR_ACCESS_TOKEN" \
-H "Content-Type: application/json" \
-d '{"current_password":"Pass123","new_password":"NewPass123"}'
🚀 What’s Next: Ep.10
Ready to deploy your complete application to production!
Ep.10 will cover:
- Docker containerization
- Docker Compose for local development
- Kubernetes deployment
- CI/CD with GitHub Actions
- AWS/GCP/Azure deployment
- Monitoring and logging
- Performance optimization
- Production best practices
📝 Files Created/Updated Summary
New Files:
app/core/security.py– JWT and password utilitiesapp/core/auth_dependencies.py– Authentication dependenciesapp/models/auth.py– Authentication Pydantic modelsapp/api/v1/endpoints/auth.py– Authentication endpointstest_authentication.py– Authentication test suiteSECURITY.md– Security best practicesAUTHENTICATION_DEPLOYMENT.md– Deployment guide
Updated Files:
app/core/config.py– Added JWT settingsapp/services/user_service.py– Updated to use new security moduleapp/api/v1/endpoints/users.py– Added authentication protectionapp/api/v1/api.py– Added auth routerapp/main.py– Updated description.env– Added JWT configurationrequirements.txt– Added python-jose, pwdlib
Congratulations! Your FastAPI application now has production-grade authentication! 🎉
Ready to proceed with Ep.10: Production Deployment?
Leave a Reply