Views: 4
Advanced Pydantic validation in FastAPI is implemented using @field_validator for attribute-specific rules and @model_validator for cross-field logic. To handle errors professionally, developers must register custom exception handlers for RequestValidationError to transform raw Pydantic tracebacks into standardized, client-friendly JSON responses. By combining these validation decorators with centralized error handling, you ensure data integrity for complex nested models while maintaining a consistent API contract across your entire application.
🎓 What You’ll Learn
By the end of this tutorial, you’ll understand:
- Advanced Pydantic validation techniques
- Custom validators and field validators
- Nested models and complex data structures
- Custom exception handlers in FastAPI
- Standardized error responses
- Request/response lifecycle and middleware
- Validation best practices
📖 Why Advanced Validation Matters
The Problem: Basic Validation Isn’t Enough
# Basic validation
username: str = Field(..., min_length=3)
What if you need to:
- ✅ Ensure username contains only letters and numbers?
- ✅ Validate password strength (uppercase, lowercase, numbers)?
- ✅ Check if an email domain is from an allowed list?
- ✅ Validate relationships between fields?
- ✅ Transform data during validation?
Benefits of Advanced Validation
| Benefit | Impact |
|---|---|
| Data Quality | Catch errors before they reach your database |
| Security | Prevent injection attacks and malicious input |
| User Experience | Provide clear, actionable error messages |
| Business Rules | Enforce complex domain logic |
| Maintainability | Validation logic is declarative and testable |
🛠️ Step-by-Step Implementation
Step 1: Understanding Pydantic Validators
Pydantic offers several types of validators:
- Field validators: Validate single fields
- Model validators: Validate entire models or multiple fields
- Root validators: Validate the complete data before/after parsing
- Custom types: Create reusable validated types
Let’s explore each!
Step 2: Install Additional Dependencies
# Activate your virtual environment first
source venv/bin/activate # or venv\Scripts\activate on Windows
# Install additional validation tools
pip install email-validator phonenumbers pydantic[email]
# Update requirements
pip freeze > requirements.txt
What we installed:
email-validator: Enhanced email validationphonenumbers: Phone number validation and formattingpydantic[email]: Email validation support for Pydantic
Step 3: Create Advanced User Models
Create app/models/user_advanced.py:
"""
Advanced user models with comprehensive validation
This module demonstrates various Pydantic validation techniques
"""
from pydantic import (
BaseModel,
Field,
EmailStr,
field_validator,
model_validator,
ConfigDict
)
from typing import Optional, List, Any
from datetime import datetime, date
from enum import Enum
import re
class UserRole(str, Enum):
"""User role enumeration"""
ADMIN = "admin"
USER = "user"
GUEST = "guest"
MODERATOR = "moderator"
class PasswordStrength(str, Enum):
"""Password strength levels"""
WEAK = "weak"
MEDIUM = "medium"
STRONG = "strong"
class UserCreateAdvanced(BaseModel):
"""
Advanced user creation model with custom validation
"""
username: str = Field(
...,
min_length=3,
max_length=50,
description="Username (letters, numbers, underscore, hyphen only)"
)
email: EmailStr = Field(
...,
description="Valid email address"
)
password: str = Field(
...,
min_length=8,
max_length=128,
description="Password (min 8 chars, must include uppercase, lowercase, and number)"
)
confirm_password: str = Field(
...,
description="Password confirmation (must match password)"
)
full_name: Optional[str] = Field(
None,
min_length=2,
max_length=100,
description="Full name"
)
age: Optional[int] = Field(
None,
ge=13,
le=120,
description="Age (must be 13 or older)"
)
date_of_birth: Optional[date] = Field(
None,
description="Date of birth"
)
phone_number: Optional[str] = Field(
None,
description="Phone number (international format preferred)"
)
website: Optional[str] = Field(
None,
description="Personal website URL"
)
bio: Optional[str] = Field(
None,
max_length=500,
description="Short bio"
)
role: UserRole = Field(
default=UserRole.USER,
description="User role"
)
allowed_email_domains: List[str] = Field(
default=["gmail.com", "yahoo.com", "outlook.com", "example.com"],
description="Allowed email domains for registration"
)
tags: List[str] = Field(
default=[],
max_length=10,
description="User tags (max 10)"
)
# ========================================
# FIELD VALIDATORS
# ========================================
@field_validator('username')
@classmethod
def validate_username(cls, v: str) -> str:
"""
Validate username format
Rules:
- Only alphanumeric, underscore, and hyphen
- Cannot start or end with underscore/hyphen
- No consecutive underscores or hyphens
"""
# Check allowed characters
if not re.match(r'^[a-zA-Z0-9_-]+$', v):
raise ValueError(
'Username can only contain letters, numbers, underscores, and hyphens'
)
# Cannot start/end with special chars
if v[0] in '_-' or v[-1] in '_-':
raise ValueError(
'Username cannot start or end with underscore or hyphen'
)
# No consecutive special chars
if '__' in v or '--' in v or '_-' in v or '-_' in v:
raise ValueError(
'Username cannot contain consecutive special characters'
)
# Convert to lowercase for consistency
return v.lower()
@field_validator('password')
@classmethod
def validate_password_strength(cls, v: str) -> str:
"""
Validate password strength
Requirements:
- At least 8 characters
- At least one uppercase letter
- At least one lowercase letter
- At least one digit
- Optionally one special character for strong passwords
"""
if len(v) < 8:
raise ValueError('Password must be at least 8 characters long')
if not re.search(r'[A-Z]', v):
raise ValueError('Password must contain at least one uppercase letter')
if not re.search(r'[a-z]', v):
raise ValueError('Password must contain at least one lowercase letter')
if not re.search(r'\d', v):
raise ValueError('Password must contain at least one number')
# Check for common weak passwords
weak_passwords = ['password', '12345678', 'qwerty', 'abc123']
if v.lower() in weak_passwords:
raise ValueError('Password is too common. Please choose a stronger password.')
return v
@field_validator('email')
@classmethod
def validate_email_domain(cls, v: EmailStr, info) -> EmailStr:
"""
Validate email domain against allowed list
Note: info.data contains other field values that have been validated so far
"""
# Extract domain from email
domain = v.split('@')[1].lower()
# Get allowed domains from model data (if available)
# During validation, other fields might not be set yet
# So we use a default list
default_allowed = ['gmail.com', 'yahoo.com', 'outlook.com', 'example.com']
if domain not in default_allowed:
raise ValueError(
f'Email domain "{domain}" is not allowed. '
f'Allowed domains: {", ".join(default_allowed)}'
)
return v.lower()
@field_validator('full_name')
@classmethod
def validate_full_name(cls, v: Optional[str]) -> Optional[str]:
"""
Validate and clean full name
"""
if v is None:
return v
# Remove extra whitespace
v = ' '.join(v.split())
# Check for numbers (names shouldn't have numbers)
if re.search(r'\d', v):
raise ValueError('Full name cannot contain numbers')
# Capitalize each word
return v.title()
@field_validator('website')
@classmethod
def validate_website_url(cls, v: Optional[str]) -> Optional[str]:
"""
Validate website URL format
"""
if v is None:
return v
# Add https:// if no protocol specified
if not v.startswith(('http://', 'https://')):
v = f'https://{v}'
# Basic URL validation
url_pattern = re.compile(
r'^https?://' # http:// or https://
r'(?:(?:[A-Z0-9](?:[A-Z0-9-]{0,61}[A-Z0-9])?\.)+[A-Z]{2,6}\.?|' # domain
r'localhost|' # localhost
r'\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})' # or IP
r'(?::\d+)?' # optional port
r'(?:/?|[/?]\S+)$', re.IGNORECASE
)
if not url_pattern.match(v):
raise ValueError('Invalid website URL format')
return v
@field_validator('tags')
@classmethod
def validate_tags(cls, v: List[str]) -> List[str]:
"""
Validate and clean tags
"""
if not v:
return v
# Remove duplicates and empty strings
tags = [tag.strip().lower() for tag in v if tag.strip()]
tags = list(set(tags)) # Remove duplicates
# Validate each tag
for tag in tags:
if len(tag) < 2:
raise ValueError('Each tag must be at least 2 characters long')
if len(tag) > 30:
raise ValueError('Each tag must be at most 30 characters long')
if not re.match(r'^[a-z0-9-_]+$', tag):
raise ValueError('Tags can only contain lowercase letters, numbers, hyphens, and underscores')
return tags
@field_validator('date_of_birth')
@classmethod
def validate_date_of_birth(cls, v: Optional[date]) -> Optional[date]:
"""
Validate date of birth
"""
if v is None:
return v
# Check if date is in the future
if v > date.today():
raise ValueError('Date of birth cannot be in the future')
# Check minimum age (13 years)
today = date.today()
age = today.year - v.year - ((today.month, today.day) < (v.month, v.day))
if age < 13:
raise ValueError('You must be at least 13 years old to register')
if age > 120:
raise ValueError('Invalid date of birth')
return v
# ========================================
# MODEL VALIDATORS (validate multiple fields)
# ========================================
@model_validator(mode='after')
def validate_passwords_match(self) -> 'UserCreateAdvanced':
"""
Validate that password and confirm_password match
This runs AFTER all field validators
"""
if self.password != self.confirm_password:
raise ValueError('Passwords do not match')
return self
@model_validator(mode='after')
def validate_age_and_dob_consistency(self) -> 'UserCreateAdvanced':
"""
If both age and date_of_birth are provided, ensure they're consistent
"""
if self.age is not None and self.date_of_birth is not None:
# Calculate age from date of birth
today = date.today()
calculated_age = today.year - self.date_of_birth.year
calculated_age -= ((today.month, today.day) < (self.date_of_birth.month, self.date_of_birth.day))
# Allow 1 year difference (birthday might not have occurred yet)
if abs(calculated_age - self.age) > 1:
raise ValueError(
f'Age ({self.age}) does not match date of birth '
f'(calculated age: {calculated_age})'
)
return self
# ========================================
# CONFIGURATION
# ========================================
model_config = ConfigDict(
str_strip_whitespace=True, # Automatically strip whitespace from strings
json_schema_extra={
"example": {
"username": "john_doe",
"email": "john@example.com",
"password": "SecurePass123",
"confirm_password": "SecurePass123",
"full_name": "John Doe",
"age": 25,
"date_of_birth": "1999-01-15",
"phone_number": "+1-555-123-4567",
"website": "https://johndoe.com",
"bio": "Software developer passionate about AI",
"role": "user",
"tags": ["developer", "python", "ai"]
}
}
)
class UserResponseAdvanced(BaseModel):
"""
Advanced user response model
"""
id: int
username: str
email: EmailStr
full_name: Optional[str]
age: Optional[int]
date_of_birth: Optional[date]
phone_number: Optional[str]
website: Optional[str]
bio: Optional[str]
role: UserRole
is_active: bool
created_at: datetime
tags: List[str]
# Computed fields (derived from other fields)
@property
def display_name(self) -> str:
"""Return full name if available, otherwise username"""
return self.full_name if self.full_name else self.username
@property
def is_admin(self) -> bool:
"""Check if user is admin"""
return self.role == UserRole.ADMIN
model_config = ConfigDict(
from_attributes=True,
json_schema_extra={
"example": {
"id": 1,
"username": "john_doe",
"email": "john@example.com",
"full_name": "John Doe",
"age": 25,
"date_of_birth": "1999-01-15",
"role": "user",
"is_active": True,
"created_at": "2024-01-15T10:30:00",
"tags": ["developer", "python"]
}
}
)
🔍 Key Validation Concepts:
@field_validator: Validates individual fields- Runs automatically when the field is set
- Can transform the value (return modified value)
- Access other fields via
info.data
@model_validator: Validates entire modelmode='after': Runs after all fields are validatedmode='before': Runs before field validation (gets raw data)- Perfect for cross-field validation
@property: Computed fields- Not stored in the model
- Calculated on-the-fly when accessed
- Useful for derived data
Step 4: Create Custom Exceptions
Create app/core/exceptions.py:
"""
Custom exceptions for the application
These provide more specific error handling than generic HTTPException
"""
from fastapi import HTTPException, status
from typing import Any, Optional, Dict
class AppException(HTTPException):
"""
Base application exception
All custom exceptions should inherit from this
"""
def __init__(
self,
status_code: int,
detail: str,
error_code: Optional[str] = None,
headers: Optional[Dict[str, str]] = None
):
super().__init__(status_code=status_code, detail=detail, headers=headers)
self.error_code = error_code
class UserNotFoundException(AppException):
"""User not found exception"""
def __init__(self, user_id: Any):
super().__init__(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with ID '{user_id}' not found",
error_code="USER_NOT_FOUND"
)
class UserAlreadyExistsException(AppException):
"""User already exists exception"""
def __init__(self, field: str, value: str):
super().__init__(
status_code=status.HTTP_400_BAD_REQUEST,
detail=f"User with {field} '{value}' already exists",
error_code="USER_ALREADY_EXISTS"
)
class InvalidCredentialsException(AppException):
"""Invalid credentials exception"""
def __init__(self):
super().__init__(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid username or password",
error_code="INVALID_CREDENTIALS"
)
class PermissionDeniedException(AppException):
"""Permission denied exception"""
def __init__(self, action: str):
super().__init__(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"You don't have permission to {action}",
error_code="PERMISSION_DENIED"
)
class ValidationException(AppException):
"""Custom validation exception"""
def __init__(self, detail: str):
super().__init__(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=detail,
error_code="VALIDATION_ERROR"
)
class DatabaseException(AppException):
"""Database operation exception"""
def __init__(self, detail: str = "Database operation failed"):
super().__init__(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=detail,
error_code="DATABASE_ERROR"
)
Step 5: Create Error Response Models
Create app/models/error.py:
"""
Error response models
Standardized error responses for the API
"""
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
from datetime import datetime
class ValidationError(BaseModel):
"""Single validation error"""
field: str = Field(..., description="Field that failed validation")
message: str = Field(..., description="Error message")
error_type: Optional[str] = Field(None, description="Type of validation error")
class ErrorResponse(BaseModel):
"""
Standard error response format
Used for all API errors
"""
success: bool = Field(default=False, description="Always false for errors")
error_code: Optional[str] = Field(None, description="Application-specific error code")
message: str = Field(..., description="Human-readable error message")
details: Optional[str] = Field(None, description="Additional error details")
timestamp: datetime = Field(default_factory=datetime.now, description="Error timestamp")
path: Optional[str] = Field(None, description="Request path that caused the error")
model_config = {
"json_schema_extra": {
"example": {
"success": False,
"error_code": "USER_NOT_FOUND",
"message": "User with ID '123' not found",
"timestamp": "2024-01-15T10:30:00",
"path": "/api/v1/users/123"
}
}
}
class ValidationErrorResponse(BaseModel):
"""
Validation error response
Used when Pydantic validation fails
"""
success: bool = Field(default=False)
error_code: str = Field(default="VALIDATION_ERROR")
message: str = Field(default="Validation error")
errors: List[ValidationError] = Field(..., description="List of validation errors")
timestamp: datetime = Field(default_factory=datetime.now)
model_config = {
"json_schema_extra": {
"example": {
"success": False,
"error_code": "VALIDATION_ERROR",
"message": "Validation error",
"errors": [
{
"field": "email",
"message": "value is not a valid email address",
"error_type": "value_error.email"
},
{
"field": "password",
"message": "Password must contain at least one uppercase letter",
"error_type": "value_error"
}
],
"timestamp": "2024-01-15T10:30:00"
}
}
}
Step 6: Create Exception Handlers
Create app/core/error_handlers.py:
"""
Global exception handlers
Centralized error handling for the entire application
"""
from fastapi import Request, status
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from datetime import datetime
from typing import Union
from app.core.exceptions import AppException
from app.models.error import ErrorResponse, ValidationErrorResponse, ValidationError as ValidationErrorModel
async def app_exception_handler(request: Request, exc: AppException) -> JSONResponse:
"""
Handler for custom application exceptions
Returns standardized error response
"""
error_response = ErrorResponse(
error_code=exc.error_code,
message=exc.detail,
path=str(request.url.path)
)
return JSONResponse(
status_code=exc.status_code,
content=error_response.model_dump()
)
async def validation_exception_handler(
request: Request,
exc: Union[RequestValidationError, ValidationError]
) -> JSONResponse:
"""
Handler for Pydantic validation errors
Transforms Pydantic errors into our standard format
"""
errors = []
for error in exc.errors():
# Extract field name (might be nested)
field = " -> ".join(str(loc) for loc in error["loc"] if loc != "body")
errors.append(
ValidationErrorModel(
field=field,
message=error["msg"],
error_type=error["type"]
)
)
error_response = ValidationErrorResponse(
errors=errors,
message=f"Validation failed for {len(errors)} field(s)"
)
return JSONResponse(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
content=error_response.model_dump()
)
async def generic_exception_handler(request: Request, exc: Exception) -> JSONResponse:
"""
Handler for any unhandled exceptions
Last resort error handler
"""
error_response = ErrorResponse(
error_code="INTERNAL_SERVER_ERROR",
message="An unexpected error occurred",
details=str(exc) if request.app.debug else None, # Only show in debug mode
path=str(request.url.path)
)
# Log the error (we'll implement proper logging later)
print(f"Unhandled exception: {exc}")
return JSONResponse(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
content=error_response.model_dump()
)
🔍 Understanding Exception Handlers:
app_exception_handler: Handles our custom exceptions- Converts to standardized JSON response
- Uses the error code and message from the exception
validation_exception_handler: Handles Pydantic validation errors- Transforms Pydantic’s error format to our format
- Makes errors more user-friendly
generic_exception_handler: Catches everything else- Prevents raw Python errors from being exposed
- Logs the error for debugging
Step 7: Update Main Application with Error Handlers
Update app/main.py:
"""
FastAPI AI Backend - Main Application
Updated with error handling
"""
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from fastapi.exceptions import RequestValidationError
from pydantic import ValidationError
from app.core.config import settings
from app.api.v1.api import api_router
from app.core.exceptions import AppException
from app.core.error_handlers import (
app_exception_handler,
validation_exception_handler,
generic_exception_handler
)
def create_application() -> FastAPI:
"""
Application factory pattern
Creates and configures the FastAPI application
"""
app = FastAPI(
title=settings.APP_NAME,
version=settings.APP_VERSION,
description="""
A production-ready FastAPI backend for AI applications.
## Features
* Advanced data validation with Pydantic
* Comprehensive error handling
* Standardized error responses
* User management with CRUD operations
* RESTful API design
* Automatic API documentation
* Modular architecture
* Environment-based configuration
## Error Handling
All errors follow a standardized format:
* Custom exceptions with error codes
* Detailed validation errors
* Consistent response structure
## Coming Soon
* AI model integration (Ollama)
* Authentication & Authorization
* Database integration
* WebSocket support for streaming
""",
debug=settings.DEBUG,
docs_url="/docs",
redoc_url="/redoc",
openapi_url="/openapi.json"
)
# Configure CORS
app.add_middleware(
CORSMiddleware,
allow_origins=settings.allowed_origins_list,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
# Register exception handlers
app.add_exception_handler(AppException, app_exception_handler)
app.add_exception_handler(RequestValidationError, validation_exception_handler)
app.add_exception_handler(ValidationError, validation_exception_handler)
app.add_exception_handler(Exception, generic_exception_handler)
# Include API routers
app.include_router(
api_router,
prefix=settings.API_V1_PREFIX
)
return app
# Create the application instance
app = create_application()
@app.on_event("startup")
async def startup_event():
"""Code to run when the application starts"""
print(f"🚀 Starting {settings.APP_NAME} v{settings.APP_VERSION}")
print(f"📝 Environment: {settings.ENVIRONMENT}")
print(f"🔧 Debug mode: {settings.DEBUG}")
print(f"📚 API Docs: http://{settings.HOST}:{settings.PORT}/docs")
print(f"✅ Error handling configured")
@app.on_event("shutdown")
async def shutdown_event():
"""Code to run when the application shuts down"""
print(f"👋 Shutting down {settings.APP_NAME}")
Step 8: Update User Service with Custom Exceptions
Update app/services/user_service.py:
"""
User service - Business logic for user operations
Updated with custom exceptions
"""
from typing import List, Optional
from datetime import datetime
from app.models.user import User, UserCreate, UserUpdate, UserRole
from app.core.exceptions import (
UserNotFoundException,
UserAlreadyExistsException
)
class UserService:
"""
User service class
Handles all user-related business logic
"""
def __init__(self):
"""Initialize the service with in-memory storage"""
self._users: List[User] = []
self._next_id: int = 1
def create_user(self, user_data: UserCreate) -> User:
"""
Create a new user
Args:
user_data: User creation data
Returns:
Created user
Raises:
UserAlreadyExistsException: If username or email already exists
"""
# Check for duplicate username
if self.get_user_by_username(user_data.username):
raise UserAlreadyExistsException("username", user_data.username)
# Check for duplicate email
if self.get_user_by_email(user_data.email):
raise UserAlreadyExistsException("email", user_data.email)
# Create new user
new_user = User(
id=self._next_id,
username=user_data.username,
email=user_data.email,
full_name=user_data.full_name,
role=user_data.role,
is_active=True,
created_at=datetime.now(),
tags=[]
)
self._users.append(new_user)
self._next_id += 1
return new_user
def get_all_users(
self,
skip: int = 0,
limit: int = 10,
role: Optional[UserRole] = None
) -> List[User]:
"""
Get all users with optional filtering
"""
users = self._users
if role:
users = [u for u in users if u.role == role]
return users[skip : skip + limit]
def get_user_by_id(self, user_id: int) -> User:
"""
Get user by ID
Args:
user_id: User identifier
Returns:
User object
Raises:
UserNotFoundException: If user not found
"""
for user in self._users:
if user.id == user_id:
return user
raise UserNotFoundException(user_id)
def get_user_by_username(self, username: str) -> Optional[User]:
"""Get user by username"""
for user in self._users:
if user.username == username:
return user
return None
def get_user_by_email(self, email: str) -> Optional[User]:
"""Get user by email"""
for user in self._users:
if user.email == email:
return user
return None
def update_user(self, user_id: int, user_update: UserUpdate) -> User:
"""
Partially update a user
Raises:
UserNotFoundException: If user not found
UserAlreadyExistsException: If username/email conflict
"""
user = self.get_user_by_id(user_id)
update_data = user_update.model_dump(exclude_unset=True)
# Check for username conflict
if "username" in update_data:
existing = self.get_user_by_username(update_data["username"])
if existing and existing.id != user_id:
raise UserAlreadyExistsException("username", update_data["username"])
# Check for email conflict
if "email" in update_data:
existing = self.get_user_by_email(update_data["email"])
if existing and existing.id != user_id:
raise UserAlreadyExistsException("email", update_data["email"])
# Apply updates
for field, value in update_data.items():
setattr(user, field, value)
return user
def delete_user(self, user_id: int) -> None:
"""
Delete a user
Raises:
UserNotFoundException: If user not found
"""
for idx, user in enumerate(self._users):
if user.id == user_id:
self._users.pop(idx)
return
raise UserNotFoundException(user_id)
def get_user_count(self) -> int:
"""Get total number of users"""
return len(self._users)
# Global instance
user_service = UserService()
Step 9: Create Advanced User Endpoints
Create app/api/v1/endpoints/users_advanced.py:
"""
Advanced user endpoints with comprehensive validation
"""
from fastapi import APIRouter, Depends, status, Query
from typing import List, Optional
from app.models.user_advanced import (
UserCreateAdvanced,
UserResponseAdvanced
)
from app.models.user import UserRole, UserUpdate, User
from app.models.common import MessageResponse
from app.services.user_service import UserService, user_service
from datetime import datetime
router = APIRouter(prefix="/users-advanced", tags=["Users Advanced"])
def get_user_service() -> UserService:
"""Dependency to get user service instance"""
return user_service
@router.post(
"",
response_model=MessageResponse,
status_code=status.HTTP_201_CREATED,
summary="Create user with advanced validation",
description="""
Create a new user with comprehensive validation:
**Username Rules:**
- 3-50 characters
- Only letters, numbers, underscores, hyphens
- Cannot start/end with special characters
- No consecutive special characters
- Automatically converted to lowercase
**Password Rules:**
- Minimum 8 characters
- Must contain uppercase letter
- Must contain lowercase letter
- Must contain number
- Cannot be common password
**Email Rules:**
- Valid email format
- Must be from allowed domain
**Age Rules:**
- Must be 13 or older
- If date_of_birth provided, must be consistent
**Other Validations:**
- Full name: no numbers, proper capitalization
- Website: valid URL format
- Tags: 2-30 chars, lowercase, no duplicates
"""
)
async def create_user_advanced(
user_data: UserCreateAdvanced,
service: UserService = Depends(get_user_service)
):
"""
Create a user with advanced validation
This endpoint demonstrates comprehensive Pydantic validation
"""
# Convert UserCreateAdvanced to regular UserCreate
from app.models.user import UserCreate
user_create = UserCreate(
username=user_data.username,
email=user_data.email,
password=user_data.password,
full_name=user_data.full_name,
role=user_data.role
)
# Create the user
created_user = service.create_user(user_create)
return MessageResponse(
message=f"User '{created_user.username}' created successfully",
success=True
)
@router.post(
"/validate",
summary="Validate user data without creating",
description="Test user data against all validation rules without actually creating the user"
)
async def validate_user_data(user_data: UserCreateAdvanced):
"""
Validate user data without creating
Useful for client-side validation feedback
"""
return MessageResponse(
message="User data is valid",
success=True
)
@router.get(
"/validation-rules",
summary="Get validation rules",
description="Returns all validation rules for user creation"
)
async def get_validation_rules():
"""
Get validation rules documentation
Helps frontend developers understand what validations are in place
"""
return {
"username": {
"min_length": 3,
"max_length": 50,
"pattern": "^[a-zA-Z0-9_-]+$",
"rules": [
"Only letters, numbers, underscores, and hyphens",
"Cannot start/end with special characters",
"No consecutive special characters",
"Automatically converted to lowercase"
]
},
"password": {
"min_length": 8,
"max_length": 128,
"rules": [
"At least one uppercase letter",
"At least one lowercase letter",
"At least one number",
"Cannot be common password (password, 12345678, etc.)"
]
},
"email": {
"format": "Valid email address",
"allowed_domains": ["gmail.com", "yahoo.com", "outlook.com", "example.com"]
},
"age": {
"minimum": 13,
"maximum": 120
},
"full_name": {
"min_length": 2,
"max_length": 100,
"rules": [
"Cannot contain numbers",
"Automatically capitalized"
]
},
"website": {
"format": "Valid URL",
"rules": [
"Automatically adds https:// if missing"
]
},
"tags": {
"min_length": 2,
"max_length": 30,
"max_count": 10,
"pattern": "^[a-z0-9-_]+$",
"rules": [
"Lowercase only",
"Letters, numbers, hyphens, underscores",
"Duplicates automatically removed"
]
}
}
Step 10: Update API Router
Update app/api/v1/api.py:
"""
API v1 router aggregator
Combines all v1 endpoint routers
"""
from fastapi import APIRouter
from app.api.v1.endpoints import users, health, users_advanced
# Create main v1 router
api_router = APIRouter()
# Include all endpoint routers
api_router.include_router(health.router)
api_router.include_router(users.router)
api_router.include_router(users_advanced.router) # New!
# Future routers:
# api_router.include_router(ai.router)
Step 11: Create Test Script for Advanced Validation
Create test_advanced_validation.py in project root:
"""
Test script for advanced validation features
Tests all validation rules and error handling
"""
import requests
import json
from typing import Dict, Any
BASE_URL = "http://127.0.0.1:8000/api/v1"
def print_test_result(title: str, response: requests.Response):
"""Helper to print test results"""
print(f"\n{'='*70}")
print(f"{title}")
print(f"{'='*70}")
print(f"Status Code: {response.status_code}")
try:
print(f"Response:\n{json.dumps(response.json(), indent=2, default=str)}")
except:
print(f"Response: {response.text}")
def test_valid_user():
"""Test creating a valid user"""
user_data = {
"username": "john_doe",
"email": "john@example.com",
"password": "SecurePass123",
"confirm_password": "SecurePass123",
"full_name": "John Doe",
"age": 25,
"date_of_birth": "1999-01-15",
"website": "johndoe.com", # Will be converted to https://johndoe.com
"bio": "Software developer",
"tags": ["developer", "python", "ai"]
}
response = requests.post(
f"{BASE_URL}/users-advanced",
json=user_data
)
print_test_result("✅ VALID USER", response)
def test_invalid_username():
"""Test invalid username formats"""
test_cases = [
("_invalid", "Username starting with underscore"),
("invalid-", "Username ending with hyphen"),
("invalid__name", "Consecutive underscores"),
("inv@lid", "Special character (@)"),
("ab", "Too short (less than 3 chars)")
]
for username, description in test_cases:
user_data = {
"username": username,
"email": "test@example.com",
"password": "SecurePass123",
"confirm_password": "SecurePass123"
}
response = requests.post(
f"{BASE_URL}/users-advanced",
json=user_data
)
print_test_result(f"❌ INVALID USERNAME: {description}", response)
def test_invalid_password():
"""Test invalid password formats"""
test_cases = [
("short", "Too short"),
("nouppercase123", "No uppercase letter"),
("NOLOWERCASE123", "No lowercase letter"),
("NoNumbers", "No numbers"),
("password123", "Common password"),
("12345678", "Common password")
]
for password, description in test_cases:
user_data = {
"username": "testuser",
"email": "test@example.com",
"password": password,
"confirm_password": password
}
response = requests.post(
f"{BASE_URL}/users-advanced",
json=user_data
)
print_test_result(f"❌ INVALID PASSWORD: {description}", response)
def test_password_mismatch():
"""Test password confirmation mismatch"""
user_data = {
"username": "testuser",
"email": "test@example.com",
"password": "SecurePass123",
"confirm_password": "DifferentPass123"
}
response = requests.post(
f"{BASE_URL}/users-advanced",
json=user_data
)
print_test_result("❌ PASSWORDS DON'T MATCH", response)
def test_invalid_email():
"""Test invalid email formats"""
test_cases = [
("notanemail", "Not an email format"),
("test@invalid-domain.com", "Domain not in allowed list")
]
for email, description in test_cases:
user_data = {
"username": "testuser",
"email": email,
"password": "SecurePass123",
"confirm_password": "SecurePass123"
}
response = requests.post(
f"{BASE_URL}/users-advanced",
json=user_data
)
print_test_result(f"❌ INVALID EMAIL: {description}", response)
def test_invalid_age():
"""Test invalid age values"""
test_cases = [
(12, "Too young (under 13)"),
(150, "Too old (over 120)")
]
for age, description in test_cases:
user_data = {
"username": "testuser",
"email": "test@example.com",
"password": "SecurePass123",
"confirm_password": "SecurePass123",
"age": age
}
response = requests.post(
f"{BASE_URL}/users-advanced",
json=user_data
)
print_test_result(f"❌ INVALID AGE: {description}", response)
def test_full_name_validation():
"""Test full name validation"""
user_data = {
"username": "testuser",
"email": "test@example.com",
"password": "SecurePass123",
"confirm_password": "SecurePass123",
"full_name": "John123 Doe" # Contains number
}
response = requests.post(
f"{BASE_URL}/users-advanced",
json=user_data
)
print_test_result("❌ FULL NAME WITH NUMBER", response)
def test_tags_validation():
"""Test tags validation"""
user_data = {
"username": "testuser",
"email": "test@example.com",
"password": "SecurePass123",
"confirm_password": "SecurePass123",
"tags": ["valid-tag", "a", "Invalid Tag With Spaces"] # Multiple issues
}
response = requests.post(
f"{BASE_URL}/users-advanced",
json=user_data
)
print_test_result("❌ INVALID TAGS", response)
def test_validation_rules_endpoint():
"""Test the validation rules documentation endpoint"""
response = requests.get(f"{BASE_URL}/users-advanced/validation-rules")
print_test_result("📚 VALIDATION RULES DOCUMENTATION", response)
def test_validate_only():
"""Test validation without creating user"""
user_data = {
"username": "john_doe",
"email": "john@example.com",
"password": "SecurePass123",
"confirm_password": "SecurePass123"
}
response = requests.post(
f"{BASE_URL}/users-advanced/validate",
json=user_data
)
print_test_result("✅ VALIDATE ONLY (NO CREATE)", response)
def test_duplicate_user():
"""Test creating duplicate user"""
user_data = {
"username": "duplicate_test",
"email": "duplicate@example.com",
"password": "SecurePass123",
"confirm_password": "SecurePass123"
}
# Create first user
response1 = requests.post(
f"{BASE_URL}/users-advanced",
json=user_data
)
print_test_result("✅ CREATE FIRST USER", response1)
# Try to create duplicate
response2 = requests.post(
f"{BASE_URL}/users-advanced",
json=user_data
)
print_test_result("❌ DUPLICATE USER", response2)
def test_nonexistent_user():
"""Test getting non-existent user"""
response = requests.get(f"{BASE_URL}/users/99999")
print_test_result("❌ USER NOT FOUND", response)
def run_all_tests():
"""Run all validation tests"""
print("\n" + "🧪"*35)
print("ADVANCED VALIDATION TEST SUITE")
print("🧪"*35)
print("\n" + "="*70)
print("VALID DATA TESTS")
print("="*70)
test_valid_user()
print("\n" + "="*70)
print("USERNAME VALIDATION TESTS")
print("="*70)
test_invalid_username()
print("\n" + "="*70)
print("PASSWORD VALIDATION TESTS")
print("="*70)
test_invalid_password()
test_password_mismatch()
print("\n" + "="*70)
print("EMAIL VALIDATION TESTS")
print("="*70)
test_invalid_email()
print("\n" + "="*70)
print("AGE VALIDATION TESTS")
print("="*70)
test_invalid_age()
print("\n" + "="*70)
print("OTHER FIELD VALIDATION TESTS")
print("="*70)
test_full_name_validation()
test_tags_validation()
print("\n" + "="*70)
print("UTILITY ENDPOINTS")
print("="*70)
test_validation_rules_endpoint()
test_validate_only()
print("\n" + "="*70)
print("ERROR HANDLING TESTS")
print("="*70)
test_duplicate_user()
test_nonexistent_user()
print("\n" + "✅"*35)
print("ALL TESTS COMPLETED!")
print("✅"*35 + "\n")
if __name__ == "__main__":
print("""
╔════════════════════════════════════════════════════════╗
║ Advanced Validation Test Suite ║
║ ║
║ Make sure your server is running: ║
║ python main.py ║
╚════════════════════════════════════════════════════════╝
""")
try:
response = requests.get(f"{BASE_URL}/health")
if response.status_code == 200:
run_all_tests()
else:
print("❌ Server returned unexpected status code")
except requests.exceptions.ConnectionError:
print("❌ ERROR: Cannot connect to server!")
print(" Please start the server with: python main.py")
except Exception as e:
print(f"❌ ERROR: {e}")
🧪 Testing Your Advanced Validation
Step 12: Run the Tests
# Make sure your server is running
python main.py
# In another terminal (with venv activated)
python test_advanced_validation.py
What you’ll see:
- ✅ Valid user creation succeeds
- ❌ Invalid usernames rejected with specific errors
- ❌ Weak passwords caught and explained
- ❌ Password mismatch detected
- ❌ Invalid emails blocked
- ❌ Age restrictions enforced
- And much more!
📊 Key Concepts Summary
Validation Hierarchy
Client Request
↓
FastAPI receives JSON
↓
Pydantic parses and validates
├── Field validators (individual fields)
├── Model validators (cross-field)
└── Custom validation logic
↓
If validation fails → ValidationError → validation_exception_handler
↓
If business logic fails → AppException → app_exception_handler
↓
If unexpected error → Exception → generic_exception_handler
↓
Standardized error response to client
🎯 Real-World Applications
1. E-commerce Platform
class ProductCreate(BaseModel):
price: float = Field(..., gt=0)
discount: float = Field(default=0, ge=0, le=100)
@model_validator(mode='after')
def validate_final_price(self):
final_price = self.price * (1 - self.discount/100)
if final_price < 0.01:
raise ValueError('Final price too low')
return self
2. Social Media App
class PostCreate(BaseModel):
content: str = Field(..., max_length=280)
hashtags: List[str] = Field(default=[])
@field_validator('hashtags')
@classmethod
def validate_hashtags(cls, v):
# Max 5 hashtags
if len(v) > 5:
raise ValueError('Maximum 5 hashtags')
# Each starts with #
for tag in v:
if not tag.startswith('#'):
raise ValueError('Hashtags must start with #')
return [tag.lower() for tag in v]
3. Booking System
class ReservationCreate(BaseModel):
check_in: datetime
check_out: datetime
guests: int = Field(..., ge=1, le=10)
@model_validator(mode='after')
def validate_dates(self):
if self.check_in >= self.check_out:
raise ValueError('Check-out must be after check-in')
if self.check_in < datetime.now():
raise ValueError('Cannot book in the past')
if (self.check_out - self.check_in).days > 30:
raise ValueError('Maximum 30 day stay')
return self
🔧 Implementation Checklist
Files to Create/Update:
app/models/user_advanced.py– Advanced user modelsapp/core/exceptions.py– Custom exceptionsapp/models/error.py– Error response modelsapp/core/error_handlers.py– Exception handlersapp/main.py– Update with exception handlersapp/services/user_service.py– Update with custom exceptionsapp/api/v1/endpoints/users_advanced.py– Advanced endpointsapp/api/v1/api.py– Include new routertest_advanced_validation.py– Test script
Packages to Install:
pip install email-validator phonenumbers pydantic[email]
pip freeze > requirements.txt
What is the difference between @field_validator and @model_validator?
Why should I standardize my error responses in FastAPI?
How does pydantic[email] improve validation?
When should I use custom exception handlers?
Can I use Pydantic validators to transform data?
📚 Additional Resources
Pydantic Documentation:
FastAPI Documentation:
Regex Reference:
- Python re module
- Regex101 – Interactive regex tester
🎉 What You’ve Accomplished!
✅ Advanced Pydantic Validation
- Field validators for individual field rules
- Model validators for cross-field validation
- Computed properties for derived data
- Custom type transformations
✅ Professional Error Handling
- Custom exception hierarchy
- Standardized error responses
- Detailed validation errors
- Proper HTTP status codes
✅ Production-Ready Code
- Secure input validation
- Clear error messages
- Consistent API responses
- Easy to test and maintain
✅ Best Practices
- Separation of concerns
- DRY principle
- Security-first mindset
- User-friendly feedback
🚀 Coming Up in Blog Post 5
In the next tutorial, we’ll dive into:
- Dependency Injection: Advanced patterns and use cases
- Middleware: Request/response processing pipeline
- Logging: Structured logging with context
- Background Tasks: Async task processing
- Request Lifecycle: Understanding the full FastAPI request flow
This will complete our backend foundation before we move on to AI integration!
Leave a Reply