Database integration is the backbone of any production FastAPI application. In this tutorial, we will cover everything from basic SQLAlchemy setup to advanced patterns like the Repository pattern, async database access, connection pooling, and Alembic migrations. By the end, you will have built a complete e-commerce data layer with full CRUD operations, relationships, and migration support.
FastAPI is database-agnostic — it does not ship with a built-in ORM or database layer. This gives you the freedom to choose the best database and toolkit for your project. Here are the most common options:
SQLite is a file-based relational database that requires zero configuration. It is perfect for development, prototyping, and small applications. Python includes SQLite support in the standard library.
# SQLite connection string examples
SQLITE_URL = "sqlite:///./app.db"              # File-based
SQLITE_URL_MEMORY = "sqlite:///:memory:"       # In-memory (testing)
SQLITE_URL_ABSOLUTE = "sqlite:////tmp/app.db"  # Absolute path
Pros: Zero setup, no server needed, great for testing and development.
Cons: Not suitable for high-concurrency production workloads, limited to a single writer at a time.
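Because the driver ships in the standard library, you can verify the zero-setup claim in a few lines of plain `sqlite3`, before any ORM enters the picture:

```python
import sqlite3

# In-memory database: lives only as long as the connection
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT NOT NULL)")
conn.execute("INSERT INT" "O users (name) VALUES (?)".replace('" "', ''), ("alice",)) if False else None
conn.execute("INSERT INTO users (name) VALUES (?)", ("alice",))
conn.commit()

row = conn.execute("SELECT id, name FROM users").fetchone()
assert row == (1, "alice")  # autoincremented id, stored name
```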
PostgreSQL is the most popular choice for production FastAPI applications. It offers advanced features like JSONB columns, full-text search, array types, and excellent concurrency handling.
# PostgreSQL connection strings
POSTGRES_URL = "postgresql://user:password@localhost:5432/mydb"
POSTGRES_ASYNC_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"
POSTGRES_PSYCOPG2 = "postgresql+psycopg2://user:password@localhost:5432/mydb"
Pros: Production-ready, advanced features, excellent ecosystem, strong ACID compliance.
Cons: Requires a running server, more complex setup than SQLite.
MySQL is another widely-used relational database with a large community and extensive tooling support.
# MySQL connection strings
MYSQL_URL = "mysql+pymysql://user:password@localhost:3306/mydb"
MYSQL_ASYNC_URL = "mysql+aiomysql://user:password@localhost:3306/mydb"
Pros: Widely supported, good performance, large community.
Cons: Fewer advanced features compared to PostgreSQL, some SQL standard deviations.
MongoDB is a document-oriented NoSQL database. While not used with SQLAlchemy, it works well with FastAPI through libraries like Motor (async) or PyMongo.
# MongoDB with Motor (async)
# await only works inside an async function, so the demo is wrapped in main()
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient

async def main():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client["mydb"]
    collection = db["users"]
    # Insert a document
    await collection.insert_one({"name": "John", "email": "john@example.com"})
    # Find documents
    async for doc in collection.find({"name": "John"}):
        print(doc)

asyncio.run(main())
Pros: Flexible schema, great for unstructured data, horizontal scaling.
Cons: No ACID transactions across documents (by default), less suited for relational data.
| Feature | SQLite | PostgreSQL | MySQL | MongoDB |
|---|---|---|---|---|
| Type | Relational (file) | Relational (server) | Relational (server) | Document (NoSQL) |
| Setup Complexity | None | Medium | Medium | Medium |
| Concurrency | Low | High | High | High |
| ACID Compliance | Yes | Yes | Yes (InnoDB) | Partial |
| Async Support | aiosqlite | asyncpg | aiomysql | Motor |
| Best For | Dev/Testing | Production APIs | Production APIs | Flexible schemas |
| SQLAlchemy Support | Full | Full | Full | No (use ODM) |
| JSON Support | Basic | JSONB (excellent) | JSON type | Native |
SQLAlchemy is the most popular ORM (Object-Relational Mapper) for Python. It provides a powerful and flexible way to interact with relational databases. FastAPI works seamlessly with SQLAlchemy through its dependency injection system.
# Core dependencies
pip install fastapi uvicorn sqlalchemy

# Database drivers
pip install aiosqlite        # SQLite async driver
pip install psycopg2-binary  # PostgreSQL sync driver
pip install asyncpg          # PostgreSQL async driver
pip install pymysql          # MySQL sync driver
pip install aiomysql         # MySQL async driver

# Migration tool
pip install alembic

# Install all at once for this tutorial
pip install fastapi uvicorn sqlalchemy alembic psycopg2-binary
Let us set up a well-organized project structure that separates concerns properly:
fastapi-database-tutorial/
├── app/
│ ├── __init__.py
│ ├── main.py # FastAPI application entry point
│ ├── config.py # Configuration settings
│ ├── database.py # Database engine, session, base
│ ├── models/
│ │ ├── __init__.py
│ │ ├── user.py # User model
│ │ ├── product.py # Product model
│ │ └── order.py # Order model
│ ├── schemas/
│ │ ├── __init__.py
│ │ ├── user.py # User Pydantic schemas
│ │ ├── product.py # Product Pydantic schemas
│ │ └── order.py # Order Pydantic schemas
│ ├── repositories/
│ │ ├── __init__.py
│ │ ├── base.py # Base repository
│ │ └── user.py # User repository
│ ├── routers/
│ │ ├── __init__.py
│ │ ├── users.py # User routes
│ │ └── products.py # Product routes
│ └── dependencies.py # Shared dependencies
├── alembic/
│ ├── versions/ # Migration files
│ ├── env.py # Alembic environment
│ └── script.py.mako # Migration template
├── alembic.ini # Alembic configuration
├── requirements.txt
└── tests/
├── __init__.py
└── test_users.py
First, create a configuration module that handles environment-specific settings:
# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
# Database
DATABASE_URL: str = "sqlite:///./app.db"
DATABASE_ECHO: bool = False # Log SQL queries
# Application
APP_NAME: str = "FastAPI Database Tutorial"
DEBUG: bool = True
# PostgreSQL production example:
# DATABASE_URL: str = "postgresql://user:pass@localhost:5432/mydb"
class Config:
env_file = ".env"
case_sensitive = True
@lru_cache
def get_settings() -> Settings:
"""Cache settings to avoid reading .env on every request."""
return Settings()
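The effect of `@lru_cache` is easy to see with a stand-in function (a plain dict instead of the real `Settings`, purely for illustration):

```python
from functools import lru_cache

construction_count = {"n": 0}

@lru_cache
def get_settings() -> dict:
    # Stand-in for Settings(): counts how often construction actually runs
    construction_count["n"] += 1
    return {"DATABASE_URL": "sqlite:///./app.db"}

first = get_settings()
second = get_settings()
assert first is second               # same cached object every call
assert construction_count["n"] == 1  # .env would be parsed only once
```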
The database module is the foundation of your data layer. It creates the engine, configures sessions, and provides the declarative base for your models:
# app/database.py
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker, DeclarativeBase
from app.config import get_settings
settings = get_settings()
# Create the SQLAlchemy engine
# The engine manages a pool of database connections
engine = create_engine(
settings.DATABASE_URL,
echo=settings.DATABASE_ECHO, # Log SQL statements
# SQLite-specific: allow multi-threaded access
connect_args={"check_same_thread": False} if "sqlite" in settings.DATABASE_URL else {},
# Connection pool settings (for non-SQLite databases)
pool_size=10, # Number of persistent connections
max_overflow=20, # Additional connections when pool is full
pool_timeout=30, # Seconds to wait for a connection
pool_recycle=1800, # Recycle connections after 30 minutes
pool_pre_ping=True, # Verify connections before using them
)
# Enable foreign keys for SQLite (disabled by default)
if "sqlite" in settings.DATABASE_URL:
@event.listens_for(engine, "connect")
def set_sqlite_pragma(dbapi_connection, connection_record):
cursor = dbapi_connection.cursor()
cursor.execute("PRAGMA foreign_keys=ON")
cursor.close()
# Create a session factory
# Each request gets its own session
SessionLocal = sessionmaker(
autocommit=False, # We manage transactions explicitly
autoflush=False, # We flush manually for control
bind=engine,
)
# SQLAlchemy 2.0 style declarative base
class Base(DeclarativeBase):
"""Base class for all database models."""
pass
def create_tables():
"""Create all tables in the database.
Used for development; use Alembic migrations in production.
"""
Base.metadata.create_all(bind=engine)
Note: the `check_same_thread=False` argument is only needed for SQLite. FastAPI runs in multiple threads, and SQLite by default only allows access from the thread that created the connection. This setting disables that check. For PostgreSQL or MySQL, this argument is not needed.

The SQLAlchemy engine is the starting point for all database operations. Here is what each configuration option does:
# Detailed engine configuration explained
from sqlalchemy import create_engine
engine = create_engine(
"postgresql://user:pass@localhost:5432/mydb",
# echo=True prints all SQL to stdout - useful for debugging
echo=True,
# pool_size: Number of connections to keep open in the pool
# Default is 5. Increase for high-traffic applications.
pool_size=10,
# max_overflow: Extra connections allowed beyond pool_size
# These are created on-demand and closed when returned to pool
max_overflow=20,
# pool_timeout: Seconds to wait for a connection from the pool
# Raises TimeoutError if no connection is available
pool_timeout=30,
# pool_recycle: Seconds before a connection is recycled
# Prevents "MySQL has gone away" type errors
pool_recycle=1800,
# pool_pre_ping: Test connections before using them
# Slightly slower but prevents using stale connections
pool_pre_ping=True,
# execution_options: Default options for all executions
execution_options={
"isolation_level": "REPEATABLE READ"
},
)
Database models define the structure of your tables. SQLAlchemy 2.0 uses Mapped type annotations for a more Pythonic and type-safe approach to defining columns and relationships.
# app/models/user.py
from datetime import datetime
from typing import Optional, List
from sqlalchemy import String, Integer, Boolean, DateTime, Text, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class User(Base):
"""User model representing the users table."""
__tablename__ = "users"
# Primary key - auto-incrementing integer
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
# Required fields
username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True)
email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)
# Optional fields
full_name: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
bio: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
is_superuser: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)
# Timestamps
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
nullable=False,
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True),
server_default=func.now(),
onupdate=func.now(),
nullable=False,
)
# Relationships
posts: Mapped[List["Post"]] = relationship(
"Post", back_populates="author", cascade="all, delete-orphan"
)
orders: Mapped[List["Order"]] = relationship(
"Order", back_populates="user", cascade="all, delete-orphan"
)
def __repr__(self) -> str:
return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"
SQLAlchemy provides a rich set of column types. Here are the most commonly used ones:
| SQLAlchemy Type | Python Type | SQL Type | Description |
|---|---|---|---|
| Integer | int | INTEGER | Standard integer |
| BigInteger | int | BIGINT | Large integer (IDs in big tables) |
| String(n) | str | VARCHAR(n) | Variable-length string with max length |
| Text | str | TEXT | Unlimited length text |
| Boolean | bool | BOOLEAN | True/False values |
| Float | float | FLOAT | Floating-point number |
| Numeric(p, s) | Decimal | NUMERIC | Exact decimal (use for money) |
| DateTime | datetime | DATETIME | Date and time |
| Date | date | DATE | Date only |
| Time | time | TIME | Time only |
| JSON | dict/list | JSON | JSON data |
| LargeBinary | bytes | BLOB | Binary data |
| Enum | enum.Enum | ENUM/VARCHAR | Enumerated values |
| UUID | uuid.UUID | UUID/CHAR(36) | Universally unique identifier |
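The table recommends `Numeric` (Python `Decimal`) for money because binary floats accumulate rounding error; a quick stdlib comparison:

```python
from decimal import Decimal

# Binary floats cannot represent most decimal fractions exactly
assert 0.1 + 0.2 != 0.3  # actually 0.30000000000000004

# Decimal arithmetic is exact, matching what Numeric(10, 2) stores
price = Decimal("19.99")
total = price * 3
assert total == Decimal("59.97")
```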
One-to-many is the most common relationship type. One user can have many posts, but each post belongs to exactly one user.
# app/models/post.py
from datetime import datetime
from typing import Optional, List
from sqlalchemy import String, Integer, Text, DateTime, ForeignKey, func, Enum as SAEnum
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
import enum
class PostStatus(str, enum.Enum):
DRAFT = "draft"
PUBLISHED = "published"
ARCHIVED = "archived"
class Post(Base):
"""Post model with a many-to-one relationship to User."""
__tablename__ = "posts"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
title: Mapped[str] = mapped_column(String(200), nullable=False, index=True)
slug: Mapped[str] = mapped_column(String(200), unique=True, nullable=False, index=True)
content: Mapped[str] = mapped_column(Text, nullable=False)
status: Mapped[PostStatus] = mapped_column(
SAEnum(PostStatus), default=PostStatus.DRAFT, nullable=False
)
view_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
# Foreign key - links to users table
author_id: Mapped[int] = mapped_column(
Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
)
# Timestamps
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
)
published_at: Mapped[Optional[datetime]] = mapped_column(
DateTime(timezone=True), nullable=True
)
# Relationship back to User
author: Mapped["User"] = relationship("User", back_populates="posts")
# Many-to-many with tags
tags: Mapped[List["Tag"]] = relationship(
"Tag", secondary="post_tags", back_populates="posts"
)
def __repr__(self) -> str:
return f"<Post(id={self.id}, title='{self.title}', status='{self.status}')>"
Many-to-many relationships require an association table. A post can have many tags, and a tag can be applied to many posts.
# app/models/tag.py
from typing import List
from sqlalchemy import String, Integer, Table, Column, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
# Association table for many-to-many relationship
# This is a simple link table with no extra columns
post_tags = Table(
"post_tags",
Base.metadata,
Column("post_id", Integer, ForeignKey("posts.id", ondelete="CASCADE"), primary_key=True),
Column("tag_id", Integer, ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True),
)
class Tag(Base):
"""Tag model with many-to-many relationship to Post."""
__tablename__ = "tags"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True)
slug: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True)
# Many-to-many relationship
posts: Mapped[List["Post"]] = relationship(
"Post", secondary=post_tags, back_populates="tags"
)
def __repr__(self) -> str:
return f"<Tag(id={self.id}, name='{self.name}')>"
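Under the hood, reading a many-to-many relationship is a join through the link table. A sketch in plain SQL (via the stdlib `sqlite3`, with sample data invented for illustration):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT);
    CREATE TABLE tags  (id INTEGER PRIMARY KEY, name TEXT);
    CREATE TABLE post_tags (
        post_id INTEGER REFERENCES posts(id),
        tag_id  INTEGER REFERENCES tags(id),
        PRIMARY KEY (post_id, tag_id)
    );
    INSERT INTO posts VALUES (1, 'Intro to FastAPI'), (2, 'SQL Tips');
    INSERT INTO tags  VALUES (1, 'python'), (2, 'sql');
    INSERT INTO post_tags VALUES (1, 1), (2, 1), (2, 2);
""")

# All posts tagged 'python': two joins through the link table,
# which is what SQLAlchemy emits for the secondary="post_tags" relationship
python_posts = [
    row[0] for row in conn.execute("""
        SELECT p.title
        FROM posts p
        JOIN post_tags pt ON pt.post_id = p.id
        JOIN tags t ON t.id = pt.tag_id
        WHERE t.name = 'python'
        ORDER BY p.title
    """)
]
assert python_posts == ["Intro to FastAPI", "SQL Tips"]
```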
When you need extra columns on the association table (like quantity or created date), use an association object pattern:
# app/models/order.py
from datetime import datetime
from typing import List
from sqlalchemy import (
String, Integer, Numeric, DateTime, ForeignKey, func, Enum as SAEnum
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
import enum
class OrderStatus(str, enum.Enum):
PENDING = "pending"
CONFIRMED = "confirmed"
SHIPPED = "shipped"
DELIVERED = "delivered"
CANCELLED = "cancelled"
class Order(Base):
"""Order model."""
__tablename__ = "orders"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
order_number: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
status: Mapped[OrderStatus] = mapped_column(
SAEnum(OrderStatus), default=OrderStatus.PENDING
)
total_amount: Mapped[float] = mapped_column(Numeric(10, 2), default=0)
user_id: Mapped[int] = mapped_column(
Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
# Relationships
user: Mapped["User"] = relationship("User", back_populates="orders")
items: Mapped[List["OrderItem"]] = relationship(
"OrderItem", back_populates="order", cascade="all, delete-orphan"
)
class OrderItem(Base):
"""Association object between Order and Product with extra data."""
__tablename__ = "order_items"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
# Foreign keys linking to orders and products
order_id: Mapped[int] = mapped_column(
Integer, ForeignKey("orders.id", ondelete="CASCADE"), nullable=False
)
product_id: Mapped[int] = mapped_column(
Integer, ForeignKey("products.id", ondelete="RESTRICT"), nullable=False
)
# Extra data on the association
quantity: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
unit_price: Mapped[float] = mapped_column(Numeric(10, 2), nullable=False)
total_price: Mapped[float] = mapped_column(Numeric(10, 2), nullable=False)
# Relationships
order: Mapped["Order"] = relationship("Order", back_populates="items")
product: Mapped["Product"] = relationship("Product")
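The line-item math (`total_price = quantity * unit_price`, summed into the order's `total_amount`) is worth doing with `Decimal` end to end; a small illustrative helper, not part of the models above:

```python
from decimal import Decimal

def line_total(quantity: int, unit_price: Decimal) -> Decimal:
    """Mirrors OrderItem.total_price = quantity * unit_price."""
    return unit_price * quantity

# (quantity, unit_price) pairs for a hypothetical order
items = [(2, Decimal("19.99")), (1, Decimal("5.00"))]
order_total = sum(line_total(q, p) for q, p in items)
assert order_total == Decimal("44.98")  # 39.98 + 5.00
```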
Proper indexing is essential for query performance. Here is how to define various types of indexes and constraints:
# app/models/product.py
from datetime import datetime
from typing import Optional
from sqlalchemy import (
String, Integer, Numeric, Text, Boolean, DateTime,
ForeignKey, Index, UniqueConstraint, CheckConstraint, func
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class Product(Base):
"""Product model demonstrating various indexes and constraints."""
__tablename__ = "products"
# Table-level constraints and indexes
__table_args__ = (
# Composite unique constraint
UniqueConstraint("name", "category_id", name="uq_product_name_category"),
# Composite index for common queries
Index("ix_product_category_price", "category_id", "price"),
# Partial index (PostgreSQL only)
# Index("ix_active_products", "name", postgresql_where=text("is_active = true")),
# Check constraint
CheckConstraint("price >= 0", name="ck_product_price_positive"),
CheckConstraint("stock_quantity >= 0", name="ck_product_stock_positive"),
)
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(200), nullable=False, index=True)
slug: Mapped[str] = mapped_column(String(200), unique=True, nullable=False, index=True)
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
price: Mapped[float] = mapped_column(Numeric(10, 2), nullable=False)
stock_quantity: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
sku: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True)
is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
# Foreign key to category
category_id: Mapped[Optional[int]] = mapped_column(
Integer, ForeignKey("categories.id", ondelete="SET NULL"), nullable=True
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
updated_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
)
# Relationships
category: Mapped[Optional["Category"]] = relationship("Category", back_populates="products")
def __repr__(self) -> str:
return f"<Product(id={self.id}, name='{self.name}', price={self.price})>"
class Category(Base):
"""Category model with self-referential relationship for nested categories."""
__tablename__ = "categories"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False, index=True)
slug: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
# Self-referential foreign key for parent category
parent_id: Mapped[Optional[int]] = mapped_column(
Integer, ForeignKey("categories.id", ondelete="SET NULL"), nullable=True
)
# Self-referential relationship
parent: Mapped[Optional["Category"]] = relationship(
"Category", remote_side="Category.id", back_populates="children"
)
children: Mapped[list["Category"]] = relationship(
"Category", back_populates="parent"
)
# Products in this category
products: Mapped[list["Product"]] = relationship(
"Product", back_populates="category"
)
def __repr__(self) -> str:
return f"<Category(id={self.id}, name='{self.name}')>"
FastAPI’s dependency injection system is perfect for managing database sessions. Each request gets its own session that is automatically cleaned up when the request completes, regardless of whether it succeeded or raised an exception.
# app/dependencies.py
from typing import Generator, Annotated
from sqlalchemy.orm import Session
from fastapi import Depends
from app.database import SessionLocal
def get_db() -> Generator[Session, None, None]:
"""
Dependency that provides a database session per request.
Uses a generator with yield to ensure the session is always
closed after the request completes, even if an error occurs.
Usage:
@app.get("/users")
def get_users(db: Session = Depends(get_db)):
return db.query(User).all()
"""
db = SessionLocal()
try:
yield db
finally:
db.close()
# Type alias for cleaner route signatures
DbSession = Annotated[Session, Depends(get_db)]
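The try/finally-around-yield guarantee can be demonstrated without FastAPI at all, using a fake session object. FastAPI drives the dependency essentially the same way: advance the generator to get the session, then close it (or throw the handler's exception into it) when the request finishes.

```python
session_log = []

class FakeSession:
    """Stand-in for SessionLocal() -- records when close() is called."""
    def close(self):
        session_log.append("closed")

def get_db():
    db = FakeSession()
    try:
        yield db
    finally:
        db.close()

# Success path: the generator is closed after the response is sent
gen = get_db()
db = next(gen)
gen.close()
assert session_log == ["closed"]

# Failure path: the handler's exception is thrown into the generator;
# the finally block still closes the session before the error propagates
gen = get_db()
db = next(gen)
try:
    gen.throw(RuntimeError("handler failed"))
except RuntimeError:
    pass
assert session_log == ["closed", "closed"]
```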
For a more robust approach, you can create a dependency that automatically commits on success and rolls back on failure:
# app/dependencies.py (enhanced version)
from typing import Generator, Annotated
from sqlalchemy.orm import Session
from fastapi import Depends
from app.database import SessionLocal
import logging
logger = logging.getLogger(__name__)
def get_db() -> Generator[Session, None, None]:
"""
Provides a transactional database session.
- Commits automatically if no exceptions occur
- Rolls back automatically on exceptions
- Always closes the session when done
"""
db = SessionLocal()
try:
yield db
db.commit() # Commit if everything went well
except Exception:
db.rollback() # Rollback on any exception
raise
finally:
db.close()
# Reusable type annotation
DbSession = Annotated[Session, Depends(get_db)]
Note: with this transactional dependency, route handlers no longer need to commit themselves. Other teams prefer the simpler dependency above and call db.commit() explicitly in their route handlers or service layer. Choose the approach that fits your team’s conventions.
# app/routers/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.dependencies import DbSession, get_db
from app.models.user import User
from app.schemas.user import UserCreate, UserResponse
router = APIRouter(prefix="/users", tags=["users"])
# Method 1: Using the type alias (recommended)
@router.get("/", response_model=list[UserResponse])
def get_users(db: DbSession, skip: int = 0, limit: int = 100):
"""Get all users with pagination."""
users = db.query(User).offset(skip).limit(limit).all()
return users
# Method 2: Using Depends directly
@router.get("/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
"""Get a single user by ID."""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found"
)
return user
Understanding the session lifecycle is critical for avoiding common bugs:
# Session lifecycle visualization
"""
Request comes in
│
▼
get_db() creates SessionLocal() ← New session created
│
▼
yield db ← Session available to route handler
│
▼
Route handler executes ← Queries run on this session
│ Objects are tracked by session
▼
Request completes (or fails)
│
▼
finally: db.close() ← Session returned to pool
Objects become "detached"
"""
# Common mistake: accessing lazy-loaded relationships after session closes
@router.get("/users/{user_id}")
def get_user_bad(user_id: int, db: DbSession):
user = db.query(User).filter(User.id == user_id).first()
db.close() # DON'T DO THIS - let the dependency handle it!
# user.posts # This would fail! Session is closed.
return user
# Correct approach: let the dependency manage the session lifecycle
@router.get("/users/{user_id}")
def get_user_good(user_id: int, db: DbSession):
user = db.query(User).filter(User.id == user_id).first()
# Access relationships while session is still open
_ = user.posts # This works because session is still active
return user
CRUD (Create, Read, Update, Delete) operations form the core of any data-driven application. Let us implement each operation with proper error handling and best practices.
# app/routers/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from app.dependencies import DbSession
from app.models.user import User
from app.schemas.user import UserCreate, UserResponse
import hashlib
router = APIRouter(prefix="/users", tags=["users"])
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user_data: UserCreate, db: DbSession):
"""
Create a new user.
Steps:
1. Validate input (handled by Pydantic)
2. Check for existing user
3. Create model instance
4. Add to session
5. Commit transaction
6. Refresh to get generated values (id, timestamps)
"""
# Check if user already exists
existing_user = db.query(User).filter(
(User.email == user_data.email) | (User.username == user_data.username)
).first()
if existing_user:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="User with this email or username already exists"
)
# Create new user instance
db_user = User(
username=user_data.username,
email=user_data.email,
# sha256 keeps the example short; use a real password hasher
# (bcrypt/argon2, e.g. via passlib) in production
hashed_password=hashlib.sha256(user_data.password.encode()).hexdigest(),
full_name=user_data.full_name,
)
try:
db.add(db_user) # Add to session (pending state)
db.commit() # Write to database
db.refresh(db_user) # Reload from DB to get generated values
return db_user
except IntegrityError:
db.rollback()
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="User with this email or username already exists"
)
@router.post("/bulk", response_model=list[UserResponse], status_code=status.HTTP_201_CREATED)
def create_users_bulk(users_data: list[UserCreate], db: DbSession):
"""Create multiple users in a single transaction."""
db_users = []
for user_data in users_data:
db_user = User(
username=user_data.username,
email=user_data.email,
hashed_password=hashlib.sha256(user_data.password.encode()).hexdigest(),
full_name=user_data.full_name,
)
db_users.append(db_user)
try:
db.add_all(db_users) # Add all at once
db.commit()
for user in db_users:
db.refresh(user)
return db_users
except IntegrityError:
db.rollback()
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="One or more users already exist"
)
# app/routers/users.py (continued)
from typing import Optional
from fastapi import Query
@router.get("/", response_model=list[UserResponse])
def get_users(
db: DbSession,
skip: int = Query(0, ge=0, description="Number of records to skip"),
limit: int = Query(100, ge=1, le=1000, description="Max records to return"),
is_active: Optional[bool] = Query(None, description="Filter by active status"),
search: Optional[str] = Query(None, description="Search by username or email"),
):
"""
Get all users with filtering, search, and pagination.
"""
query = db.query(User)
# Apply filters
if is_active is not None:
query = query.filter(User.is_active == is_active)
if search:
search_pattern = f"%{search}%"
query = query.filter(
(User.username.ilike(search_pattern)) |
(User.email.ilike(search_pattern)) |
(User.full_name.ilike(search_pattern))
)
# Apply ordering and pagination
# (call query.count() first if you need a total for pagination metadata)
users = query.order_by(User.created_at.desc()).offset(skip).limit(limit).all()
return users
@router.get("/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: DbSession):
"""Get a single user by ID."""
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found"
)
return user
@router.get("/by-email/{email}", response_model=UserResponse)
def get_user_by_email(email: str, db: DbSession):
"""Get a user by email address."""
user = db.query(User).filter(User.email == email).first()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with email {email} not found"
)
return user
# Advanced query examples
@router.get("/stats/active-count")
def get_active_user_count(db: DbSession):
"""Get count of active users."""
from sqlalchemy import func
result = db.query(func.count(User.id)).filter(User.is_active == True).scalar()
return {"active_users": result}
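The skip/limit parameters map directly onto SQL `OFFSET`/`LIMIT`; a stdlib illustration with `sqlite3` and invented rows:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany(
    "INSERT INTO users (username) VALUES (?)",
    [(f"user{i}",) for i in range(10)],
)

# skip=4, limit=3 translates to LIMIT 3 OFFSET 4
page = [
    row[0] for row in conn.execute(
        "SELECT username FROM users ORDER BY id LIMIT ? OFFSET ?", (3, 4)
    )
]
assert page == ["user4", "user5", "user6"]
```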
# app/routers/users.py (continued)
from app.schemas.user import UserUpdate
@router.put("/{user_id}", response_model=UserResponse)
def update_user(user_id: int, user_data: UserUpdate, db: DbSession):
"""
Full update of a user (PUT - replaces all fields).
"""
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found"
)
# Update all fields from the request body
update_data = user_data.model_dump(exclude_unset=False)
for field, value in update_data.items():
setattr(db_user, field, value)
try:
db.commit()
db.refresh(db_user)
return db_user
except IntegrityError:
db.rollback()
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Update would violate a unique constraint"
)
@router.patch("/{user_id}", response_model=UserResponse)
def partial_update_user(user_id: int, user_data: UserUpdate, db: DbSession):
"""
Partial update of a user (PATCH - only updates provided fields).
"""
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found"
)
# Only update fields that were explicitly set
update_data = user_data.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_user, field, value)
try:
db.commit()
db.refresh(db_user)
return db_user
except IntegrityError:
db.rollback()
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Update would violate a unique constraint"
)
# Bulk update example
@router.patch("/bulk/deactivate")
def deactivate_inactive_users(days: int, db: DbSession):
"""Deactivate users who haven't logged in for N days."""
from datetime import datetime, timedelta
cutoff_date = datetime.utcnow() - timedelta(days=days)
updated_count = (
db.query(User)
.filter(User.is_active == True, User.updated_at < cutoff_date)
.update({"is_active": False}, synchronize_session="fetch")
)
db.commit()
return {"deactivated_count": updated_count}
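The cutoff used above is plain `timedelta` arithmetic; an illustrative helper:

```python
from datetime import datetime, timedelta, timezone

def cutoff(days: int, now: datetime) -> datetime:
    """Users not updated since this moment count as inactive."""
    return now - timedelta(days=days)

now = datetime(2024, 6, 10, tzinfo=timezone.utc)
assert cutoff(30, now) == datetime(2024, 5, 11, tzinfo=timezone.utc)
```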
# app/routers/users.py (continued)
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, db: DbSession):
"""
Delete a user by ID.
With cascade="all, delete-orphan" on relationships,
related records (posts, orders) are also deleted.
"""
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found"
)
db.delete(db_user)
db.commit()
return None # 204 No Content
# Soft delete pattern (recommended for production)
@router.delete("/{user_id}/soft")
def soft_delete_user(user_id: int, db: DbSession):
"""
Soft delete - deactivate instead of removing from database.
Data can be recovered if needed.
"""
db_user = db.query(User).filter(User.id == user_id).first()
if not db_user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found"
)
db_user.is_active = False
db.commit()
return {"message": f"User {user_id} has been deactivated"}
# Bulk delete
@router.delete("/bulk/inactive")
def delete_inactive_users(db: DbSession):
"""Delete all inactive users."""
deleted_count = (
db.query(User)
.filter(User.is_active == False)
.delete(synchronize_session="fetch")
)
db.commit()
return {"deleted_count": deleted_count}
One of the most important patterns in FastAPI development is separating your Pydantic schemas (for API validation and serialization) from your SQLAlchemy models (for database interaction). This separation of concerns keeps your code clean, secure, and maintainable.
| Concern | Pydantic Schema | SQLAlchemy Model |
|---|---|---|
| Purpose | API validation & serialization | Database table representation |
| Passwords | Accepts plain text, never returns it | Stores hashed version only |
| Computed Fields | Can include calculated values | Only stores raw data |
| Relationships | Controls nesting depth | Defines all relationships |
| Validation | Input validation rules | Database constraints |
| Versioning | Can have v1, v2 schemas | Single source of truth |
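The table boils down to one rule: the model may hold everything, but the response schema is a whitelist. Stripped of any framework, the idea looks like this (class and field names are invented for illustration):

```python
from dataclasses import dataclass

@dataclass
class UserRow:
    """Stand-in for a SQLAlchemy model: holds everything, including secrets."""
    id: int
    username: str
    hashed_password: str

# Whitelist of fields the API is allowed to return
RESPONSE_FIELDS = ("id", "username")

def to_response(row: UserRow) -> dict:
    """Project the DB row onto the public response shape."""
    return {name: getattr(row, name) for name in RESPONSE_FIELDS}

row = UserRow(id=1, username="john", hashed_password="$2b$12$fakehash")
public = to_response(row)
assert "hashed_password" not in public
```

Pydantic's `response_model` does exactly this projection for you, with validation on top.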
# app/schemas/user.py
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, EmailStr, Field, ConfigDict, field_validator
# ─── Base Schema (shared fields) ───────────────────────────────
class UserBase(BaseModel):
"""Fields shared across all user schemas."""
username: str = Field(..., min_length=3, max_length=50, pattern="^[a-zA-Z0-9_]+$")
email: EmailStr
full_name: Optional[str] = Field(None, max_length=100)
# ─── Create Schema (input for creating) ────────────────────────
class UserCreate(UserBase):
"""Schema for creating a new user. Includes password."""
password: str = Field(..., min_length=8, max_length=100)
    # Custom validation - the @field_validator decorator wires it up
    @field_validator("password")
    @classmethod
    def validate_password_strength(cls, v: str) -> str:
if not any(c.isupper() for c in v):
raise ValueError("Password must contain at least one uppercase letter")
if not any(c.isdigit() for c in v):
raise ValueError("Password must contain at least one digit")
return v
# ─── Update Schema (input for updating) ────────────────────────
class UserUpdate(BaseModel):
"""Schema for updating a user. All fields optional for PATCH."""
username: Optional[str] = Field(None, min_length=3, max_length=50)
email: Optional[EmailStr] = None
full_name: Optional[str] = Field(None, max_length=100)
bio: Optional[str] = Field(None, max_length=1000)
is_active: Optional[bool] = None
# ─── Response Schema (output) ──────────────────────────────────
class UserResponse(UserBase):
"""
Schema for returning user data in API responses.
Note: hashed_password is NOT included - never expose it!
"""
id: int
is_active: bool
created_at: datetime
updated_at: datetime
# This tells Pydantic to read data from ORM model attributes
# In Pydantic v2, use model_config instead of class Config
model_config = ConfigDict(from_attributes=True)
# ─── Response with Relations ───────────────────────────────────
class UserWithPosts(UserResponse):
"""User response including their posts."""
posts: List["PostResponse"] = []
model_config = ConfigDict(from_attributes=True)
# ─── Minimal Response (for lists) ──────────────────────────────
class UserSummary(BaseModel):
"""Lightweight user representation for list endpoints."""
id: int
username: str
email: str
is_active: bool
model_config = ConfigDict(from_attributes=True)
# app/schemas/product.py
from datetime import datetime
from typing import Optional
from decimal import Decimal
from pydantic import BaseModel, Field, ConfigDict, field_validator
class ProductBase(BaseModel):
"""Shared product fields."""
name: str = Field(..., min_length=1, max_length=200)
description: Optional[str] = None
price: Decimal = Field(..., gt=0, decimal_places=2)
sku: str = Field(..., min_length=1, max_length=50)
category_id: Optional[int] = None
class ProductCreate(ProductBase):
"""Schema for creating a product."""
stock_quantity: int = Field(0, ge=0)
@field_validator("sku")
@classmethod
def validate_sku_format(cls, v: str) -> str:
"""SKU must be uppercase alphanumeric with hyphens."""
if not all(c.isalnum() or c == "-" for c in v):
raise ValueError("SKU must contain only letters, numbers, and hyphens")
return v.upper()
class ProductUpdate(BaseModel):
"""Schema for updating a product. All fields optional."""
name: Optional[str] = Field(None, min_length=1, max_length=200)
description: Optional[str] = None
price: Optional[Decimal] = Field(None, gt=0)
stock_quantity: Optional[int] = Field(None, ge=0)
is_active: Optional[bool] = None
category_id: Optional[int] = None
class ProductResponse(ProductBase):
"""Schema for product API response."""
id: int
slug: str
stock_quantity: int
is_active: bool
created_at: datetime
updated_at: datetime
model_config = ConfigDict(from_attributes=True)
class ProductWithCategory(ProductResponse):
"""Product with its category details."""
category: Optional["CategoryResponse"] = None
model_config = ConfigDict(from_attributes=True)
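Validator bodies like `validate_sku_format` are plain string logic, which makes them easy to exercise in isolation. The same logic, extracted as a standalone function:

```python
def normalize_sku(sku: str) -> str:
    """Uppercase alphanumeric-with-hyphens, mirroring validate_sku_format."""
    if not all(c.isalnum() or c == "-" for c in sku):
        raise ValueError("SKU must contain only letters, numbers, and hyphens")
    return sku.upper()

assert normalize_sku("widget-42") == "WIDGET-42"

# Anything outside [A-Za-z0-9-] is rejected
rejected = False
try:
    normalize_sku("bad sku!")
except ValueError:
    rejected = True
```

Keeping the rule in one small function also makes it reusable outside the schema, e.g. in import scripts.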
# Understanding from_attributes (Pydantic v2) / orm_mode (Pydantic v1)
# Without from_attributes, Pydantic expects dict-like data:
user_dict = {"id": 1, "username": "john", "email": "john@example.com"}
user_schema = UserResponse(**user_dict) # Works
# With from_attributes=True, Pydantic can read from ORM objects:
db_user = db.query(User).first() # SQLAlchemy model instance
user_schema = UserResponse.model_validate(db_user) # Works!
# This is what FastAPI does automatically when you set response_model:
@router.get("/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: DbSession):
    user = db.query(User).filter(User.id == user_id).first()
    return user  # FastAPI converts this using UserResponse.model_validate()
# Pydantic v1 (older) vs v2 (current):
# v1:
class UserResponseV1(BaseModel):
class Config:
orm_mode = True # Old syntax
# v2:
class UserResponseV2(BaseModel):
model_config = ConfigDict(from_attributes=True) # New syntax
# app/schemas/common.py
from typing import Generic, TypeVar, List, Optional
from pydantic import BaseModel
T = TypeVar("T")
class PaginatedResponse(BaseModel, Generic[T]):
"""Generic paginated response wrapper."""
items: List[T]
total: int
page: int
page_size: int
total_pages: int
has_next: bool
has_previous: bool
# Usage in route
@router.get("/", response_model=PaginatedResponse[UserResponse])
def get_users(
db: DbSession,
page: int = Query(1, ge=1),
page_size: int = Query(20, ge=1, le=100),
):
"""Get paginated list of users."""
query = db.query(User).filter(User.is_active == True)
total = query.count()
total_pages = (total + page_size - 1) // page_size
users = (
query
.order_by(User.created_at.desc())
.offset((page - 1) * page_size)
.limit(page_size)
.all()
)
return PaginatedResponse(
items=users,
total=total,
page=page,
page_size=page_size,
total_pages=total_pages,
has_next=page < total_pages,
has_previous=page > 1,
)
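The pagination arithmetic in the route above is easy to get subtly wrong. Extracted as a pure function, `total_pages` is just a ceiling division and the two flags follow from it:

```python
def page_meta(total: int, page: int, page_size: int) -> dict:
    """Compute pagination metadata exactly as the route above does."""
    total_pages = (total + page_size - 1) // page_size  # ceiling division
    return {
        "total_pages": total_pages,
        "has_next": page < total_pages,
        "has_previous": page > 1,
    }

# 45 items at 20 per page -> 3 pages; page 2 has neighbours on both sides
meta = page_meta(total=45, page=2, page_size=20)
assert meta == {"total_pages": 3, "has_next": True, "has_previous": True}
```

An empty result set yields `total_pages == 0`, so `has_next` is correctly `False` on page 1.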
Alembic is the migration tool for SQLAlchemy. It tracks changes to your database models and generates migration scripts to apply those changes to your database schema. Think of it as “version control for your database.”
# Install Alembic
pip install alembic

# Initialize Alembic in your project
cd fastapi-database-tutorial
alembic init alembic
This creates the following structure:
alembic/
├── versions/          # Migration scripts go here
├── env.py             # Alembic environment configuration
├── README             # Alembic README
└── script.py.mako     # Template for new migrations
alembic.ini            # Main Alembic configuration file
Update the Alembic configuration to use your SQLAlchemy models and database URL:
# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context
# Import your models and Base
from app.database import Base
from app.config import get_settings
# Import ALL models so Alembic can detect them
from app.models.user import User
from app.models.post import Post
from app.models.tag import Tag
from app.models.product import Product, Category
from app.models.order import Order, OrderItem
# Alembic Config object
config = context.config
# Set the database URL from our app settings
settings = get_settings()
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)
# Interpret the config file for Python logging
if config.config_file_name is not None:
fileConfig(config.config_file_name)
# Set target metadata for auto-generation
target_metadata = Base.metadata
def run_migrations_offline() -> None:
"""Run migrations in 'offline' mode.
Generates SQL scripts without connecting to the database.
Useful for reviewing changes before applying them.
"""
url = config.get_main_option("sqlalchemy.url")
context.configure(
url=url,
target_metadata=target_metadata,
literal_binds=True,
dialect_opts={"paramstyle": "named"},
# Compare column types for detecting type changes
compare_type=True,
# Compare server defaults
compare_server_default=True,
)
with context.begin_transaction():
context.run_migrations()
def run_migrations_online() -> None:
"""Run migrations in 'online' mode.
Connects to the database and applies changes directly.
"""
connectable = engine_from_config(
config.get_section(config.config_ini_section, {}),
prefix="sqlalchemy.",
poolclass=pool.NullPool,
)
with connectable.connect() as connection:
context.configure(
connection=connection,
target_metadata=target_metadata,
compare_type=True,
compare_server_default=True,
)
with context.begin_transaction():
context.run_migrations()
if context.is_offline_mode():
run_migrations_offline()
else:
run_migrations_online()
# Auto-generate a migration by comparing models to database
alembic revision --autogenerate -m "create users table"

# Create an empty migration (for custom SQL)
alembic revision -m "add custom index"

# View current migration status
alembic current

# View migration history
alembic history --verbose
# alembic/versions/001_create_users_table.py
"""create users table
Revision ID: a1b2c3d4e5f6
Revises:
Create Date: 2024-01-15 10:30:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa
# revision identifiers
revision: str = "a1b2c3d4e5f6"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None
def upgrade() -> None:
"""Apply migration - create the users table."""
op.create_table(
"users",
sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
sa.Column("username", sa.String(length=50), nullable=False),
sa.Column("email", sa.String(length=255), nullable=False),
sa.Column("hashed_password", sa.String(length=255), nullable=False),
sa.Column("full_name", sa.String(length=100), nullable=True),
sa.Column("bio", sa.Text(), nullable=True),
sa.Column("is_active", sa.Boolean(), nullable=False, server_default="1"),
sa.Column("is_superuser", sa.Boolean(), nullable=False, server_default="0"),
sa.Column(
"created_at",
sa.DateTime(timezone=True),
server_default=sa.text("(CURRENT_TIMESTAMP)"),
nullable=False,
),
sa.Column(
"updated_at",
sa.DateTime(timezone=True),
server_default=sa.text("(CURRENT_TIMESTAMP)"),
nullable=False,
),
sa.PrimaryKeyConstraint("id"),
)
# Create indexes
op.create_index("ix_users_username", "users", ["username"], unique=True)
op.create_index("ix_users_email", "users", ["email"], unique=True)
def downgrade() -> None:
"""Reverse migration - drop the users table."""
op.drop_index("ix_users_email", table_name="users")
op.drop_index("ix_users_username", table_name="users")
op.drop_table("users")
# Apply all pending migrations
alembic upgrade head

# Apply migrations up to a specific revision
alembic upgrade a1b2c3d4e5f6

# Apply the next migration only
alembic upgrade +1

# Revert the last migration
alembic downgrade -1

# Revert to a specific revision
alembic downgrade a1b2c3d4e5f6

# Revert all migrations (back to empty database)
alembic downgrade base

# Generate SQL without executing (for review)
alembic upgrade head --sql
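Under the hood, each revision records its `down_revision`, forming a chain that `alembic upgrade head` walks from the current revision to the tip. A toy model of that walk (revision ids are invented):

```python
from typing import Optional

# Map: revision id -> its down_revision (None = base, i.e. empty database)
REVISIONS = {
    "a1b2": None,       # create users table
    "b2c3": "a1b2",     # create products table
    "c3d4": "b2c3",     # seed default categories
}

def upgrade_path(current: Optional[str], head: str) -> list[str]:
    """Revisions to apply, oldest first, to get from `current` to `head`."""
    path = []
    rev = head
    while rev != current:
        path.append(rev)
        rev = REVISIONS[rev]
    return list(reversed(path))

# Fresh database: apply everything in order
assert upgrade_path(None, "c3d4") == ["a1b2", "b2c3", "c3d4"]
```

This is why deleting a migration file from `versions/` breaks the chain: a missing `down_revision` target makes the walk fail.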
# alembic/versions/003_seed_default_categories.py
"""seed default categories
Revision ID: c3d4e5f6g7h8
"""
from alembic import op
import sqlalchemy as sa
from datetime import datetime
revision = "c3d4e5f6g7h8"
down_revision = "b2c3d4e5f6g7"
def upgrade() -> None:
"""Insert default categories."""
categories_table = sa.table(
"categories",
sa.column("id", sa.Integer),
sa.column("name", sa.String),
sa.column("slug", sa.String),
sa.column("description", sa.Text),
)
op.bulk_insert(categories_table, [
{"id": 1, "name": "Electronics", "slug": "electronics",
"description": "Electronic devices and accessories"},
{"id": 2, "name": "Clothing", "slug": "clothing",
"description": "Apparel and fashion items"},
{"id": 3, "name": "Books", "slug": "books",
"description": "Physical and digital books"},
{"id": 4, "name": "Home & Garden", "slug": "home-garden",
"description": "Home improvement and garden supplies"},
])
def downgrade() -> None:
"""Remove default categories."""
op.execute("DELETE FROM categories WHERE id IN (1, 2, 3, 4)")
Alembic best practices:

- Test every migration in both directions: verify that `upgrade()` and `downgrade()` both work
- Use descriptive revision messages: "add email index to users", not "update"
- Never call `create_all()` in production — always use Alembic migrations

FastAPI is built on ASGI and supports async/await natively. Using async database access can significantly improve throughput for I/O-bound applications by allowing the server to handle other requests while waiting for database responses.
# Install async drivers
pip install sqlalchemy[asyncio]
pip install aiosqlite   # For SQLite
pip install asyncpg     # For PostgreSQL (recommended)
pip install aiomysql    # For MySQL
# app/database_async.py
from sqlalchemy.ext.asyncio import (
create_async_engine,
async_sessionmaker,
AsyncSession,
AsyncEngine,
)
from sqlalchemy.orm import DeclarativeBase
from app.config import get_settings
settings = get_settings()
# Async connection URLs use different drivers
# SQLite: sqlite+aiosqlite:///./app.db
# PostgreSQL: postgresql+asyncpg://user:pass@localhost:5432/mydb
# MySQL: mysql+aiomysql://user:pass@localhost:3306/mydb
ASYNC_DATABASE_URL = settings.DATABASE_URL.replace(
"sqlite://", "sqlite+aiosqlite://"
).replace(
"postgresql://", "postgresql+asyncpg://"
)
# Create async engine
async_engine: AsyncEngine = create_async_engine(
ASYNC_DATABASE_URL,
echo=settings.DATABASE_ECHO,
pool_size=10,
max_overflow=20,
pool_timeout=30,
pool_recycle=1800,
pool_pre_ping=True,
)
# Create async session factory
AsyncSessionLocal = async_sessionmaker(
bind=async_engine,
class_=AsyncSession,
expire_on_commit=False, # Important for async - prevents lazy loading issues
autocommit=False,
autoflush=False,
)
class Base(DeclarativeBase):
"""Base class for async models (same models work for both sync and async)."""
pass
async def create_tables():
"""Create all tables asynchronously."""
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
async def drop_tables():
"""Drop all tables asynchronously (for testing)."""
async with async_engine.begin() as conn:
await conn.run_sync(Base.metadata.drop_all)
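The chained `.replace()` calls above work, but a small helper makes the scheme-to-driver mapping explicit and fails loudly on unrecognized URLs. A sketch (`to_async_url` is not part of SQLAlchemy — it is an illustrative utility):

```python
# Sync scheme -> async-driver scheme (same rewrites as the chained replaces)
ASYNC_DRIVERS = {
    "sqlite": "sqlite+aiosqlite",
    "postgresql": "postgresql+asyncpg",
    "mysql": "mysql+aiomysql",
}

def to_async_url(url: str) -> str:
    """Rewrite a sync SQLAlchemy URL to its async-driver equivalent."""
    scheme, sep, rest = url.partition("://")
    if not sep:
        raise ValueError(f"Not a database URL: {url!r}")
    return ASYNC_DRIVERS.get(scheme, scheme) + "://" + rest

assert to_async_url("sqlite:///./app.db") == "sqlite+aiosqlite:///./app.db"
```

URLs that already name a driver (e.g. `postgresql+psycopg2://`) pass through unchanged, which you may want to treat as an error instead.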
# app/dependencies_async.py
from typing import AsyncGenerator, Annotated
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Depends
from app.database_async import AsyncSessionLocal
async def get_async_db() -> AsyncGenerator[AsyncSession, None]:
"""
Async dependency that provides a database session per request.
"""
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
# Type alias for cleaner signatures
AsyncDbSession = Annotated[AsyncSession, Depends(get_async_db)]
# app/routers/users_async.py
from fastapi import APIRouter, HTTPException, status
from sqlalchemy import select, func, update, delete
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.dependencies_async import AsyncDbSession
from app.models.user import User
from app.schemas.user import UserCreate, UserUpdate, UserResponse, UserWithPosts
import hashlib
router = APIRouter(prefix="/async/users", tags=["users-async"])
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user_data: UserCreate, db: AsyncDbSession):
"""Create a new user asynchronously."""
# Check for existing user
stmt = select(User).where(
(User.email == user_data.email) | (User.username == user_data.username)
)
result = await db.execute(stmt)
existing = result.scalar_one_or_none()
if existing:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="User already exists"
)
db_user = User(
username=user_data.username,
email=user_data.email,
        # NOTE: sha256 is a placeholder for brevity - use a real password
        # hasher (e.g. passlib's bcrypt) in production
        hashed_password=hashlib.sha256(user_data.password.encode()).hexdigest(),
full_name=user_data.full_name,
)
db.add(db_user)
await db.flush() # Write to DB but don't commit yet
await db.refresh(db_user) # Get generated values
return db_user
@router.get("/", response_model=list[UserResponse])
async def get_users(
db: AsyncDbSession,
skip: int = 0,
limit: int = 100,
):
"""Get all users asynchronously."""
stmt = (
select(User)
.where(User.is_active == True)
.order_by(User.created_at.desc())
.offset(skip)
.limit(limit)
)
result = await db.execute(stmt)
users = result.scalars().all()
return users
@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, db: AsyncDbSession):
"""Get a single user by ID."""
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found"
)
return user
@router.get("/{user_id}/with-posts", response_model=UserWithPosts)
async def get_user_with_posts(user_id: int, db: AsyncDbSession):
"""Get a user with their posts (eager loading)."""
stmt = (
select(User)
.options(selectinload(User.posts)) # Eager load posts
.where(User.id == user_id)
)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found"
)
return user
@router.put("/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user_data: UserUpdate, db: AsyncDbSession):
    """Update a user asynchronously."""
    values = user_data.model_dump(exclude_unset=True)
    if not values:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="No fields to update",
        )
    stmt = (
        update(User)
        .where(User.id == user_id)
        .values(**values)
        .returning(User)  # Supported on PostgreSQL and SQLite 3.35+
    )
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found"
)
await db.refresh(user)
return user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int, db: AsyncDbSession):
"""Delete a user asynchronously."""
stmt = select(User).where(User.id == user_id)
result = await db.execute(stmt)
user = result.scalar_one_or_none()
if not user:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"User with id {user_id} not found"
)
await db.delete(user)
| Scenario | Sync or Async? | Reason |
|---|---|---|
| High-traffic API | Async | Better concurrency under load |
| Simple CRUD app | Either | Sync is simpler, async adds complexity |
| Multiple DB calls per request | Async | Can run queries concurrently |
| CPU-heavy processing | Sync | Async does not help with CPU-bound work |
| External API calls + DB | Async | Can await both without blocking |
| Small team / prototype | Sync | Less complexity, faster development |
| Microservices at scale | Async | Better resource utilization |
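The "Multiple DB calls per request" row is where async shines most clearly: independent awaitable calls can run concurrently with `asyncio.gather`. A small simulation (the sleeps stand in for database I/O):

```python
import asyncio

async def fake_query(name: str, delay: float) -> str:
    """Stand-in for an awaitable database call."""
    await asyncio.sleep(delay)
    return name

async def handler() -> list[str]:
    # Both "queries" wait concurrently: wall time ~= max(delay), not the sum
    return list(await asyncio.gather(
        fake_query("users", 0.05),
        fake_query("orders", 0.05),
    ))

results = asyncio.run(handler())
```

With sync sessions the two calls would run back-to-back; here the event loop overlaps their waits.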
If your route is declared with `async def`, all database operations in that route should use async sessions. If your route is plain `def`, use sync sessions. FastAPI runs sync routes in a thread pool, so they do not block the event loop.

Connection pooling is critical for production applications. Creating a new database connection for every request is expensive — a pool maintains a set of reusable connections that are shared across requests.
""" Connection Pool Lifecycle: 1. Application starts → Pool creates `pool_size` connections 2. Request arrives → Pool provides an available connection 3. Request completes → Connection is returned to pool (not closed) 4. Pool is full → New requests wait up to `pool_timeout` seconds 5. Pool overflow → Up to `max_overflow` temporary connections created 6. Connection stale → `pool_pre_ping` detects and replaces it 7. Connection old → `pool_recycle` replaces connections after N seconds Pool States: ┌─────────────────────────────────────────┐ │ Connection Pool │ │ │ │ ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐ │ │ │ Conn │ │ Conn │ │ Conn │ │ Conn │ │ ← pool_size=5 │ │ #1 │ │ #2 │ │ #3 │ │ #4 │ │ │ │(idle)│ │(busy)│ │(idle)│ │(busy)│ │ │ └──────┘ └──────┘ └──────┘ └──────┘ │ │ │ │ ┌──────┐ │ │ │ Conn │ ← max_overflow connections │ │ │ #5 │ (temporary, closed when │ │ │(busy)│ returned to pool) │ │ └──────┘ │ └─────────────────────────────────────────┘ """
# app/database.py - Production-ready pool configuration
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool, NullPool, StaticPool
# ─── Standard Configuration (most applications) ────────────────
engine = create_engine(
"postgresql://user:pass@localhost:5432/mydb",
# Pool implementation (QueuePool is default for non-SQLite)
poolclass=QueuePool,
# Number of persistent connections in the pool
# Rule of thumb: start with 5-10, increase based on load testing
pool_size=10,
# Additional connections allowed when pool is exhausted
# These connections are closed (not returned to pool) when done
max_overflow=20,
# Seconds to wait for a connection before raising TimeoutError
pool_timeout=30,
# Recycle connections after this many seconds
# Prevents issues with databases that close idle connections
# MySQL default wait_timeout is 28800 (8 hours)
pool_recycle=1800, # 30 minutes
# Test connections before using them
# Issues a SELECT 1 before handing out a connection
pool_pre_ping=True,
# Log pool events for debugging
echo_pool=True, # Set to False in production
)
# ─── Testing Configuration (SQLite in-memory) ──────────────────
test_engine = create_engine(
"sqlite:///:memory:",
poolclass=StaticPool, # Single connection for all threads
connect_args={"check_same_thread": False},
)
# ─── Serverless Configuration (no persistent pool) ─────────────
serverless_engine = create_engine(
"postgresql://user:pass@host:5432/mydb",
poolclass=NullPool, # No pooling - new connection every time
# Useful for AWS Lambda, Google Cloud Functions, etc.
)
# app/routers/health.py
from fastapi import APIRouter
from app.database import engine
router = APIRouter(tags=["health"])
@router.get("/health/db")
def database_health():
"""Check database connection pool health."""
pool = engine.pool
return {
"pool_size": pool.size(),
"checked_in": pool.checkedin(), # Available connections
"checked_out": pool.checkedout(), # In-use connections
"overflow": pool.overflow(), # Overflow connections in use
"total_connections": pool.checkedin() + pool.checkedout(),
"status": "healthy" if pool.checkedin() > 0 or pool.size() > pool.checkedout() else "stressed",
}
# Event listeners for monitoring
import logging

from sqlalchemy import event

logger = logging.getLogger(__name__)
@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, connection_rec, connection_proxy):
"""Called when a connection is checked out from the pool."""
logger.debug(f"Connection checked out. Pool: {engine.pool.status()}")
@event.listens_for(engine, "checkin")
def on_checkin(dbapi_conn, connection_rec):
"""Called when a connection is returned to the pool."""
logger.debug(f"Connection checked in. Pool: {engine.pool.status()}")
@event.listens_for(engine, "connect")
def on_connect(dbapi_conn, connection_rec):
"""Called when a new raw connection is created."""
logger.info("New database connection created")
@event.listens_for(engine, "invalidate")
def on_invalidate(dbapi_conn, connection_rec, exception):
"""Called when a connection is invalidated."""
logger.warning(f"Connection invalidated: {exception}")
| Application Type | pool_size | max_overflow | Notes |
|---|---|---|---|
| Small API (dev) | 5 | 10 | Default settings are fine |
| Medium API | 10-20 | 20-30 | Monitor and adjust |
| High-traffic API | 20-50 | 50-100 | Use connection pooler (PgBouncer) |
| Serverless | N/A | N/A | Use NullPool or external pooler |
| Background workers | 2-5 | 5 | Fewer connections needed |
If you run an external pooler such as PgBouncer in transaction mode, configure `NullPool` in SQLAlchemy and let PgBouncer handle all pooling.

Efficiently loading related data is one of the most important skills when working with an ORM. SQLAlchemy provides several strategies for loading relationships, each with different performance characteristics.
The N+1 problem is the most common performance issue with ORMs. It occurs when you load a list of N records and then access a relationship on each one, causing N additional queries.
# THE N+1 PROBLEM - DO NOT DO THIS IN PRODUCTION
# Query 1: Get all users
users = db.query(User).all() # SELECT * FROM users → returns 100 users
# N queries: Access posts for each user (lazy loading)
for user in users:
print(f"{user.username} has {len(user.posts)} posts")
# Each access to user.posts triggers:
# SELECT * FROM posts WHERE author_id = 1
# SELECT * FROM posts WHERE author_id = 2
# SELECT * FROM posts WHERE author_id = 3
# ... 100 more queries!
# Total: 1 + 100 = 101 queries for 100 users!
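The query arithmetic can be reproduced without a database by counting calls. This pure-Python model (all names invented) contrasts per-row lookups with a single batched `IN` query, which is what `selectinload` issues:

```python
POSTS = {1: ["a"], 2: ["b", "c"], 3: []}  # author_id -> that author's posts
query_count = 0

def select_users() -> list[int]:
    global query_count
    query_count += 1            # SELECT * FROM users
    return list(POSTS)

def select_posts_for(author_id: int) -> list[str]:
    global query_count
    query_count += 1            # SELECT * FROM posts WHERE author_id = ?
    return POSTS[author_id]

def select_posts_in(author_ids: list[int]) -> dict:
    global query_count
    query_count += 1            # SELECT * FROM posts WHERE author_id IN (...)
    return {a: POSTS[a] for a in author_ids}

# Lazy loading: 1 + N queries
users = select_users()
for u in users:
    select_posts_for(u)
lazy_queries = query_count          # 1 + 3 = 4

# selectinload-style batching: 2 queries regardless of N
query_count = 0
users = select_users()
select_posts_in(users)
batched_queries = query_count       # 2
```

With 3 users the difference is small; with 100 users it is 101 queries versus 2.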
joinedload uses a SQL JOIN to load related data in a single query. Best for one-to-one and many-to-one relationships.
from sqlalchemy.orm import joinedload
# Single query with JOIN - loads users and posts together
users = (
db.query(User)
.options(joinedload(User.posts))
.all()
)
# Generated SQL:
# SELECT users.*, posts.*
# FROM users
# LEFT OUTER JOIN posts ON users.id = posts.author_id
# Now accessing posts does NOT trigger additional queries
for user in users:
print(f"{user.username} has {len(user.posts)} posts") # No extra query!
# Total: 1 query!
selectinload uses a separate SELECT with an IN clause. Best for one-to-many and many-to-many relationships where JOIN would create duplicate rows.
from sqlalchemy.orm import selectinload
# Two queries: one for users, one for all their posts
users = (
db.query(User)
.options(selectinload(User.posts))
.all()
)
# Generated SQL:
# Query 1: SELECT * FROM users
# Query 2: SELECT * FROM posts WHERE posts.author_id IN (1, 2, 3, ..., 100)
# Total: 2 queries regardless of how many users!
# Nested eager loading
users = (
db.query(User)
.options(
selectinload(User.posts).selectinload(Post.tags),
selectinload(User.orders).selectinload(Order.items),
)
.all()
)
# Loads: users → posts → tags, users → orders → items
# Total: 5 queries (one per table)
| Strategy | SQL Queries | Best For | Trade-offs |
|---|---|---|---|
| `lazy="select"` (default) | N+1 | Rarely accessed relationships | Simple but can be very slow |
| `joinedload` | 1 (JOIN) | Many-to-one, one-to-one | Can create large result sets with duplicates |
| `selectinload` | 2 (SELECT + IN) | One-to-many, many-to-many | Best general-purpose eager loading |
| `subqueryload` | 2 (SELECT + subquery) | Large collections | Similar to selectinload, uses subquery |
| `raiseload` | 0 (raises error) | Preventing accidental lazy loads | Forces explicit loading strategy |
# app/routers/products.py
from sqlalchemy.orm import joinedload, selectinload
from sqlalchemy import select, func, and_, or_
@router.get("/products/with-categories", response_model=list[ProductWithCategory])
def get_products_with_categories(db: DbSession):
"""Get products with their category (many-to-one → joinedload)."""
products = (
db.query(Product)
.options(joinedload(Product.category))
.filter(Product.is_active == True)
.all()
)
return products
@router.get("/categories/{category_id}/products")
def get_category_with_products(category_id: int, db: DbSession):
"""Get a category with all its products (one-to-many → selectinload)."""
category = (
db.query(Category)
.options(selectinload(Category.products))
.filter(Category.id == category_id)
.first()
)
if not category:
raise HTTPException(status_code=404, detail="Category not found")
return {
"category": category.name,
"product_count": len(category.products),
"products": category.products,
}
@router.get("/orders/{order_id}/details")
def get_order_details(order_id: int, db: DbSession):
"""Get order with items and product details (nested eager loading)."""
order = (
db.query(Order)
.options(
joinedload(Order.user), # many-to-one: joinedload
selectinload(Order.items).joinedload(OrderItem.product), # nested
)
.filter(Order.id == order_id)
.first()
)
if not order:
raise HTTPException(status_code=404, detail="Order not found")
return order
# Explicit JOIN queries (for complex filters and aggregations)
@router.get("/products/by-category-stats")
def get_products_by_category_stats(db: DbSession):
"""Get product count and average price per category."""
results = (
db.query(
Category.name,
func.count(Product.id).label("product_count"),
func.avg(Product.price).label("avg_price"),
func.min(Product.price).label("min_price"),
func.max(Product.price).label("max_price"),
)
.join(Product, Category.id == Product.category_id)
.filter(Product.is_active == True)
.group_by(Category.name)
.order_by(func.count(Product.id).desc())
.all()
)
return [
{
"category": r.name,
"product_count": r.product_count,
"avg_price": float(r.avg_price or 0),
"min_price": float(r.min_price or 0),
"max_price": float(r.max_price or 0),
}
for r in results
]
@router.get("/users/top-spenders")
def get_top_spenders(db: DbSession, limit: int = 10):
"""Get users who have spent the most (multi-table join with aggregation)."""
results = (
db.query(
User.username,
User.email,
func.count(Order.id).label("order_count"),
func.sum(Order.total_amount).label("total_spent"),
)
.join(Order, User.id == Order.user_id)
.filter(Order.status != "cancelled")
.group_by(User.id, User.username, User.email)
.order_by(func.sum(Order.total_amount).desc())
.limit(limit)
.all()
)
return [
{
"username": r.username,
"email": r.email,
"order_count": r.order_count,
"total_spent": float(r.total_spent or 0),
}
for r in results
]
from sqlalchemy.orm import raiseload
# Strict mode: raise an error if any relationship is lazy-loaded
users = (
db.query(User)
.options(
selectinload(User.posts), # This will be loaded
raiseload(User.orders), # This will raise if accessed
raiseload("*"), # ALL other relationships will raise
)
.all()
)
# Accessing user.posts works fine (it was eagerly loaded)
for user in users:
print(user.posts) # OK
# Accessing user.orders raises an error instead of silently
# executing a query. This helps catch N+1 issues during development.
# print(user.orders) # Raises: sqlalchemy.exc.InvalidRequestError
Transactions ensure that a group of database operations either all succeed or all fail. Proper transaction management is essential for data integrity, especially when multiple related changes need to be atomic.
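Before looking at SQLAlchemy's session-level handling, the all-or-nothing guarantee itself can be demonstrated with the stdlib `sqlite3` module:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE orders (id INTEGER PRIMARY KEY, total REAL NOT NULL)")

try:
    conn.execute("INSERT INTO orders (id, total) VALUES (1, 99.0)")
    # Simulate a failure midway through the unit of work
    raise RuntimeError("stock check failed")
except RuntimeError:
    conn.rollback()  # undoes the INSERT too - nothing partial survives

count = conn.execute("SELECT COUNT(*) FROM orders").fetchone()[0]
assert count == 0   # nothing was committed
```

SQLAlchemy's `Session` gives you the same behavior at a higher level: changes accumulate until `commit()`, and `rollback()` discards them all.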
# SQLAlchemy sessions have implicit transaction management
@router.post("/orders/", response_model=OrderResponse)
def create_order(order_data: OrderCreate, db: DbSession):
"""
Create an order with items - all or nothing.
If any step fails, the entire operation is rolled back.
"""
# Step 1: Create the order
order = Order(
order_number=generate_order_number(),
user_id=order_data.user_id,
status=OrderStatus.PENDING,
)
db.add(order)
db.flush() # Get the order.id without committing
# Step 2: Create order items and calculate total
total = 0
for item_data in order_data.items:
# Verify product exists and has stock
product = db.query(Product).filter(Product.id == item_data.product_id).first()
if not product:
raise HTTPException(status_code=404, detail=f"Product {item_data.product_id} not found")
if product.stock_quantity < item_data.quantity:
raise HTTPException(
status_code=400,
detail=f"Insufficient stock for {product.name}"
)
# Create order item
item_total = product.price * item_data.quantity
order_item = OrderItem(
order_id=order.id,
product_id=product.id,
quantity=item_data.quantity,
unit_price=product.price,
total_price=item_total,
)
db.add(order_item)
total += item_total
# Step 3: Reduce stock
product.stock_quantity -= item_data.quantity
# Step 4: Update order total
order.total_amount = total
# Step 5: Commit everything at once
# If ANY step above fails, nothing is committed
db.commit()
db.refresh(order)
return order
from sqlalchemy.orm import Session
def transfer_funds(
db: Session,
from_account_id: int,
to_account_id: int,
amount: float,
) -> dict:
"""
Transfer funds between accounts with explicit transaction control.
"""
try:
# Begin a transaction (implicit with session)
from_account = db.query(Account).filter(Account.id == from_account_id).with_for_update().first()
to_account = db.query(Account).filter(Account.id == to_account_id).with_for_update().first()
if not from_account or not to_account:
raise ValueError("Account not found")
if from_account.balance < amount:
raise ValueError("Insufficient funds")
# Perform the transfer
from_account.balance -= amount
to_account.balance += amount
# Record the transaction
transaction = Transaction(
from_account_id=from_account_id,
to_account_id=to_account_id,
amount=amount,
status="completed",
)
db.add(transaction)
db.commit()
return {"status": "success", "transaction_id": transaction.id}
except Exception as e:
db.rollback()
# Log the error
logger.error(f"Transfer failed: {e}")
raise
@router.post("/orders/with-notifications")
def create_order_with_notification(order_data: OrderCreate, db: DbSession):
"""
Create an order and try to send a notification.
The order should be saved even if the notification fails.
"""
# Create the order (main transaction)
order = Order(
order_number=generate_order_number(),
user_id=order_data.user_id,
status=OrderStatus.PENDING,
total_amount=order_data.total,
)
db.add(order)
db.flush()
# Try to create a notification (nested transaction / savepoint)
try:
nested = db.begin_nested() # Creates a SAVEPOINT
notification = Notification(
user_id=order_data.user_id,
message=f"Order {order.order_number} confirmed!",
order_id=order.id,
)
db.add(notification)
db.flush()  # Force the INSERT now so a failure is caught inside the savepoint
nested.commit()  # Release the savepoint
except Exception as e:
# Notification failed, but order is still intact
# The savepoint rollback only affects the notification
logger.warning(f"Failed to create notification: {e}")
# nested is already rolled back, main transaction continues
# Commit the main transaction (order is saved regardless)
db.commit()
db.refresh(order)
return order
# Pattern 1: Service layer with explicit transactions
class OrderService:
def __init__(self, db: Session):
self.db = db
def create_order(self, order_data: OrderCreate) -> Order:
"""Business logic with transaction management."""
try:
order = self._build_order(order_data)
self._validate_stock(order_data.items)
self._create_order_items(order, order_data.items)
self._update_stock(order_data.items)
self._calculate_total(order)
self.db.commit()
self.db.refresh(order)
return order
except Exception:
self.db.rollback()
raise
def _build_order(self, data: OrderCreate) -> Order:
order = Order(order_number=generate_order_number(), user_id=data.user_id)
self.db.add(order)
self.db.flush()
return order
# Pattern 2: Context manager for transactions
from contextlib import contextmanager
@contextmanager
def transaction(db: Session):
"""Context manager for explicit transaction boundaries."""
try:
yield db
db.commit()
except Exception:
db.rollback()
raise
# Usage:
def process_payment(db: Session, order_id: int, payment_data: dict):
with transaction(db):
order = db.get(Order, order_id)  # Session.get() is the 2.0-style identity lookup
order.status = OrderStatus.CONFIRMED
payment = Payment(order_id=order_id, **payment_data)
db.add(payment)
# Commits automatically on exit, rolls back on exception
- Use with_for_update() for rows that need pessimistic locking (e.g., account balances)
- Use savepoints (begin_nested()) when partial failures are acceptable
- Handle IntegrityError for unique constraint violations
- Use db.flush() to get generated IDs without committing the transaction

The Repository pattern abstracts data access logic behind a clean interface. Instead of scattering SQLAlchemy queries throughout your route handlers, you centralize them in repository classes. This makes your code more testable, reusable, and easier to maintain.
| Without Repository | With Repository |
|---|---|
| Queries scattered in route handlers | Queries centralized in one place |
| Hard to test (need full FastAPI context) | Easy to test (mock the repository) |
| Duplicate query logic across routes | Reusable query methods |
| Routes know about SQLAlchemy internals | Routes only know the repository interface |
| Switching ORMs requires changing every route | Switching ORMs only changes repositories |
# app/repositories/base.py
from typing import Generic, TypeVar, Type, Optional, List, Any
from sqlalchemy.orm import Session
from sqlalchemy import select, func
from app.database import Base
# TypeVar for the model type
ModelType = TypeVar("ModelType", bound=Base)
class BaseRepository(Generic[ModelType]):
"""
Generic repository with common CRUD operations.
Subclass this for model-specific repositories.
"""
def __init__(self, model: Type[ModelType], db: Session):
self.model = model
self.db = db
def get(self, id: int) -> Optional[ModelType]:
"""Get a single record by ID."""
return self.db.query(self.model).filter(self.model.id == id).first()
def get_or_404(self, id: int) -> ModelType:
"""Get a single record by ID or raise HTTPException."""
from fastapi import HTTPException, status
obj = self.get(id)
if not obj:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail=f"{self.model.__name__} with id {id} not found",
)
return obj
def get_all(
self,
skip: int = 0,
limit: int = 100,
**filters: Any,
) -> List[ModelType]:
"""Get all records with optional filtering and pagination."""
query = self.db.query(self.model)
# Apply dynamic filters
for field, value in filters.items():
if value is not None and hasattr(self.model, field):
query = query.filter(getattr(self.model, field) == value)
return query.offset(skip).limit(limit).all()
def count(self, **filters: Any) -> int:
"""Count records with optional filtering."""
query = self.db.query(func.count(self.model.id))
for field, value in filters.items():
if value is not None and hasattr(self.model, field):
query = query.filter(getattr(self.model, field) == value)
return query.scalar()
def create(self, obj_data: dict) -> ModelType:
"""Create a new record."""
db_obj = self.model(**obj_data)
self.db.add(db_obj)
self.db.flush()
self.db.refresh(db_obj)
return db_obj
def update(self, id: int, obj_data: dict) -> Optional[ModelType]:
"""Update an existing record."""
db_obj = self.get(id)
if not db_obj:
return None
for field, value in obj_data.items():
if value is not None:
setattr(db_obj, field, value)
self.db.flush()
self.db.refresh(db_obj)
return db_obj
def delete(self, id: int) -> bool:
"""Delete a record by ID. Returns True if deleted."""
db_obj = self.get(id)
if not db_obj:
return False
self.db.delete(db_obj)
self.db.flush()
return True
def exists(self, **filters: Any) -> bool:
"""Check if a record exists with given filters."""
query = self.db.query(self.model.id)
for field, value in filters.items():
if hasattr(self.model, field):
query = query.filter(getattr(self.model, field) == value)
return query.first() is not None
# app/repositories/user.py
from typing import Optional, List
from sqlalchemy.orm import Session, selectinload
from sqlalchemy import or_
from app.repositories.base import BaseRepository
from app.models.user import User
import hashlib
class UserRepository(BaseRepository[User]):
"""Repository for User-specific database operations."""
def __init__(self, db: Session):
super().__init__(User, db)
def get_by_email(self, email: str) -> Optional[User]:
"""Find a user by email address."""
return self.db.query(User).filter(User.email == email).first()
def get_by_username(self, username: str) -> Optional[User]:
"""Find a user by username."""
return self.db.query(User).filter(User.username == username).first()
def search(self, query: str, skip: int = 0, limit: int = 20) -> List[User]:
"""Search users by username, email, or full name."""
pattern = f"%{query}%"
return (
self.db.query(User)
.filter(
or_(
User.username.ilike(pattern),
User.email.ilike(pattern),
User.full_name.ilike(pattern),
)
)
.offset(skip)
.limit(limit)
.all()
)
def get_active_users(self, skip: int = 0, limit: int = 100) -> List[User]:
"""Get only active users."""
return (
self.db.query(User)
.filter(User.is_active == True)
.order_by(User.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
def get_with_posts(self, user_id: int) -> Optional[User]:
"""Get a user with their posts eagerly loaded."""
return (
self.db.query(User)
.options(selectinload(User.posts))
.filter(User.id == user_id)
.first()
)
def create_user(self, username: str, email: str, password: str, **kwargs) -> User:
"""Create a new user with password hashing."""
user_data = {
"username": username,
"email": email,
# NOTE: sha256 keeps this example dependency-free; use a dedicated
# password hasher (passlib / bcrypt / argon2) in production
"hashed_password": hashlib.sha256(password.encode()).hexdigest(),
**kwargs,
}
return self.create(user_data)
def deactivate(self, user_id: int) -> Optional[User]:
"""Soft-delete a user by deactivating their account."""
return self.update(user_id, {"is_active": False})
def email_exists(self, email: str) -> bool:
"""Check if an email is already registered."""
return self.exists(email=email)
# app/repositories/product.py
from typing import Optional, List
from decimal import Decimal
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_
from app.repositories.base import BaseRepository
from app.models.product import Product
class ProductRepository(BaseRepository[Product]):
"""Repository for Product-specific database operations."""
def __init__(self, db: Session):
super().__init__(Product, db)
def get_by_sku(self, sku: str) -> Optional[Product]:
"""Find a product by SKU."""
return self.db.query(Product).filter(Product.sku == sku).first()
def get_by_slug(self, slug: str) -> Optional[Product]:
"""Find a product by URL slug."""
return self.db.query(Product).filter(Product.slug == slug).first()
def get_by_category(
self, category_id: int, skip: int = 0, limit: int = 50
) -> List[Product]:
"""Get all products in a category."""
return (
self.db.query(Product)
.filter(
and_(
Product.category_id == category_id,
Product.is_active == True,
)
)
.order_by(Product.name)
.offset(skip)
.limit(limit)
.all()
)
def get_in_price_range(
self, min_price: Decimal, max_price: Decimal
) -> List[Product]:
"""Get products within a price range."""
return (
self.db.query(Product)
.filter(
and_(
Product.price >= min_price,
Product.price <= max_price,
Product.is_active == True,
)
)
.order_by(Product.price)
.all()
)
def get_low_stock(self, threshold: int = 10) -> List[Product]:
"""Get products with stock below threshold."""
return (
self.db.query(Product)
.filter(
and_(
Product.stock_quantity <= threshold,
Product.is_active == True,
)
)
.order_by(Product.stock_quantity)
.all()
)
def update_stock(self, product_id: int, quantity_change: int) -> Optional[Product]:
"""Update product stock quantity (positive to add, negative to subtract)."""
product = self.get(product_id)
if not product:
return None
new_quantity = product.stock_quantity + quantity_change
if new_quantity < 0:
raise ValueError(f"Insufficient stock. Current: {product.stock_quantity}")
product.stock_quantity = new_quantity
self.db.flush()
self.db.refresh(product)
return product
def get_with_category(self, product_id: int) -> Optional[Product]:
"""Get a product with its category eagerly loaded."""
return (
self.db.query(Product)
.options(joinedload(Product.category))
.filter(Product.id == product_id)
.first()
)
# app/dependencies.py
from typing import Annotated
from fastapi import Depends
from sqlalchemy.orm import Session
from app.database import SessionLocal
from app.repositories.user import UserRepository
from app.repositories.product import ProductRepository
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
DbSession = Annotated[Session, Depends(get_db)]
# Repository dependencies
def get_user_repo(db: DbSession) -> UserRepository:
return UserRepository(db)
def get_product_repo(db: DbSession) -> ProductRepository:
return ProductRepository(db)
# Type aliases for clean route signatures
UserRepo = Annotated[UserRepository, Depends(get_user_repo)]
ProductRepo = Annotated[ProductRepository, Depends(get_product_repo)]
# app/routers/users.py
from fastapi import APIRouter, HTTPException, status
from app.dependencies import UserRepo, DbSession
from app.schemas.user import UserCreate, UserResponse, UserUpdate
router = APIRouter(prefix="/users", tags=["users"])
@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user_data: UserCreate, repo: UserRepo, db: DbSession):
"""Create a new user using the repository."""
# Check for duplicates
if repo.email_exists(user_data.email):
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Email already registered"
)
user = repo.create_user(
username=user_data.username,
email=user_data.email,
password=user_data.password,
full_name=user_data.full_name,
)
db.commit()
return user
@router.get("/", response_model=list[UserResponse])
def get_users(repo: UserRepo, skip: int = 0, limit: int = 100):
"""Get all active users."""
return repo.get_active_users(skip=skip, limit=limit)
@router.get("/search", response_model=list[UserResponse])
def search_users(q: str, repo: UserRepo):
"""Search users by name, email, or username."""
return repo.search(q)
@router.get("/{user_id}", response_model=UserResponse)
def get_user(user_id: int, repo: UserRepo):
"""Get a user by ID (raises 404 if not found)."""
return repo.get_or_404(user_id)
@router.patch("/{user_id}", response_model=UserResponse)
def update_user(user_id: int, user_data: UserUpdate, repo: UserRepo, db: DbSession):
"""Update a user."""
user = repo.update(user_id, user_data.model_dump(exclude_unset=True))
if not user:
raise HTTPException(status_code=404, detail="User not found")
db.commit()
return user
@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, repo: UserRepo, db: DbSession):
"""Soft-delete a user."""
user = repo.deactivate(user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
db.commit()
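This is where the repository pattern pays off in tests: because routes only depend on the repository interface, a unit test can swap in an in-memory fake and never open a database connection. A sketch under that assumption — FakeUserRepository and its dict storage are invented for illustration; in a real test you would register it with app.dependency_overrides[get_user_repo]:

```python
from typing import Optional

class FakeUserRepository:
    """In-memory stand-in implementing the slice of UserRepository a route uses."""

    def __init__(self) -> None:
        self._users: dict[int, dict] = {}
        self._next_id = 1

    def email_exists(self, email: str) -> bool:
        return any(u["email"] == email for u in self._users.values())

    def create_user(self, username: str, email: str, password: str, **kwargs) -> dict:
        user = {"id": self._next_id, "username": username, "email": email, **kwargs}
        self._users[self._next_id] = user
        self._next_id += 1
        return user

    def get(self, id: int) -> Optional[dict]:
        return self._users.get(id)

# In a test module you would wire it in with:
#   app.dependency_overrides[get_user_repo] = FakeUserRepository
# Exercising the fake directly:
repo = FakeUserRepository()
created = repo.create_user("johndoe", "john@example.com", "SecurePass123")
assert repo.email_exists("john@example.com")
assert repo.get(created["id"])["username"] == "johndoe"
```

The fake only needs to cover the methods the route under test actually calls, which keeps test setup small.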
Now let us bring everything together into a complete, production-ready e-commerce data layer. This project includes Products, Categories, Users, Orders, and full CRUD operations with relationships and proper architecture.
ecommerce/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── database.py
│   ├── models/
│   │   ├── __init__.py      # Import all models
│   │   ├── user.py
│   │   ├── product.py
│   │   ├── category.py
│   │   └── order.py
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── product.py
│   │   ├── category.py
│   │   └── order.py
│   ├── repositories/
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── user.py
│   │   ├── product.py
│   │   └── order.py
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── users.py
│   │   ├── products.py
│   │   ├── categories.py
│   │   └── orders.py
│   └── dependencies.py
├── alembic/
│   ├── versions/
│   └── env.py
├── alembic.ini
├── requirements.txt
└── .env
# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.database import create_tables, engine
from app.routers import users, products, categories, orders
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application startup and shutdown events."""
# Startup: create tables (use Alembic in production)
create_tables()
print("Database tables created successfully")
yield
# Shutdown: dispose engine connections
engine.dispose()
print("Database connections closed")
app = FastAPI(
title="E-Commerce API",
description="Complete e-commerce data layer with FastAPI and SQLAlchemy",
version="1.0.0",
lifespan=lifespan,
)
# Include routers
app.include_router(users.router)
app.include_router(products.router)
app.include_router(categories.router)
app.include_router(orders.router)
@app.get("/")
def root():
return {
"message": "E-Commerce API",
"docs": "/docs",
"version": "1.0.0",
}
@app.get("/health")
def health_check():
"""Health check endpoint."""
from app.database import engine
pool = engine.pool
return {
"status": "healthy",
"database": {
"pool_size": pool.size(),
"connections_in_use": pool.checkedout(),
"connections_available": pool.checkedin(),
},
}
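The figures the /health endpoint reports are read straight off the engine's connection pool. As a reminder of where they come from, here is a minimal pooled-engine sketch — SQLite with an explicit QueuePool so it runs anywhere (the pool_demo.db file name is made up for the demo); for PostgreSQL the same keyword arguments are the ones to tune:

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "sqlite:///./pool_demo.db",  # demo URL; use your real DATABASE_URL
    poolclass=QueuePool,         # explicit here; the default for server databases
    pool_size=5,                 # persistent connections kept open
    max_overflow=10,             # extra connections allowed under bursts
    pool_pre_ping=True,          # validate connections before handing them out
    pool_recycle=1800,           # recycle connections older than 30 minutes
)

assert engine.pool.size() == 5   # what /health reports as pool_size
```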
# app/models/__init__.py
"""
Import all models here so Alembic can detect them for migrations.
"""
from app.models.user import User
from app.models.category import Category
from app.models.product import Product
from app.models.order import Order, OrderItem

__all__ = ["User", "Category", "Product", "Order", "OrderItem"]
# app/models/category.py (complete)
from datetime import datetime
from typing import Optional, List
from sqlalchemy import String, Integer, Text, DateTime, ForeignKey, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
class Category(Base):
__tablename__ = "categories"
id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False, index=True)
slug: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
parent_id: Mapped[Optional[int]] = mapped_column(
Integer, ForeignKey("categories.id", ondelete="SET NULL"), nullable=True
)
created_at: Mapped[datetime] = mapped_column(
DateTime(timezone=True), server_default=func.now()
)
# Self-referential relationships
parent: Mapped[Optional["Category"]] = relationship(
"Category", remote_side="Category.id", back_populates="children"
)
children: Mapped[List["Category"]] = relationship(
"Category", back_populates="parent"
)
products: Mapped[List["Product"]] = relationship(
"Product", back_populates="category"
)
def __repr__(self) -> str:
return f"<Category(id={self.id}, name='{self.name}')>"
# app/schemas/order.py
from datetime import datetime
from typing import List, Optional
from decimal import Decimal
from pydantic import BaseModel, Field, ConfigDict
class OrderItemCreate(BaseModel):
"""Schema for adding an item to an order."""
product_id: int
quantity: int = Field(..., gt=0)
class OrderItemResponse(BaseModel):
"""Schema for order item in responses."""
id: int
product_id: int
quantity: int
unit_price: Decimal
total_price: Decimal
model_config = ConfigDict(from_attributes=True)
class OrderCreate(BaseModel):
"""Schema for creating a new order."""
user_id: int
items: List[OrderItemCreate] = Field(..., min_length=1)
shipping_address: Optional[str] = None
class OrderResponse(BaseModel):
"""Schema for order API responses."""
id: int
order_number: str
status: str
total_amount: Decimal
user_id: int
created_at: datetime
items: List[OrderItemResponse] = []
model_config = ConfigDict(from_attributes=True)
class OrderSummary(BaseModel):
"""Lightweight order for list endpoints."""
id: int
order_number: str
status: str
total_amount: Decimal
created_at: datetime
model_config = ConfigDict(from_attributes=True)
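These Field constraints are enforced at request time, not just documented: gt=0 rejects non-positive quantities and min_length=1 rejects an empty items list before any database work happens. A quick sketch of the validators firing (schemas redeclared here so the snippet is self-contained):

```python
from typing import List, Optional
from pydantic import BaseModel, Field, ValidationError

class OrderItemCreate(BaseModel):
    product_id: int
    quantity: int = Field(..., gt=0)

class OrderCreate(BaseModel):
    user_id: int
    items: List[OrderItemCreate] = Field(..., min_length=1)
    shipping_address: Optional[str] = None

# A valid payload passes, with dicts coerced into OrderItemCreate
order = OrderCreate(user_id=1, items=[{"product_id": 1, "quantity": 2}])
assert order.items[0].quantity == 2

# quantity must be greater than zero
try:
    OrderItemCreate(product_id=1, quantity=0)
except ValidationError as e:
    assert "greater than 0" in str(e)

# an order needs at least one item
try:
    OrderCreate(user_id=1, items=[])
except ValidationError as e:
    assert "at least 1 item" in str(e)
```

FastAPI turns these ValidationErrors into 422 responses automatically, so the repository never sees an invalid order.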
# app/repositories/order.py
from typing import Optional, List
from sqlalchemy.orm import Session, selectinload, joinedload
from sqlalchemy import func
from app.repositories.base import BaseRepository
from app.models.order import Order, OrderItem, OrderStatus
from app.models.product import Product
import uuid
from datetime import datetime
class OrderRepository(BaseRepository[Order]):
"""Repository for order operations."""
def __init__(self, db: Session):
super().__init__(Order, db)
def generate_order_number(self) -> str:
"""Generate a unique order number."""
timestamp = datetime.utcnow().strftime("%Y%m%d%H%M%S")
unique_id = uuid.uuid4().hex[:6].upper()
return f"ORD-{timestamp}-{unique_id}"
def get_with_items(self, order_id: int) -> Optional[Order]:
"""Get an order with its items and product details."""
return (
self.db.query(Order)
.options(
selectinload(Order.items).joinedload(OrderItem.product),
joinedload(Order.user),
)
.filter(Order.id == order_id)
.first()
)
def get_user_orders(
self, user_id: int, skip: int = 0, limit: int = 20
) -> List[Order]:
"""Get all orders for a specific user."""
return (
self.db.query(Order)
.filter(Order.user_id == user_id)
.order_by(Order.created_at.desc())
.offset(skip)
.limit(limit)
.all()
)
def create_order(self, user_id: int, items: list) -> Order:
"""
Create a complete order with items.
Validates stock, creates order items, updates stock quantities,
and calculates the total — all in a single transaction.
"""
# Create the order
order = Order(
order_number=self.generate_order_number(),
user_id=user_id,
status=OrderStatus.PENDING,
)
self.db.add(order)
self.db.flush() # Get order.id
total_amount = 0
for item_data in items:
# Fetch and validate product
product = (
self.db.query(Product)
.filter(Product.id == item_data.product_id)
.with_for_update() # Lock row to prevent race conditions
.first()
)
if not product:
raise ValueError(f"Product {item_data.product_id} not found")
if not product.is_active:
raise ValueError(f"Product '{product.name}' is not available")
if product.stock_quantity < item_data.quantity:
raise ValueError(
f"Insufficient stock for '{product.name}'. "
f"Available: {product.stock_quantity}, Requested: {item_data.quantity}"
)
# Calculate price
item_total = product.price * item_data.quantity  # keep Decimal; float would lose precision
# Create order item
order_item = OrderItem(
order_id=order.id,
product_id=product.id,
quantity=item_data.quantity,
unit_price=product.price,
total_price=item_total,
)
self.db.add(order_item)
# Update stock
product.stock_quantity -= item_data.quantity
total_amount += item_total
# Set order total
order.total_amount = total_amount
self.db.flush()
self.db.refresh(order)
return order
def update_status(self, order_id: int, new_status: OrderStatus) -> Optional[Order]:
"""Update order status with validation."""
order = self.get(order_id)
if not order:
return None
# Validate status transitions
valid_transitions = {
OrderStatus.PENDING: [OrderStatus.CONFIRMED, OrderStatus.CANCELLED],
OrderStatus.CONFIRMED: [OrderStatus.SHIPPED, OrderStatus.CANCELLED],
OrderStatus.SHIPPED: [OrderStatus.DELIVERED],
OrderStatus.DELIVERED: [],
OrderStatus.CANCELLED: [],
}
if new_status not in valid_transitions.get(order.status, []):
raise ValueError(
f"Cannot transition from {order.status} to {new_status}"
)
order.status = new_status
# If cancelled, restore stock
if new_status == OrderStatus.CANCELLED:
self._restore_stock(order)
self.db.flush()
self.db.refresh(order)
return order
def _restore_stock(self, order: Order):
"""Restore stock quantities when an order is cancelled."""
items = (
self.db.query(OrderItem)
.filter(OrderItem.order_id == order.id)
.all()
)
for item in items:
product = self.db.get(Product, item.product_id)  # 2.0-style identity lookup
if product:
product.stock_quantity += item.quantity
def get_order_stats(self, user_id: Optional[int] = None) -> dict:
"""Get order statistics."""
query = self.db.query(Order)
if user_id:
query = query.filter(Order.user_id == user_id)
total_orders = query.count()
total_revenue = (
query.filter(Order.status != OrderStatus.CANCELLED)
.with_entities(func.sum(Order.total_amount))
.scalar()
) or 0
status_counts = (
query.with_entities(Order.status, func.count(Order.id))
.group_by(Order.status)
.all()
)
return {
"total_orders": total_orders,
"total_revenue": float(total_revenue),
"orders_by_status": {s.value: c for s, c in status_counts},
}
# app/routers/orders.py
from fastapi import APIRouter, HTTPException, status, Depends
from typing import Annotated
from app.dependencies import DbSession
from app.repositories.order import OrderRepository
from app.schemas.order import OrderCreate, OrderResponse, OrderSummary
from app.models.order import OrderStatus
router = APIRouter(prefix="/orders", tags=["orders"])
def get_order_repo(db: DbSession) -> OrderRepository:
return OrderRepository(db)
OrderRepo = Annotated[OrderRepository, Depends(get_order_repo)]
@router.post("/", response_model=OrderResponse, status_code=status.HTTP_201_CREATED)
def create_order(order_data: OrderCreate, repo: OrderRepo, db: DbSession):
"""Create a new order with items."""
try:
order = repo.create_order(
user_id=order_data.user_id,
items=order_data.items,
)
db.commit()
# Reload with relationships for response
order = repo.get_with_items(order.id)
return order
except ValueError as e:
db.rollback()
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail=str(e),
)
@router.get("/{order_id}", response_model=OrderResponse)
def get_order(order_id: int, repo: OrderRepo):
"""Get order details with items."""
order = repo.get_with_items(order_id)
if not order:
raise HTTPException(status_code=404, detail="Order not found")
return order
@router.get("/user/{user_id}", response_model=list[OrderSummary])
def get_user_orders(user_id: int, repo: OrderRepo, skip: int = 0, limit: int = 20):
"""Get all orders for a user."""
return repo.get_user_orders(user_id, skip=skip, limit=limit)
@router.patch("/{order_id}/status")
def update_order_status(
order_id: int,
new_status: OrderStatus,
repo: OrderRepo,
db: DbSession,
):
"""Update an order's status."""
try:
order = repo.update_status(order_id, new_status)
if not order:
raise HTTPException(status_code=404, detail="Order not found")
db.commit()
return {"order_id": order.id, "new_status": order.status}
except ValueError as e:
db.rollback()
raise HTTPException(status_code=400, detail=str(e))
@router.get("/stats/summary")
def get_order_stats(repo: OrderRepo):
"""Get order statistics."""
return repo.get_order_stats()
# Create requirements.txt
cat > requirements.txt << EOF
fastapi==0.109.0
uvicorn==0.27.0
sqlalchemy==2.0.25
alembic==1.13.1
pydantic==2.5.3
pydantic-settings==2.1.0
python-dotenv==1.0.0
email-validator==2.1.0
EOF

# Install dependencies
pip install -r requirements.txt

# Create .env file
cat > .env << EOF
DATABASE_URL=sqlite:///./ecommerce.db
DATABASE_ECHO=true
DEBUG=true
EOF

# Initialize Alembic
alembic init alembic

# Generate initial migration
alembic revision --autogenerate -m "initial schema"

# Apply migration
alembic upgrade head

# Run the application
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000
# Create a category
curl -X POST http://localhost:8000/categories/ \
-H "Content-Type: application/json" \
-d '{"name": "Electronics", "slug": "electronics", "description": "Electronic devices"}'
# Create a product
curl -X POST http://localhost:8000/products/ \
-H "Content-Type: application/json" \
-d '{
"name": "Wireless Headphones",
"slug": "wireless-headphones",
"price": 79.99,
"sku": "WH-001",
"stock_quantity": 50,
"category_id": 1
}'
# Create a user
curl -X POST http://localhost:8000/users/ \
-H "Content-Type: application/json" \
-d '{
"username": "johndoe",
"email": "john@example.com",
"password": "SecurePass123",
"full_name": "John Doe"
}'
# Create an order
curl -X POST http://localhost:8000/orders/ \
-H "Content-Type: application/json" \
-d '{
"user_id": 1,
"items": [
{"product_id": 1, "quantity": 2}
]
}'
# Get order details
curl http://localhost:8000/orders/1
# Update order status
curl -X PATCH "http://localhost:8000/orders/1/status?new_status=confirmed"
# Get order statistics
curl http://localhost:8000/orders/stats/summary
# Check database health
curl http://localhost:8000/health
In this tutorial, we covered the essential aspects of database integration in FastAPI, from engine setup through migrations. Here is a summary of the key concepts and recommendations:
| Topic | Key Points |
|---|---|
| Database Choice | Use SQLite for development, PostgreSQL for production. SQLAlchemy makes switching easy. |
| SQLAlchemy Setup | Use DeclarativeBase, configure the engine with pooling, and create a session factory. |
| Models | Use Mapped type annotations (SQLAlchemy 2.0). Define proper indexes and constraints. |
| Session Management | Use FastAPI’s dependency injection with yield for automatic cleanup. |
| CRUD Operations | Use db.flush() for intermediate IDs, db.commit() at transaction boundaries. |
| Schemas vs Models | Separate Pydantic schemas from SQLAlchemy models. Use from_attributes=True. |
| Alembic Migrations | Always use Alembic in production. Never use create_all() in production. |
| Async Database | Use async for high-traffic APIs. Stick with sync for simpler applications. |
| Connection Pooling | Configure pool_size, max_overflow, and pool_pre_ping for production. |
| Relationships | Use selectinload for collections, joinedload for single objects. Avoid N+1. |
| Transactions | Keep transactions short. Use savepoints for partial failure tolerance. |
| Repository Pattern | Abstract queries into repositories. Inject them via FastAPI dependencies. |