FastAPI – Database Integration

Database integration is the backbone of any production FastAPI application. In this tutorial, we will cover everything from basic SQLAlchemy setup to advanced patterns like the Repository pattern, async database access, connection pooling, and Alembic migrations. By the end, you will have built a complete e-commerce data layer with full CRUD operations, relationships, and migration support.

Prerequisites: You should be comfortable with FastAPI basics, Pydantic models, and dependency injection from the previous tutorials in this series. Basic SQL knowledge is also helpful.
Source Code: All code examples in this tutorial are available as a complete working project. Each section builds on the previous one, culminating in a full e-commerce data layer.

1. Database Options for FastAPI

FastAPI is database-agnostic — it does not ship with a built-in ORM or database layer. This gives you the freedom to choose the best database and toolkit for your project. Here are the most common options:

1.1 SQLite

SQLite is a file-based relational database that requires zero configuration. It is perfect for development, prototyping, and small applications. Python includes SQLite support in the standard library.

# SQLite connection string examples
SQLITE_URL = "sqlite:///./app.db"           # File-based
SQLITE_URL_MEMORY = "sqlite:///:memory:"    # In-memory (testing)
SQLITE_URL_ABSOLUTE = "sqlite:////tmp/app.db"  # Absolute path

Pros: Zero setup, no server needed, great for testing and development.
Cons: Not suitable for high-concurrency production workloads, limited to a single writer at a time.
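
Because SQLite support ships in Python's standard library, you can exercise a database without installing anything. A minimal stdlib sketch:

```python
import sqlite3

# ":memory:" gives a throwaway database; a path like "app.db" persists to disk
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
conn.execute("INSERT INTO users (name) VALUES (?)", ("alice",))
row = conn.execute("SELECT name FROM users").fetchone()
conn.close()
print(row[0])  # alice
```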

1.2 PostgreSQL

PostgreSQL is the most popular choice for production FastAPI applications. It offers advanced features like JSONB columns, full-text search, array types, and excellent concurrency handling.

# PostgreSQL connection strings
POSTGRES_URL = "postgresql://user:password@localhost:5432/mydb"
POSTGRES_ASYNC_URL = "postgresql+asyncpg://user:password@localhost:5432/mydb"
POSTGRES_PSYCOPG2 = "postgresql+psycopg2://user:password@localhost:5432/mydb"

Pros: Production-ready, advanced features, excellent ecosystem, strong ACID compliance.
Cons: Requires a running server, more complex setup than SQLite.

1.3 MySQL

MySQL is another widely-used relational database with a large community and extensive tooling support.

# MySQL connection strings
MYSQL_URL = "mysql+pymysql://user:password@localhost:3306/mydb"
MYSQL_ASYNC_URL = "mysql+aiomysql://user:password@localhost:3306/mydb"

Pros: Widely supported, good performance, large community.
Cons: Fewer advanced features compared to PostgreSQL, some SQL standard deviations.

1.4 MongoDB

MongoDB is a document-oriented NoSQL database. Since it is not relational, it is not used with SQLAlchemy; instead, it works well with FastAPI through libraries like Motor (async) or PyMongo (sync).

# MongoDB with Motor (async)
import asyncio
from motor.motor_asyncio import AsyncIOMotorClient


async def main():
    client = AsyncIOMotorClient("mongodb://localhost:27017")
    db = client["mydb"]
    collection = db["users"]

    # Insert a document
    await collection.insert_one({"name": "John", "email": "john@example.com"})

    # Find documents
    async for doc in collection.find({"name": "John"}):
        print(doc)


asyncio.run(main())

Pros: Flexible schema, great for unstructured data, horizontal scaling.
Cons: No ACID transactions across documents (by default), less suited for relational data.

1.5 Comparison Table

| Feature | SQLite | PostgreSQL | MySQL | MongoDB |
|---|---|---|---|---|
| Type | Relational (file) | Relational (server) | Relational (server) | Document (NoSQL) |
| Setup Complexity | None | Medium | Medium | Medium |
| Concurrency | Low | High | High | High |
| ACID Compliance | Yes | Yes | Yes (InnoDB) | Partial |
| Async Support | aiosqlite | asyncpg | aiomysql | Motor |
| Best For | Dev/Testing | Production APIs | Production APIs | Flexible schemas |
| SQLAlchemy Support | Full | Full | Full | No (use ODM) |
| JSON Support | Basic | JSONB (excellent) | JSON type | Native |

Recommendation: For this tutorial, we will use SQLAlchemy with SQLite for development and show PostgreSQL configurations for production. SQLAlchemy makes it easy to switch between databases by changing the connection string.
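
Because SQLAlchemy hides the backend behind the URL, the same code can run against SQLite locally and PostgreSQL in production. A sketch (reading the URL from a DATABASE_URL environment variable is an assumption, not part of the tutorial code yet):

```python
import os
from sqlalchemy import create_engine, text

# Same code path for every backend; only the URL changes
url = os.environ.get("DATABASE_URL", "sqlite:///:memory:")
engine = create_engine(url)

with engine.connect() as conn:
    result = conn.execute(text("SELECT 1")).scalar()
print(result)  # 1 on any backend
```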

2. SQLAlchemy Setup

SQLAlchemy is the most popular ORM (Object-Relational Mapper) for Python. It provides a powerful and flexible way to interact with relational databases. FastAPI works seamlessly with SQLAlchemy through its dependency injection system.

2.1 Installing Dependencies

# Core dependencies
pip install fastapi uvicorn sqlalchemy

# Database drivers
pip install aiosqlite          # SQLite async driver
pip install psycopg2-binary    # PostgreSQL sync driver
pip install asyncpg             # PostgreSQL async driver
pip install pymysql             # MySQL sync driver
pip install aiomysql            # MySQL async driver

# Migration tool
pip install alembic

# Install all at once for this tutorial
pip install fastapi uvicorn sqlalchemy alembic psycopg2-binary

2.2 Project Structure

Let us set up a well-organized project structure that separates concerns properly:

fastapi-database-tutorial/
├── app/
│   ├── __init__.py
│   ├── main.py              # FastAPI application entry point
│   ├── config.py            # Configuration settings
│   ├── database.py          # Database engine, session, base
│   ├── models/
│   │   ├── __init__.py
│   │   ├── user.py          # User model
│   │   ├── post.py          # Post model
│   │   ├── tag.py           # Tag model
│   │   ├── product.py       # Product model
│   │   └── order.py         # Order model
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── user.py          # User Pydantic schemas
│   │   ├── product.py       # Product Pydantic schemas
│   │   └── order.py         # Order Pydantic schemas
│   ├── repositories/
│   │   ├── __init__.py
│   │   ├── base.py          # Base repository
│   │   └── user.py          # User repository
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── users.py         # User routes
│   │   └── products.py      # Product routes
│   └── dependencies.py      # Shared dependencies
├── alembic/
│   ├── versions/            # Migration files
│   ├── env.py               # Alembic environment
│   └── script.py.mako       # Migration template
├── alembic.ini              # Alembic configuration
├── requirements.txt
└── tests/
    ├── __init__.py
    └── test_users.py

2.3 Database Configuration

First, create a configuration module that handles environment-specific settings:

# app/config.py
from pydantic_settings import BaseSettings
from functools import lru_cache


class Settings(BaseSettings):
    """Application settings loaded from environment variables."""

    # Database
    DATABASE_URL: str = "sqlite:///./app.db"
    DATABASE_ECHO: bool = False  # Log SQL queries

    # Application
    APP_NAME: str = "FastAPI Database Tutorial"
    DEBUG: bool = True

    # PostgreSQL production example:
    # DATABASE_URL: str = "postgresql://user:pass@localhost:5432/mydb"

    class Config:
        env_file = ".env"
        case_sensitive = True


@lru_cache
def get_settings() -> Settings:
    """Cache settings to avoid reading .env on every request."""
    return Settings()
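
The effect of @lru_cache can be seen with a plain function standing in for Settings (the dict here is a hypothetical stand-in, not the real Settings class):

```python
from functools import lru_cache

@lru_cache
def get_settings():
    # In the real app this would construct Settings() and read .env;
    # a plain dict stands in here to show the caching behaviour
    print("loading settings...")
    return {"DATABASE_URL": "sqlite:///./app.db"}

first = get_settings()    # prints "loading settings..." once
second = get_settings()   # served from the cache, no print
assert first is second    # the very same object on every call
```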

2.4 Engine Creation and Session Management

The database module is the foundation of your data layer. It creates the engine, configures sessions, and provides the declarative base for your models:

# app/database.py
from sqlalchemy import create_engine, event
from sqlalchemy.orm import sessionmaker, DeclarativeBase
from app.config import get_settings

settings = get_settings()

# Create the SQLAlchemy engine
# The engine manages a pool of database connections
engine_kwargs = {
    "echo": settings.DATABASE_ECHO,  # Log SQL statements
}
if "sqlite" in settings.DATABASE_URL:
    # SQLite-specific: allow multi-threaded access
    engine_kwargs["connect_args"] = {"check_same_thread": False}
else:
    # Connection pool settings (SQLite's default pool does not accept these)
    engine_kwargs.update(
        pool_size=10,         # Number of persistent connections
        max_overflow=20,      # Additional connections when pool is full
        pool_timeout=30,      # Seconds to wait for a connection
        pool_recycle=1800,    # Recycle connections after 30 minutes
        pool_pre_ping=True,   # Verify connections before using them
    )

engine = create_engine(settings.DATABASE_URL, **engine_kwargs)

# Enable foreign keys for SQLite (disabled by default)
if "sqlite" in settings.DATABASE_URL:
    @event.listens_for(engine, "connect")
    def set_sqlite_pragma(dbapi_connection, connection_record):
        cursor = dbapi_connection.cursor()
        cursor.execute("PRAGMA foreign_keys=ON")
        cursor.close()

# Create a session factory
# Each request gets its own session
SessionLocal = sessionmaker(
    autocommit=False,   # We manage transactions explicitly
    autoflush=False,    # We flush manually for control
    bind=engine,
)


# SQLAlchemy 2.0 style declarative base
class Base(DeclarativeBase):
    """Base class for all database models."""
    pass


def create_tables():
    """Create all tables in the database.
    Used for development; use Alembic migrations in production.
    """
    Base.metadata.create_all(bind=engine)

Important: The check_same_thread=False argument is only needed for SQLite. FastAPI runs in multiple threads, and SQLite by default only allows access from the thread that created the connection. This setting disables that check. For PostgreSQL or MySQL, this argument is not needed.

2.5 Understanding the Engine

The SQLAlchemy engine is the starting point for all database operations. Here is what each configuration option does:

# Detailed engine configuration explained
from sqlalchemy import create_engine

engine = create_engine(
    "postgresql://user:pass@localhost:5432/mydb",

    # echo=True prints all SQL to stdout - useful for debugging
    echo=True,

    # pool_size: Number of connections to keep open in the pool
    # Default is 5. Increase for high-traffic applications.
    pool_size=10,

    # max_overflow: Extra connections allowed beyond pool_size
    # These are created on-demand and closed when returned to pool
    max_overflow=20,

    # pool_timeout: Seconds to wait for a connection from the pool
    # Raises TimeoutError if no connection is available
    pool_timeout=30,

    # pool_recycle: Seconds before a connection is recycled
    # Prevents "MySQL has gone away" type errors
    pool_recycle=1800,

    # pool_pre_ping: Test connections before using them
    # Slightly slower but prevents using stale connections
    pool_pre_ping=True,

    # execution_options: Default options for all executions
    execution_options={
        "isolation_level": "REPEATABLE READ"
    },
)
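
The pool settings can be observed directly. A sketch using an in-memory SQLite database with an explicitly forced QueuePool (so the sizing options apply even without a database server):

```python
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool

# Force a sized pool so pool_size/max_overflow apply to in-memory SQLite
engine = create_engine(
    "sqlite://",
    poolclass=QueuePool,
    pool_size=2,
    max_overflow=1,
    pool_timeout=5,
    pool_pre_ping=True,
)

print(engine.pool.size())       # 2 persistent slots
with engine.connect() as conn:
    conn.exec_driver_sql("SELECT 1")
print(engine.pool.checkedin())  # the connection went back to the pool
```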

3. Database Models

Database models define the structure of your tables. SQLAlchemy 2.0 uses Mapped type annotations for a more Pythonic and type-safe approach to defining columns and relationships.

3.1 Defining a Basic Model

# app/models/user.py
from datetime import datetime
from typing import Optional, List
from sqlalchemy import String, Integer, Boolean, DateTime, Text, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base


class User(Base):
    """User model representing the users table."""

    __tablename__ = "users"

    # Primary key - auto-incrementing integer
    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)

    # Required fields
    username: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True)
    email: Mapped[str] = mapped_column(String(255), unique=True, nullable=False, index=True)
    hashed_password: Mapped[str] = mapped_column(String(255), nullable=False)

    # Optional fields
    full_name: Mapped[Optional[str]] = mapped_column(String(100), nullable=True)
    bio: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)
    is_superuser: Mapped[bool] = mapped_column(Boolean, default=False, nullable=False)

    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        nullable=False,
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True),
        server_default=func.now(),
        onupdate=func.now(),
        nullable=False,
    )

    # Relationships
    posts: Mapped[List["Post"]] = relationship(
        "Post", back_populates="author", cascade="all, delete-orphan"
    )
    orders: Mapped[List["Order"]] = relationship(
        "Order", back_populates="user", cascade="all, delete-orphan"
    )

    def __repr__(self) -> str:
        return f"<User(id={self.id}, username='{self.username}', email='{self.email}')>"

3.2 Column Types Reference

SQLAlchemy provides a rich set of column types. Here are the most commonly used ones:

| SQLAlchemy Type | Python Type | SQL Type | Description |
|---|---|---|---|
| Integer | int | INTEGER | Standard integer |
| BigInteger | int | BIGINT | Large integer (IDs in big tables) |
| String(n) | str | VARCHAR(n) | Variable-length string with max length |
| Text | str | TEXT | Unlimited length text |
| Boolean | bool | BOOLEAN | True/False values |
| Float | float | FLOAT | Floating-point number |
| Numeric(p, s) | Decimal | NUMERIC | Exact decimal (use for money) |
| DateTime | datetime | DATETIME | Date and time |
| Date | date | DATE | Date only |
| Time | time | TIME | Time only |
| JSON | dict/list | JSON | JSON data |
| LargeBinary | bytes | BLOB | Binary data |
| Enum | enum.Enum | ENUM/VARCHAR | Enumerated values |
| UUID | uuid.UUID | UUID/CHAR(36) | Universally unique identifier |

3.3 One-to-Many Relationships

One-to-many is the most common relationship type. One user can have many posts, but each post belongs to exactly one user.

# app/models/post.py
from datetime import datetime
from typing import Optional, List
from sqlalchemy import String, Integer, Text, DateTime, ForeignKey, func, Enum as SAEnum
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
import enum


class PostStatus(str, enum.Enum):
    DRAFT = "draft"
    PUBLISHED = "published"
    ARCHIVED = "archived"


class Post(Base):
    """Post model with a many-to-one relationship to User."""

    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    title: Mapped[str] = mapped_column(String(200), nullable=False, index=True)
    slug: Mapped[str] = mapped_column(String(200), unique=True, nullable=False, index=True)
    content: Mapped[str] = mapped_column(Text, nullable=False)
    status: Mapped[PostStatus] = mapped_column(
        SAEnum(PostStatus), default=PostStatus.DRAFT, nullable=False
    )
    view_count: Mapped[int] = mapped_column(Integer, default=0, nullable=False)

    # Foreign key - links to users table
    author_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False, index=True
    )

    # Timestamps
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
    )
    published_at: Mapped[Optional[datetime]] = mapped_column(
        DateTime(timezone=True), nullable=True
    )

    # Relationship back to User
    author: Mapped["User"] = relationship("User", back_populates="posts")

    # Many-to-many with tags
    tags: Mapped[List["Tag"]] = relationship(
        "Tag", secondary="post_tags", back_populates="posts"
    )

    def __repr__(self) -> str:
        return f"<Post(id={self.id}, title='{self.title}', status='{self.status}')>"

3.4 Many-to-Many Relationships

Many-to-many relationships require an association table. A post can have many tags, and a tag can be applied to many posts.

# app/models/tag.py
from typing import List
from sqlalchemy import String, Integer, Table, Column, ForeignKey
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base


# Association table for many-to-many relationship
# This is a simple link table with no extra columns
post_tags = Table(
    "post_tags",
    Base.metadata,
    Column("post_id", Integer, ForeignKey("posts.id", ondelete="CASCADE"), primary_key=True),
    Column("tag_id", Integer, ForeignKey("tags.id", ondelete="CASCADE"), primary_key=True),
)


class Tag(Base):
    """Tag model with many-to-many relationship to Post."""

    __tablename__ = "tags"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True)
    slug: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True)

    # Many-to-many relationship
    posts: Mapped[List["Post"]] = relationship(
        "Post", secondary=post_tags, back_populates="tags"
    )

    def __repr__(self) -> str:
        return f"<Tag(id={self.id}, name='{self.name}')>"

3.5 Many-to-Many with Extra Data (Association Object)

When you need extra columns on the association table (like quantity or created date), use an association object pattern:

# app/models/order.py
from datetime import datetime
from typing import List
from sqlalchemy import (
    String, Integer, Numeric, DateTime, ForeignKey, func, Enum as SAEnum
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base
import enum


class OrderStatus(str, enum.Enum):
    PENDING = "pending"
    CONFIRMED = "confirmed"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"


class Order(Base):
    """Order model."""

    __tablename__ = "orders"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    order_number: Mapped[str] = mapped_column(String(50), unique=True, nullable=False)
    status: Mapped[OrderStatus] = mapped_column(
        SAEnum(OrderStatus), default=OrderStatus.PENDING
    )
    total_amount: Mapped[float] = mapped_column(Numeric(10, 2), default=0)

    user_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("users.id", ondelete="CASCADE"), nullable=False
    )

    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )

    # Relationships
    user: Mapped["User"] = relationship("User", back_populates="orders")
    items: Mapped[List["OrderItem"]] = relationship(
        "OrderItem", back_populates="order", cascade="all, delete-orphan"
    )


class OrderItem(Base):
    """Association object between Order and Product with extra data."""

    __tablename__ = "order_items"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)

    # Foreign keys linking to the parent tables
    order_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("orders.id", ondelete="CASCADE"), nullable=False
    )
    product_id: Mapped[int] = mapped_column(
        Integer, ForeignKey("products.id", ondelete="RESTRICT"), nullable=False
    )

    # Extra data on the association
    quantity: Mapped[int] = mapped_column(Integer, nullable=False, default=1)
    unit_price: Mapped[float] = mapped_column(Numeric(10, 2), nullable=False)
    total_price: Mapped[float] = mapped_column(Numeric(10, 2), nullable=False)

    # Relationships
    order: Mapped["Order"] = relationship("Order", back_populates="items")
    product: Mapped["Product"] = relationship("Product")

3.6 Indexes and Constraints

Proper indexing is essential for query performance. Here is how to define various types of indexes and constraints:

# app/models/product.py
from datetime import datetime
from typing import Optional
from sqlalchemy import (
    String, Integer, Numeric, Text, Boolean, DateTime,
    ForeignKey, Index, UniqueConstraint, CheckConstraint, func
)
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base


class Product(Base):
    """Product model demonstrating various indexes and constraints."""

    __tablename__ = "products"

    # Table-level constraints and indexes
    __table_args__ = (
        # Composite unique constraint
        UniqueConstraint("name", "category_id", name="uq_product_name_category"),

        # Composite index for common queries
        Index("ix_product_category_price", "category_id", "price"),

        # Partial index (PostgreSQL only)
        # Index("ix_active_products", "name", postgresql_where=text("is_active = true")),

        # Check constraint
        CheckConstraint("price >= 0", name="ck_product_price_positive"),
        CheckConstraint("stock_quantity >= 0", name="ck_product_stock_positive"),
    )

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(200), nullable=False, index=True)
    slug: Mapped[str] = mapped_column(String(200), unique=True, nullable=False, index=True)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    price: Mapped[float] = mapped_column(Numeric(10, 2), nullable=False)
    stock_quantity: Mapped[int] = mapped_column(Integer, default=0, nullable=False)
    sku: Mapped[str] = mapped_column(String(50), unique=True, nullable=False, index=True)
    is_active: Mapped[bool] = mapped_column(Boolean, default=True, nullable=False)

    # Foreign key to category
    category_id: Mapped[Optional[int]] = mapped_column(
        Integer, ForeignKey("categories.id", ondelete="SET NULL"), nullable=True
    )

    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )
    updated_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now(), onupdate=func.now()
    )

    # Relationships
    category: Mapped[Optional["Category"]] = relationship("Category", back_populates="products")

    def __repr__(self) -> str:
        return f"<Product(id={self.id}, name='{self.name}', price={self.price})>"


class Category(Base):
    """Category model with self-referential relationship for nested categories."""

    __tablename__ = "categories"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False, index=True)
    slug: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)

    # Self-referential foreign key for parent category
    parent_id: Mapped[Optional[int]] = mapped_column(
        Integer, ForeignKey("categories.id", ondelete="SET NULL"), nullable=True
    )

    # Self-referential relationship
    parent: Mapped[Optional["Category"]] = relationship(
        "Category", remote_side="Category.id", back_populates="children"
    )
    children: Mapped[list["Category"]] = relationship(
        "Category", back_populates="parent"
    )

    # Products in this category
    products: Mapped[list["Product"]] = relationship(
        "Product", back_populates="category"
    )

    def __repr__(self) -> str:
        return f"<Category(id={self.id}, name='{self.name}')>"

4. Database Session Dependency

FastAPI’s dependency injection system is perfect for managing database sessions. Each request gets its own session that is automatically cleaned up when the request completes, regardless of whether it succeeded or raised an exception.

4.1 Basic Session Dependency

# app/dependencies.py
from typing import Generator, Annotated
from sqlalchemy.orm import Session
from fastapi import Depends
from app.database import SessionLocal


def get_db() -> Generator[Session, None, None]:
    """
    Dependency that provides a database session per request.

    Uses a generator with yield to ensure the session is always
    closed after the request completes, even if an error occurs.

    Usage:
        @app.get("/users")
        def get_users(db: Session = Depends(get_db)):
            return db.query(User).all()
    """
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


# Type alias for cleaner route signatures
DbSession = Annotated[Session, Depends(get_db)]
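
FastAPI drives this yield-based dependency the same way plain Python drives any generator. A stand-in resource (a hypothetical dict instead of a real session) shows the cleanup guarantee:

```python
def get_resource():
    resource = {"open": True}     # stands in for SessionLocal()
    try:
        yield resource            # handed to the route handler
    finally:
        resource["open"] = False  # cleanup runs no matter what

gen = get_resource()
res = next(gen)                   # what Depends(...) injects into the route
assert res["open"]

gen.close()                       # what FastAPI does after the response
assert not res["open"]            # the "session" was closed
```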

4.2 Session with Automatic Commit/Rollback

For a more robust approach, you can create a dependency that automatically commits on success and rolls back on failure:

# app/dependencies.py (enhanced version)
from typing import Generator, Annotated
from sqlalchemy.orm import Session
from fastapi import Depends
from app.database import SessionLocal
import logging

logger = logging.getLogger(__name__)


def get_db() -> Generator[Session, None, None]:
    """
    Provides a transactional database session.

    - Commits automatically if no exceptions occur
    - Rolls back automatically on exceptions
    - Always closes the session when done
    """
    db = SessionLocal()
    try:
        yield db
        db.commit()  # Commit if everything went well
    except Exception:
        db.rollback()  # Rollback on any exception
        raise
    finally:
        db.close()


# Reusable type annotation
DbSession = Annotated[Session, Depends(get_db)]

Note: The auto-commit approach above is convenient but gives you less control. Many developers prefer to call db.commit() explicitly in their route handlers or service layer. Choose the approach that fits your team’s conventions.

4.3 Using the Dependency in Routes

# app/routers/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from app.dependencies import DbSession, get_db
from app.models.user import User
from app.schemas.user import UserCreate, UserResponse

router = APIRouter(prefix="/users", tags=["users"])


# Method 1: Using the type alias (recommended)
@router.get("/", response_model=list[UserResponse])
def get_users(db: DbSession, skip: int = 0, limit: int = 100):
    """Get all users with pagination."""
    users = db.query(User).offset(skip).limit(limit).all()
    return users


# Method 2: Using Depends directly
@router.get("/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
    """Get a single user by ID."""
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {user_id} not found"
        )
    return user

4.4 Request-Scoped Sessions Explained

Understanding the session lifecycle is critical for avoiding common bugs:

# Session lifecycle visualization
"""
Request comes in
    │
    ▼
get_db() creates SessionLocal()    ← New session created
    │
    ▼
yield db                           ← Session available to route handler
    │
    ▼
Route handler executes             ← Queries run on this session
    │                                 Objects are tracked by session
    ▼
Request completes (or fails)
    │
    ▼
finally: db.close()                ← Session returned to pool
                                     Objects become "detached"
"""

# Common mistake: accessing lazy-loaded relationships after session closes
@router.get("/users/{user_id}")
def get_user_bad(user_id: int, db: DbSession):
    user = db.query(User).filter(User.id == user_id).first()
    db.close()  # DON'T DO THIS - let the dependency handle it!
    # user.posts  # This would fail! Session is closed.
    return user


# Correct approach: let the dependency manage the session lifecycle
@router.get("/users/{user_id}")
def get_user_good(user_id: int, db: DbSession):
    user = db.query(User).filter(User.id == user_id).first()
    # Access relationships while session is still open
    _ = user.posts  # This works because session is still active
    return user

5. CRUD Operations with SQLAlchemy

CRUD (Create, Read, Update, Delete) operations form the core of any data-driven application. Let us implement each operation with proper error handling and best practices.

5.1 Create Operations

# app/routers/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.orm import Session
from sqlalchemy.exc import IntegrityError
from app.dependencies import DbSession
from app.models.user import User
from app.schemas.user import UserCreate, UserResponse
import hashlib

router = APIRouter(prefix="/users", tags=["users"])


@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user_data: UserCreate, db: DbSession):
    """
    Create a new user.

    Steps:
    1. Validate input (handled by Pydantic)
    2. Check for existing user
    3. Create model instance
    4. Add to session
    5. Commit transaction
    6. Refresh to get generated values (id, timestamps)
    """
    # Check if user already exists
    existing_user = db.query(User).filter(
        (User.email == user_data.email) | (User.username == user_data.username)
    ).first()

    if existing_user:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="User with this email or username already exists"
        )

    # Create new user instance
    db_user = User(
        username=user_data.username,
        email=user_data.email,
        # NOTE: sha256 is shown for brevity only; use a dedicated password
        # hashing library (e.g. passlib with bcrypt) in real applications
        hashed_password=hashlib.sha256(user_data.password.encode()).hexdigest(),
        full_name=user_data.full_name,
    )

    try:
        db.add(db_user)       # Add to session (pending state)
        db.commit()           # Write to database
        db.refresh(db_user)   # Reload from DB to get generated values
        return db_user
    except IntegrityError:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="User with this email or username already exists"
        )


@router.post("/bulk", response_model=list[UserResponse], status_code=status.HTTP_201_CREATED)
def create_users_bulk(users_data: list[UserCreate], db: DbSession):
    """Create multiple users in a single transaction."""
    db_users = []

    for user_data in users_data:
        db_user = User(
            username=user_data.username,
            email=user_data.email,
            hashed_password=hashlib.sha256(user_data.password.encode()).hexdigest(),
            full_name=user_data.full_name,
        )
        db_users.append(db_user)

    try:
        db.add_all(db_users)  # Add all at once
        db.commit()
        for user in db_users:
            db.refresh(user)
        return db_users
    except IntegrityError:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="One or more users already exist"
        )

5.2 Read Operations

# app/routers/users.py (continued)
from typing import Optional
from fastapi import Query


@router.get("/", response_model=list[UserResponse])
def get_users(
    db: DbSession,
    skip: int = Query(0, ge=0, description="Number of records to skip"),
    limit: int = Query(100, ge=1, le=1000, description="Max records to return"),
    is_active: Optional[bool] = Query(None, description="Filter by active status"),
    search: Optional[str] = Query(None, description="Search by username or email"),
):
    """
    Get all users with filtering, search, and pagination.
    """
    query = db.query(User)

    # Apply filters
    if is_active is not None:
        query = query.filter(User.is_active == is_active)

    if search:
        search_pattern = f"%{search}%"
        query = query.filter(
            (User.username.ilike(search_pattern)) |
            (User.email.ilike(search_pattern)) |
            (User.full_name.ilike(search_pattern))
        )

    # Apply ordering and pagination
    users = query.order_by(User.created_at.desc()).offset(skip).limit(limit).all()

    return users


@router.get("/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: DbSession):
    """Get a single user by ID."""
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {user_id} not found"
        )
    return user


@router.get("/by-email/{email}", response_model=UserResponse)
def get_user_by_email(email: str, db: DbSession):
    """Get a user by email address."""
    user = db.query(User).filter(User.email == email).first()
    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with email {email} not found"
        )
    return user


# Advanced query examples
@router.get("/stats/active-count")
def get_active_user_count(db: DbSession):
    """Get count of active users."""
    from sqlalchemy import func

    result = db.query(func.count(User.id)).filter(User.is_active == True).scalar()
    return {"active_users": result}

5.3 Update Operations

# app/routers/users.py (continued)
from app.schemas.user import UserUpdate


@router.put("/{user_id}", response_model=UserResponse)
def update_user(user_id: int, user_data: UserUpdate, db: DbSession):
    """
    Full update of a user (PUT - replaces all fields).
    """
    db_user = db.query(User).filter(User.id == user_id).first()
    if not db_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {user_id} not found"
        )

    # Update all fields from the request body
    update_data = user_data.model_dump(exclude_unset=False)
    for field, value in update_data.items():
        setattr(db_user, field, value)

    try:
        db.commit()
        db.refresh(db_user)
        return db_user
    except IntegrityError:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Update would violate a unique constraint"
        )


@router.patch("/{user_id}", response_model=UserResponse)
def partial_update_user(user_id: int, user_data: UserUpdate, db: DbSession):
    """
    Partial update of a user (PATCH - only updates provided fields).
    """
    db_user = db.query(User).filter(User.id == user_id).first()
    if not db_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {user_id} not found"
        )

    # Only update fields that were explicitly set
    update_data = user_data.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(db_user, field, value)

    try:
        db.commit()
        db.refresh(db_user)
        return db_user
    except IntegrityError:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Update would violate a unique constraint"
        )


# Bulk update example
@router.patch("/bulk/deactivate")
def deactivate_inactive_users(days: int, db: DbSession):
    """Deactivate users who haven't logged in for N days."""
    from datetime import datetime, timedelta, timezone

    # datetime.utcnow() is deprecated in Python 3.12+ - use an aware datetime
    cutoff_date = datetime.now(timezone.utc) - timedelta(days=days)

    updated_count = (
        db.query(User)
        .filter(User.is_active == True, User.updated_at < cutoff_date)
        .update({"is_active": False}, synchronize_session="fetch")
    )

    db.commit()
    return {"deactivated_count": updated_count}

5.4 Delete Operations

# app/routers/users.py (continued)


@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, db: DbSession):
    """
    Delete a user by ID.

    With cascade="all, delete-orphan" on relationships,
    related records (posts, orders) are also deleted.
    """
    db_user = db.query(User).filter(User.id == user_id).first()
    if not db_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {user_id} not found"
        )

    db.delete(db_user)
    db.commit()
    return None  # 204 No Content


# Soft delete pattern (recommended for production)
@router.delete("/{user_id}/soft")
def soft_delete_user(user_id: int, db: DbSession):
    """
    Soft delete - deactivate instead of removing from database.
    Data can be recovered if needed.
    """
    db_user = db.query(User).filter(User.id == user_id).first()
    if not db_user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {user_id} not found"
        )

    db_user.is_active = False
    db.commit()
    return {"message": f"User {user_id} has been deactivated"}


# Bulk delete
@router.delete("/bulk/inactive")
def delete_inactive_users(db: DbSession):
    """Delete all inactive users."""
    deleted_count = (
        db.query(User)
        .filter(User.is_active == False)
        .delete(synchronize_session="fetch")
    )
    db.commit()
    return {"deleted_count": deleted_count}

6. Pydantic Schemas vs DB Models

One of the most important patterns in FastAPI development is separating your Pydantic schemas (for API validation and serialization) from your SQLAlchemy models (for database interaction). This separation of concerns keeps your code clean, secure, and maintainable.

6.1 Why Separate Schemas from Models?

Concern           Pydantic Schema                        SQLAlchemy Model
----------------  -------------------------------------  ------------------------------
Purpose           API validation & serialization         Database table representation
Passwords         Accepts plain text, never returns it   Stores hashed version only
Computed fields   Can include calculated values          Only stores raw data
Relationships     Controls nesting depth                 Defines all relationships
Validation        Input validation rules                 Database constraints
Versioning        Can have v1, v2 schemas                Single source of truth

6.2 Schema Organization

# app/schemas/user.py
from datetime import datetime
from typing import Optional, List
from pydantic import BaseModel, EmailStr, Field, ConfigDict, field_validator


# ─── Base Schema (shared fields) ───────────────────────────────
class UserBase(BaseModel):
    """Fields shared across all user schemas."""
    username: str = Field(..., min_length=3, max_length=50, pattern="^[a-zA-Z0-9_]+$")
    email: EmailStr
    full_name: Optional[str] = Field(None, max_length=100)


# ─── Create Schema (input for creating) ────────────────────────
class UserCreate(UserBase):
    """Schema for creating a new user. Includes password."""
    password: str = Field(..., min_length=8, max_length=100)

    # Example of custom validation (Pydantic v2 field validator)
    @field_validator("password")
    @classmethod
    def validate_password_strength(cls, v: str) -> str:
        if not any(c.isupper() for c in v):
            raise ValueError("Password must contain at least one uppercase letter")
        if not any(c.isdigit() for c in v):
            raise ValueError("Password must contain at least one digit")
        return v


# ─── Update Schema (input for updating) ────────────────────────
class UserUpdate(BaseModel):
    """Schema for updating a user. All fields optional for PATCH."""
    username: Optional[str] = Field(None, min_length=3, max_length=50)
    email: Optional[EmailStr] = None
    full_name: Optional[str] = Field(None, max_length=100)
    bio: Optional[str] = Field(None, max_length=1000)
    is_active: Optional[bool] = None


# ─── Response Schema (output) ──────────────────────────────────
class UserResponse(UserBase):
    """
    Schema for returning user data in API responses.

    Note: hashed_password is NOT included - never expose it!
    """
    id: int
    is_active: bool
    created_at: datetime
    updated_at: datetime

    # This tells Pydantic to read data from ORM model attributes
    # In Pydantic v2, use model_config instead of class Config
    model_config = ConfigDict(from_attributes=True)


# ─── Response with Relations ───────────────────────────────────
class UserWithPosts(UserResponse):
    """User response including their posts."""
    # Forward reference: PostResponse must be importable at runtime
    # (or UserWithPosts.model_rebuild() called) before first use
    posts: List["PostResponse"] = []

    model_config = ConfigDict(from_attributes=True)


# ─── Minimal Response (for lists) ──────────────────────────────
class UserSummary(BaseModel):
    """Lightweight user representation for list endpoints."""
    id: int
    username: str
    email: str
    is_active: bool

    model_config = ConfigDict(from_attributes=True)

6.3 Product Schemas

# app/schemas/product.py
from datetime import datetime
from typing import Optional
from decimal import Decimal
from pydantic import BaseModel, Field, ConfigDict, field_validator


class ProductBase(BaseModel):
    """Shared product fields."""
    name: str = Field(..., min_length=1, max_length=200)
    description: Optional[str] = None
    price: Decimal = Field(..., gt=0, decimal_places=2)
    sku: str = Field(..., min_length=1, max_length=50)
    category_id: Optional[int] = None


class ProductCreate(ProductBase):
    """Schema for creating a product."""
    stock_quantity: int = Field(0, ge=0)

    @field_validator("sku")
    @classmethod
    def validate_sku_format(cls, v: str) -> str:
        """SKU must be uppercase alphanumeric with hyphens."""
        if not all(c.isalnum() or c == "-" for c in v):
            raise ValueError("SKU must contain only letters, numbers, and hyphens")
        return v.upper()


class ProductUpdate(BaseModel):
    """Schema for updating a product. All fields optional."""
    name: Optional[str] = Field(None, min_length=1, max_length=200)
    description: Optional[str] = None
    price: Optional[Decimal] = Field(None, gt=0)
    stock_quantity: Optional[int] = Field(None, ge=0)
    is_active: Optional[bool] = None
    category_id: Optional[int] = None


class ProductResponse(ProductBase):
    """Schema for product API response."""
    id: int
    slug: str
    stock_quantity: int
    is_active: bool
    created_at: datetime
    updated_at: datetime

    model_config = ConfigDict(from_attributes=True)


class ProductWithCategory(ProductResponse):
    """Product with its category details."""
    category: Optional["CategoryResponse"] = None

    model_config = ConfigDict(from_attributes=True)

6.4 The from_attributes Pattern (formerly orm_mode)

# Understanding from_attributes (Pydantic v2) / orm_mode (Pydantic v1)

# Without from_attributes, Pydantic expects dict-like data:
user_dict = {"id": 1, "username": "john", "email": "john@example.com"}
user_schema = UserResponse(**user_dict)  # Works

# With from_attributes=True, Pydantic can read from ORM objects:
db_user = db.query(User).first()  # SQLAlchemy model instance
user_schema = UserResponse.model_validate(db_user)  # Works!

# This is what FastAPI does automatically when you set response_model:
@router.get("/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: DbSession):
    user = db.query(User).first()
    return user  # FastAPI converts this using UserResponse.model_validate()

# Pydantic v1 (older) vs v2 (current):
# v1:
class UserResponseV1(BaseModel):
    class Config:
        orm_mode = True  # Old syntax

# v2:
class UserResponseV2(BaseModel):
    model_config = ConfigDict(from_attributes=True)  # New syntax
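The mechanism is simple to picture without Pydantic: `from_attributes` tells the validator to read fields with `getattr` instead of dict lookups, so only the declared fields are ever read. A toy sketch of the idea (our own names, not Pydantic's internals):

```python
class ORMUser:
    """Stands in for a SQLAlchemy model instance."""
    def __init__(self):
        self.id = 1
        self.username = "john"
        self.hashed_password = "secret-hash"  # present on the model, never exposed


RESPONSE_FIELDS = ("id", "username")  # what a response schema would declare


def validate_from_attributes(obj, fields):
    # Read attribute-by-attribute, the way from_attributes=True allows
    return {name: getattr(obj, name) for name in fields}


print(validate_from_attributes(ORMUser(), RESPONSE_FIELDS))
```

Fields the schema never declares, like `hashed_password`, are simply never read, which is why the separation in section 6.1 keeps secrets out of responses.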

6.5 Paginated Response Schema

# app/schemas/common.py
from typing import Generic, TypeVar, List
from pydantic import BaseModel

T = TypeVar("T")


class PaginatedResponse(BaseModel, Generic[T]):
    """Generic paginated response wrapper."""
    items: List[T]
    total: int
    page: int
    page_size: int
    total_pages: int
    has_next: bool
    has_previous: bool


# Usage in route
@router.get("/", response_model=PaginatedResponse[UserResponse])
def get_users(
    db: DbSession,
    page: int = Query(1, ge=1),
    page_size: int = Query(20, ge=1, le=100),
):
    """Get paginated list of users."""
    query = db.query(User).filter(User.is_active == True)
    total = query.count()
    total_pages = (total + page_size - 1) // page_size

    users = (
        query
        .order_by(User.created_at.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )

    return PaginatedResponse(
        items=users,
        total=total,
        page=page,
        page_size=page_size,
        total_pages=total_pages,
        has_next=page < total_pages,
        has_previous=page > 1,
    )
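The `total_pages` / `has_next` arithmetic above can be factored into a small pure helper so it is easy to test in isolation (`paginate_meta` is an illustrative name, not a FastAPI or Pydantic API):

```python
def paginate_meta(total: int, page: int, page_size: int) -> dict:
    """Compute the pagination metadata fields used by PaginatedResponse."""
    total_pages = (total + page_size - 1) // page_size  # ceiling division
    return {
        "total": total,
        "page": page,
        "page_size": page_size,
        "total_pages": total_pages,
        "has_next": page < total_pages,
        "has_previous": page > 1,
    }


print(paginate_meta(45, 2, 20))
```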

7. Alembic Migrations

Alembic is the migration tool for SQLAlchemy. It tracks changes to your database models and generates migration scripts to apply those changes to your database schema. Think of it as “version control for your database.”

7.1 Setting Up Alembic

# Install Alembic
pip install alembic

# Initialize Alembic in your project
cd fastapi-database-tutorial
alembic init alembic

This creates the following structure:

alembic/
├── versions/          # Migration scripts go here
├── env.py             # Alembic environment configuration
├── README             # Alembic README
└── script.py.mako     # Template for new migrations
alembic.ini            # Main Alembic configuration file

7.2 Configuring Alembic

Update the Alembic configuration to use your SQLAlchemy models and database URL:

# alembic/env.py
from logging.config import fileConfig
from sqlalchemy import engine_from_config, pool
from alembic import context

# Import your models and Base
from app.database import Base
from app.config import get_settings

# Import ALL models so Alembic can detect them
from app.models.user import User
from app.models.post import Post
from app.models.tag import Tag
from app.models.product import Product, Category
from app.models.order import Order, OrderItem

# Alembic Config object
config = context.config

# Set the database URL from our app settings
settings = get_settings()
config.set_main_option("sqlalchemy.url", settings.DATABASE_URL)

# Interpret the config file for Python logging
if config.config_file_name is not None:
    fileConfig(config.config_file_name)

# Set target metadata for auto-generation
target_metadata = Base.metadata


def run_migrations_offline() -> None:
    """Run migrations in 'offline' mode.

    Generates SQL scripts without connecting to the database.
    Useful for reviewing changes before applying them.
    """
    url = config.get_main_option("sqlalchemy.url")
    context.configure(
        url=url,
        target_metadata=target_metadata,
        literal_binds=True,
        dialect_opts={"paramstyle": "named"},
        # Compare column types for detecting type changes
        compare_type=True,
        # Compare server defaults
        compare_server_default=True,
    )

    with context.begin_transaction():
        context.run_migrations()


def run_migrations_online() -> None:
    """Run migrations in 'online' mode.

    Connects to the database and applies changes directly.
    """
    connectable = engine_from_config(
        config.get_section(config.config_ini_section, {}),
        prefix="sqlalchemy.",
        poolclass=pool.NullPool,
    )

    with connectable.connect() as connection:
        context.configure(
            connection=connection,
            target_metadata=target_metadata,
            compare_type=True,
            compare_server_default=True,
        )

        with context.begin_transaction():
            context.run_migrations()


if context.is_offline_mode():
    run_migrations_offline()
else:
    run_migrations_online()

7.3 Creating Migrations

# Auto-generate a migration by comparing models to database
alembic revision --autogenerate -m "create users table"

# Create an empty migration (for custom SQL)
alembic revision -m "add custom index"

# View current migration status
alembic current

# View migration history
alembic history --verbose

7.4 Auto-Generated Migration Example

# alembic/versions/001_create_users_table.py
"""create users table

Revision ID: a1b2c3d4e5f6
Revises:
Create Date: 2024-01-15 10:30:00.000000
"""
from typing import Sequence, Union
from alembic import op
import sqlalchemy as sa

# revision identifiers
revision: str = "a1b2c3d4e5f6"
down_revision: Union[str, None] = None
branch_labels: Union[str, Sequence[str], None] = None
depends_on: Union[str, Sequence[str], None] = None


def upgrade() -> None:
    """Apply migration - create the users table."""
    op.create_table(
        "users",
        sa.Column("id", sa.Integer(), autoincrement=True, nullable=False),
        sa.Column("username", sa.String(length=50), nullable=False),
        sa.Column("email", sa.String(length=255), nullable=False),
        sa.Column("hashed_password", sa.String(length=255), nullable=False),
        sa.Column("full_name", sa.String(length=100), nullable=True),
        sa.Column("bio", sa.Text(), nullable=True),
        sa.Column("is_active", sa.Boolean(), nullable=False, server_default="1"),
        sa.Column("is_superuser", sa.Boolean(), nullable=False, server_default="0"),
        sa.Column(
            "created_at",
            sa.DateTime(timezone=True),
            server_default=sa.text("(CURRENT_TIMESTAMP)"),
            nullable=False,
        ),
        sa.Column(
            "updated_at",
            sa.DateTime(timezone=True),
            server_default=sa.text("(CURRENT_TIMESTAMP)"),
            nullable=False,
        ),
        sa.PrimaryKeyConstraint("id"),
    )
    # Create indexes
    op.create_index("ix_users_username", "users", ["username"], unique=True)
    op.create_index("ix_users_email", "users", ["email"], unique=True)


def downgrade() -> None:
    """Reverse migration - drop the users table."""
    op.drop_index("ix_users_email", table_name="users")
    op.drop_index("ix_users_username", table_name="users")
    op.drop_table("users")

7.5 Applying and Reverting Migrations

# Apply all pending migrations
alembic upgrade head

# Apply migrations up to a specific revision
alembic upgrade a1b2c3d4e5f6

# Apply the next migration only
alembic upgrade +1

# Revert the last migration
alembic downgrade -1

# Revert to a specific revision
alembic downgrade a1b2c3d4e5f6

# Revert all migrations (back to empty database)
alembic downgrade base

# Generate SQL without executing (for review)
alembic upgrade head --sql

7.6 Data Migrations

# alembic/versions/003_seed_default_categories.py
"""seed default categories

Revision ID: c3d4e5f6g7h8
"""
from alembic import op
import sqlalchemy as sa
from datetime import datetime

revision = "c3d4e5f6g7h8"
down_revision = "b2c3d4e5f6g7"


def upgrade() -> None:
    """Insert default categories."""
    categories_table = sa.table(
        "categories",
        sa.column("id", sa.Integer),
        sa.column("name", sa.String),
        sa.column("slug", sa.String),
        sa.column("description", sa.Text),
    )

    op.bulk_insert(categories_table, [
        {"id": 1, "name": "Electronics", "slug": "electronics",
         "description": "Electronic devices and accessories"},
        {"id": 2, "name": "Clothing", "slug": "clothing",
         "description": "Apparel and fashion items"},
        {"id": 3, "name": "Books", "slug": "books",
         "description": "Physical and digital books"},
        {"id": 4, "name": "Home & Garden", "slug": "home-garden",
         "description": "Home improvement and garden supplies"},
    ])


def downgrade() -> None:
    """Remove default categories."""
    op.execute("DELETE FROM categories WHERE id IN (1, 2, 3, 4)")

7.7 Migration Best Practices

  • Never edit an applied migration — always create a new one to fix issues
  • Test migrations both ways — verify both upgrade() and downgrade()
  • Use descriptive messages — "add email index to users", not "update"
  • Review auto-generated migrations — they are not always perfect
  • Include data migrations when schema changes affect existing data
  • Keep migrations small — one logical change per migration
  • Commit migrations with code changes — they belong together in version control
  • Never use create_all() in production — always use Alembic migrations
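One way to honor "test migrations both ways" is a CI job that round-trips every migration against a disposable database. A sketch as a GitHub Actions fragment (the job name, Postgres service block, and DATABASE_URL value are assumptions for illustration; adapt to your CI system):

```yaml
# .github/workflows/migrations.yml (illustrative fragment)
jobs:
  migration-roundtrip:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:16
        env:
          POSTGRES_PASSWORD: test
          POSTGRES_DB: mydb
        ports: ["5432:5432"]
    steps:
      - uses: actions/checkout@v4
      - run: pip install -r requirements.txt
      - name: Upgrade, downgrade, upgrade again
        env:
          DATABASE_URL: postgresql://postgres:test@localhost:5432/mydb
        run: |
          alembic upgrade head
          alembic downgrade base
          alembic upgrade head
```

If any `downgrade()` is broken or missing, the round trip fails before the migration ever reaches production.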

8. Async Database Access

FastAPI is built on ASGI and supports async/await natively. Using async database access can significantly improve throughput for I/O-bound applications by allowing the server to handle other requests while waiting for database responses.

8.1 SQLAlchemy 2.0 Async Setup

# Install async drivers
pip install "sqlalchemy[asyncio]"   # quotes keep zsh from globbing the brackets
pip install aiosqlite       # For SQLite
pip install asyncpg         # For PostgreSQL (recommended)
pip install aiomysql        # For MySQL

# app/database_async.py
from sqlalchemy.ext.asyncio import (
    create_async_engine,
    async_sessionmaker,
    AsyncSession,
    AsyncEngine,
)
from sqlalchemy.orm import DeclarativeBase
from app.config import get_settings

settings = get_settings()

# Async connection URLs use different drivers
# SQLite: sqlite+aiosqlite:///./app.db
# PostgreSQL: postgresql+asyncpg://user:pass@localhost:5432/mydb
# MySQL: mysql+aiomysql://user:pass@localhost:3306/mydb

ASYNC_DATABASE_URL = settings.DATABASE_URL.replace(
    "sqlite://", "sqlite+aiosqlite://"
).replace(
    "postgresql://", "postgresql+asyncpg://"
)

# Create async engine
async_engine: AsyncEngine = create_async_engine(
    ASYNC_DATABASE_URL,
    echo=settings.DATABASE_ECHO,
    pool_size=10,
    max_overflow=20,
    pool_timeout=30,
    pool_recycle=1800,
    pool_pre_ping=True,
)

# Create async session factory
AsyncSessionLocal = async_sessionmaker(
    bind=async_engine,
    class_=AsyncSession,
    expire_on_commit=False,  # Important for async - prevents lazy loading issues
    autocommit=False,
    autoflush=False,
)


class Base(DeclarativeBase):
    """Base class for async models (same models work for both sync and async)."""
    pass


async def create_tables():
    """Create all tables asynchronously."""
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)


async def drop_tables():
    """Drop all tables asynchronously (for testing)."""
    async with async_engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)

8.2 Async Session Dependency

# app/dependencies_async.py
from typing import AsyncGenerator, Annotated
from sqlalchemy.ext.asyncio import AsyncSession
from fastapi import Depends
from app.database_async import AsyncSessionLocal


async def get_async_db() -> AsyncGenerator[AsyncSession, None]:
    """
    Async dependency that provides a database session per request.
    """
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


# Type alias for cleaner signatures
AsyncDbSession = Annotated[AsyncSession, Depends(get_async_db)]

8.3 Async CRUD Operations

# app/routers/users_async.py
from fastapi import APIRouter, HTTPException, status
from sqlalchemy import select, func, update, delete
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import selectinload
from app.dependencies_async import AsyncDbSession
from app.models.user import User
from app.schemas.user import UserCreate, UserResponse, UserWithPosts
import hashlib

router = APIRouter(prefix="/async/users", tags=["users-async"])


@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
async def create_user(user_data: UserCreate, db: AsyncDbSession):
    """Create a new user asynchronously."""
    # Check for existing user
    stmt = select(User).where(
        (User.email == user_data.email) | (User.username == user_data.username)
    )
    result = await db.execute(stmt)
    existing = result.scalar_one_or_none()

    if existing:
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="User already exists"
        )

    db_user = User(
        username=user_data.username,
        email=user_data.email,
        # sha256 is for demo brevity only - use a real password hasher
        # (e.g. bcrypt via passlib) in production
        hashed_password=hashlib.sha256(user_data.password.encode()).hexdigest(),
        full_name=user_data.full_name,
    )

    db.add(db_user)
    await db.flush()       # Write to DB but don't commit yet
    await db.refresh(db_user)  # Get generated values
    return db_user


@router.get("/", response_model=list[UserResponse])
async def get_users(
    db: AsyncDbSession,
    skip: int = 0,
    limit: int = 100,
):
    """Get all users asynchronously."""
    stmt = (
        select(User)
        .where(User.is_active == True)
        .order_by(User.created_at.desc())
        .offset(skip)
        .limit(limit)
    )
    result = await db.execute(stmt)
    users = result.scalars().all()
    return users


@router.get("/{user_id}", response_model=UserResponse)
async def get_user(user_id: int, db: AsyncDbSession):
    """Get a single user by ID."""
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()

    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {user_id} not found"
        )
    return user


@router.get("/{user_id}/with-posts", response_model=UserWithPosts)
async def get_user_with_posts(user_id: int, db: AsyncDbSession):
    """Get a user with their posts (eager loading)."""
    stmt = (
        select(User)
        .options(selectinload(User.posts))  # Eager load posts
        .where(User.id == user_id)
    )
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()

    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {user_id} not found"
        )
    return user


@router.put("/{user_id}", response_model=UserResponse)
async def update_user(user_id: int, user_data: dict, db: AsyncDbSession):
    """
    Update a user asynchronously.

    A plain dict body is used here for brevity - prefer a UserUpdate
    schema in real code so the input is validated.
    """
    stmt = (
        update(User)
        .where(User.id == user_id)
        .values(**user_data)
        .returning(User)  # Requires RETURNING support (PostgreSQL, SQLite 3.35+)
    )
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()

    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {user_id} not found"
        )

    await db.refresh(user)
    return user


@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
async def delete_user(user_id: int, db: AsyncDbSession):
    """Delete a user asynchronously."""
    stmt = select(User).where(User.id == user_id)
    result = await db.execute(stmt)
    user = result.scalar_one_or_none()

    if not user:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail=f"User with id {user_id} not found"
        )

    await db.delete(user)

8.4 When to Use Async

Scenario                       Sync or Async?  Reason
-----------------------------  --------------  ----------------------------------------
High-traffic API               Async           Better concurrency under load
Simple CRUD app                Either          Sync is simpler, async adds complexity
Multiple DB calls per request  Async           Can run queries concurrently
CPU-heavy processing           Sync            Async does not help with CPU-bound work
External API calls + DB        Async           Can await both without blocking
Small team / prototype         Sync            Less complexity, faster development
Microservices at scale         Async           Better resource utilization

Important: Do not mix sync and async carelessly. If your route is async def, all database operations in that route should use async sessions. If your route is plain def, use sync sessions. FastAPI runs sync routes in a thread pool, so they do not block the event loop.
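Why mixing matters can be demonstrated with the standard library alone: a blocking call inside async code stalls the event loop, while offloading it to a worker thread (which is what FastAPI does for plain `def` routes) keeps requests concurrent. A minimal sketch; `blocking_query` and the timings are ours, standing in for a sync database call:

```python
import asyncio
import time


def blocking_query() -> str:
    """Stands in for a synchronous DB call (e.g. a sync SQLAlchemy session)."""
    time.sleep(0.3)
    return "rows"


async def main() -> list[str]:
    # Calling blocking_query() directly inside async code would freeze
    # the event loop for 0.3s per call. Offloading to threads instead:
    start = time.perf_counter()
    results = await asyncio.gather(
        asyncio.to_thread(blocking_query),
        asyncio.to_thread(blocking_query),
    )
    elapsed = time.perf_counter() - start
    # Both calls ran concurrently in threads: ~0.3s total, not ~0.6s
    assert elapsed < 0.5
    return results


print(asyncio.run(main()))  # prints ['rows', 'rows']
```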

9. Connection Pooling

Connection pooling is critical for production applications. Creating a new database connection for every request is expensive — a pool maintains a set of reusable connections that are shared across requests.

9.1 How Connection Pooling Works

"""
Connection Pool Lifecycle:

1. Application starts → Pool creates `pool_size` connections
2. Request arrives → Pool provides an available connection
3. Request completes → Connection is returned to pool (not closed)
4. Pool is full → New requests wait up to `pool_timeout` seconds
5. Pool overflow → Up to `max_overflow` temporary connections created
6. Connection stale → `pool_pre_ping` detects and replaces it
7. Connection old → `pool_recycle` replaces connections after N seconds

Pool States:
┌─────────────────────────────────────────┐
│           Connection Pool                │
│                                          │
│  ┌──────┐ ┌──────┐ ┌──────┐ ┌──────┐   │
│  │ Conn │ │ Conn │ │ Conn │ │ Conn │   │  ← pool_size=5
│  │  #1  │ │  #2  │ │  #3  │ │  #4  │   │
│  │(idle)│ │(busy)│ │(idle)│ │(busy)│   │
│  └──────┘ └──────┘ └──────┘ └──────┘   │
│                                          │
│  ┌──────┐                                │
│  │ Conn │  ← max_overflow connections    │
│  │  #5  │    (temporary, closed when     │
│  │(busy)│     returned to pool)          │
│  └──────┘                                │
└─────────────────────────────────────────┘
"""

9.2 Pool Configuration

# app/database.py - Production-ready pool configuration
from sqlalchemy import create_engine
from sqlalchemy.pool import QueuePool, NullPool, StaticPool


# ─── Standard Configuration (most applications) ────────────────
engine = create_engine(
    "postgresql://user:pass@localhost:5432/mydb",

    # Pool implementation (QueuePool is default for non-SQLite)
    poolclass=QueuePool,

    # Number of persistent connections in the pool
    # Rule of thumb: start with 5-10, increase based on load testing
    pool_size=10,

    # Additional connections allowed when pool is exhausted
    # These connections are closed (not returned to pool) when done
    max_overflow=20,

    # Seconds to wait for a connection before raising TimeoutError
    pool_timeout=30,

    # Recycle connections after this many seconds
    # Prevents issues with databases that close idle connections
    # MySQL default wait_timeout is 28800 (8 hours)
    pool_recycle=1800,  # 30 minutes

    # Test connections before using them
    # Issues a SELECT 1 before handing out a connection
    pool_pre_ping=True,

    # Log pool events for debugging
    echo_pool=True,  # Set to False in production
)


# ─── Testing Configuration (SQLite in-memory) ──────────────────
test_engine = create_engine(
    "sqlite:///:memory:",
    poolclass=StaticPool,  # Single connection for all threads
    connect_args={"check_same_thread": False},
)


# ─── Serverless Configuration (no persistent pool) ─────────────
serverless_engine = create_engine(
    "postgresql://user:pass@host:5432/mydb",
    poolclass=NullPool,  # No pooling - new connection every time
    # Useful for AWS Lambda, Google Cloud Functions, etc.
)

9.3 Monitoring Pool Health

# app/routers/health.py
from fastapi import APIRouter
from app.database import engine

router = APIRouter(tags=["health"])


@router.get("/health/db")
def database_health():
    """Check database connection pool health."""
    pool = engine.pool

    return {
        "pool_size": pool.size(),
        "checked_in": pool.checkedin(),    # Available connections
        "checked_out": pool.checkedout(),  # In-use connections
        "overflow": pool.overflow(),       # Overflow connections in use
        "total_connections": pool.checkedin() + pool.checkedout(),
        "status": "healthy" if pool.checkedin() > 0 or pool.size() > pool.checkedout() else "stressed",
    }


# Event listeners for monitoring
import logging

from sqlalchemy import event

logger = logging.getLogger(__name__)

@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, connection_rec, connection_proxy):
    """Called when a connection is checked out from the pool."""
    logger.debug(f"Connection checked out. Pool: {engine.pool.status()}")


@event.listens_for(engine, "checkin")
def on_checkin(dbapi_conn, connection_rec):
    """Called when a connection is returned to the pool."""
    logger.debug(f"Connection checked in. Pool: {engine.pool.status()}")


@event.listens_for(engine, "connect")
def on_connect(dbapi_conn, connection_rec):
    """Called when a new raw connection is created."""
    logger.info("New database connection created")


@event.listens_for(engine, "invalidate")
def on_invalidate(dbapi_conn, connection_rec, exception):
    """Called when a connection is invalidated."""
    logger.warning(f"Connection invalidated: {exception}")

9.4 Pool Sizing Guidelines

Application Type pool_size max_overflow Notes
Small API (dev) 5 10 Default settings are fine
Medium API 10-20 20-30 Monitor and adjust
High-traffic API 20-50 50-100 Use connection pooler (PgBouncer)
Serverless N/A N/A Use NullPool or external pooler
Background workers 2-5 5 Fewer connections needed
Production Tip: For PostgreSQL at scale, consider using PgBouncer as an external connection pooler. It sits between your application and PostgreSQL, managing connections more efficiently than application-level pooling alone. With PgBouncer, you can use NullPool in SQLAlchemy and let PgBouncer handle all pooling.
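The PgBouncer setup described above can be sketched as follows. The DSN is hypothetical (PgBouncer's default port is 6432); the key point is that NullPool disables SQLAlchemy's own pooling so the two poolers do not hold connections against each other:

```python
# Hedged sketch: SQLAlchemy engine configured for use behind PgBouncer.
from sqlalchemy import create_engine
from sqlalchemy.pool import NullPool

engine = create_engine(
    # Hypothetical DSN — point at PgBouncer, not PostgreSQL directly
    "postgresql://user:password@localhost:6432/mydb",
    poolclass=NullPool,  # no app-side pooling; PgBouncer reuses server connections
)
```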

10. Relationships and Joins

Efficiently loading related data is one of the most important skills when working with an ORM. SQLAlchemy provides several strategies for loading relationships, each with different performance characteristics.

10.1 The N+1 Problem

The N+1 problem is the most common performance issue with ORMs. It occurs when you load a list of N records and then access a relationship on each one, causing N additional queries.

# THE N+1 PROBLEM - DO NOT DO THIS IN PRODUCTION

# Query 1: Get all users
users = db.query(User).all()  # SELECT * FROM users → returns 100 users

# N queries: Access posts for each user (lazy loading)
for user in users:
    print(f"{user.username} has {len(user.posts)} posts")
    # Each access to user.posts triggers:
    # SELECT * FROM posts WHERE author_id = 1
    # SELECT * FROM posts WHERE author_id = 2
    # SELECT * FROM posts WHERE author_id = 3
    # ... 100 more queries!

# Total: 1 + 100 = 101 queries for 100 users!

10.2 Eager Loading with joinedload

joinedload uses a SQL JOIN to load related data in a single query. Best for one-to-one and many-to-one relationships.

from sqlalchemy.orm import joinedload

# Single query with JOIN - loads users and posts together
users = (
    db.query(User)
    .options(joinedload(User.posts))
    .all()
)

# Generated SQL:
# SELECT users.*, posts.*
# FROM users
# LEFT OUTER JOIN posts ON users.id = posts.author_id

# Now accessing posts does NOT trigger additional queries
for user in users:
    print(f"{user.username} has {len(user.posts)} posts")  # No extra query!

# Total: 1 query!

10.3 Eager Loading with selectinload

selectinload uses a separate SELECT with an IN clause. Best for one-to-many and many-to-many relationships where JOIN would create duplicate rows.

from sqlalchemy.orm import selectinload

# Two queries: one for users, one for all their posts
users = (
    db.query(User)
    .options(selectinload(User.posts))
    .all()
)

# Generated SQL:
# Query 1: SELECT * FROM users
# Query 2: SELECT * FROM posts WHERE posts.author_id IN (1, 2, 3, ..., 100)

# Total: 2 queries regardless of how many users!

# Nested eager loading
users = (
    db.query(User)
    .options(
        selectinload(User.posts).selectinload(Post.tags),
        selectinload(User.orders).selectinload(Order.items),
    )
    .all()
)
# Loads: users → posts → tags, users → orders → items
# Total: 5 queries (one per table)

10.4 Comparison of Loading Strategies

| Strategy | SQL Queries | Best For | Trade-offs |
| --- | --- | --- | --- |
| lazy="select" (default) | N+1 | Rarely accessed relationships | Simple but can be very slow |
| joinedload | 1 (JOIN) | Many-to-one, one-to-one | Can create large result sets with duplicates |
| selectinload | 2 (SELECT + IN) | One-to-many, many-to-many | Best general-purpose eager loading |
| subqueryload | 2 (SELECT + subquery) | Large collections | Similar to selectinload, uses subquery |
| raiseload | 0 (raises error) | Preventing accidental lazy loads | Forces explicit loading strategy |

10.5 Practical Join Examples

# app/routers/products.py
from sqlalchemy.orm import joinedload, selectinload
from sqlalchemy import select, func, and_, or_


@router.get("/products/with-categories", response_model=list[ProductWithCategory])
def get_products_with_categories(db: DbSession):
    """Get products with their category (many-to-one → joinedload)."""
    products = (
        db.query(Product)
        .options(joinedload(Product.category))
        .filter(Product.is_active == True)
        .all()
    )
    return products


@router.get("/categories/{category_id}/products")
def get_category_with_products(category_id: int, db: DbSession):
    """Get a category with all its products (one-to-many → selectinload)."""
    category = (
        db.query(Category)
        .options(selectinload(Category.products))
        .filter(Category.id == category_id)
        .first()
    )
    if not category:
        raise HTTPException(status_code=404, detail="Category not found")

    return {
        "category": category.name,
        "product_count": len(category.products),
        "products": category.products,
    }


@router.get("/orders/{order_id}/details")
def get_order_details(order_id: int, db: DbSession):
    """Get order with items and product details (nested eager loading)."""
    order = (
        db.query(Order)
        .options(
            joinedload(Order.user),  # many-to-one: joinedload
            selectinload(Order.items).joinedload(OrderItem.product),  # nested
        )
        .filter(Order.id == order_id)
        .first()
    )
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")

    return order


# Explicit JOIN queries (for complex filters and aggregations)
@router.get("/products/by-category-stats")
def get_products_by_category_stats(db: DbSession):
    """Get product count and average price per category."""
    results = (
        db.query(
            Category.name,
            func.count(Product.id).label("product_count"),
            func.avg(Product.price).label("avg_price"),
            func.min(Product.price).label("min_price"),
            func.max(Product.price).label("max_price"),
        )
        .join(Product, Category.id == Product.category_id)
        .filter(Product.is_active == True)
        .group_by(Category.name)
        .order_by(func.count(Product.id).desc())
        .all()
    )

    return [
        {
            "category": r.name,
            "product_count": r.product_count,
            "avg_price": float(r.avg_price or 0),
            "min_price": float(r.min_price or 0),
            "max_price": float(r.max_price or 0),
        }
        for r in results
    ]


@router.get("/users/top-spenders")
def get_top_spenders(db: DbSession, limit: int = 10):
    """Get users who have spent the most (multi-table join with aggregation)."""
    results = (
        db.query(
            User.username,
            User.email,
            func.count(Order.id).label("order_count"),
            func.sum(Order.total_amount).label("total_spent"),
        )
        .join(Order, User.id == Order.user_id)
        .filter(Order.status != "cancelled")
        .group_by(User.id, User.username, User.email)
        .order_by(func.sum(Order.total_amount).desc())
        .limit(limit)
        .all()
    )

    return [
        {
            "username": r.username,
            "email": r.email,
            "order_count": r.order_count,
            "total_spent": float(r.total_spent or 0),
        }
        for r in results
    ]

10.6 Preventing N+1 with raiseload

from sqlalchemy.orm import raiseload

# Strict mode: raise an error if any relationship is lazy-loaded
users = (
    db.query(User)
    .options(
        selectinload(User.posts),    # This will be loaded
        raiseload(User.orders),      # This will raise if accessed
        raiseload("*"),              # ALL other relationships will raise
    )
    .all()
)

# Accessing user.posts works fine (it was eagerly loaded)
for user in users:
    print(user.posts)  # OK

    # Accessing user.orders raises an error instead of silently
    # executing a query. This helps catch N+1 issues during development.
    # print(user.orders)  # Raises: sqlalchemy.exc.InvalidRequestError

11. Transactions

Transactions ensure that a group of database operations either all succeed or all fail. Proper transaction management is essential for data integrity, especially when multiple related changes need to be atomic.

11.1 Basic Transaction Management

# SQLAlchemy sessions have implicit transaction management

@router.post("/orders/", response_model=OrderResponse)
def create_order(order_data: OrderCreate, db: DbSession):
    """
    Create an order with items - all or nothing.

    If any step fails, the entire operation is rolled back.
    """
    # Step 1: Create the order
    order = Order(
        order_number=generate_order_number(),
        user_id=order_data.user_id,
        status=OrderStatus.PENDING,
    )
    db.add(order)
    db.flush()  # Get the order.id without committing

    # Step 2: Create order items and calculate total
    total = 0
    for item_data in order_data.items:
        # Verify product exists and has stock
        product = db.query(Product).filter(Product.id == item_data.product_id).first()
        if not product:
            raise HTTPException(status_code=404, detail=f"Product {item_data.product_id} not found")
        if product.stock_quantity < item_data.quantity:
            raise HTTPException(
                status_code=400,
                detail=f"Insufficient stock for {product.name}"
            )

        # Create order item
        item_total = product.price * item_data.quantity
        order_item = OrderItem(
            order_id=order.id,
            product_id=product.id,
            quantity=item_data.quantity,
            unit_price=product.price,
            total_price=item_total,
        )
        db.add(order_item)
        total += item_total

        # Step 3: Reduce stock
        product.stock_quantity -= item_data.quantity

    # Step 4: Update order total
    order.total_amount = total

    # Step 5: Commit everything at once
    # If ANY step above fails, nothing is committed
    db.commit()
    db.refresh(order)
    return order

11.2 Explicit Transaction Control

from sqlalchemy.orm import Session


def transfer_funds(
    db: Session,
    from_account_id: int,
    to_account_id: int,
    amount: float,
) -> dict:
    """
    Transfer funds between accounts with explicit transaction control.
    """
    try:
        # Begin a transaction (implicit with session)
        from_account = db.query(Account).filter(Account.id == from_account_id).with_for_update().first()
        to_account = db.query(Account).filter(Account.id == to_account_id).with_for_update().first()

        if not from_account or not to_account:
            raise ValueError("Account not found")

        if from_account.balance < amount:
            raise ValueError("Insufficient funds")

        # Perform the transfer
        from_account.balance -= amount
        to_account.balance += amount

        # Record the transaction
        transaction = Transaction(
            from_account_id=from_account_id,
            to_account_id=to_account_id,
            amount=amount,
            status="completed",
        )
        db.add(transaction)

        db.commit()
        return {"status": "success", "transaction_id": transaction.id}

    except Exception as e:
        db.rollback()
        # Log the error
        logger.error(f"Transfer failed: {e}")
        raise

11.3 Nested Transactions (Savepoints)

@router.post("/orders/with-notifications")
def create_order_with_notification(order_data: OrderCreate, db: DbSession):
    """
    Create an order and try to send a notification.
    The order should be saved even if the notification fails.
    """
    # Create the order (main transaction)
    order = Order(
        order_number=generate_order_number(),
        user_id=order_data.user_id,
        status=OrderStatus.PENDING,
        total_amount=order_data.total,
    )
    db.add(order)
    db.flush()

    # Try to create a notification (nested transaction / savepoint)
    try:
        with db.begin_nested():  # Creates a SAVEPOINT
            notification = Notification(
                user_id=order_data.user_id,
                message=f"Order {order.order_number} confirmed!",
                order_id=order.id,
            )
            db.add(notification)
            db.flush()  # Surface errors while the savepoint is active
        # Exiting the block releases the savepoint

    except Exception as e:
        # Notification failed, but the order is still intact:
        # the context manager rolled back to the savepoint, so only
        # the notification is discarded and the main transaction continues
        logger.warning(f"Failed to create notification: {e}")

    # Commit the main transaction (order is saved regardless)
    db.commit()
    db.refresh(order)
    return order

11.4 Transaction Patterns and Best Practices

# Pattern 1: Service layer with explicit transactions
class OrderService:
    def __init__(self, db: Session):
        self.db = db

    def create_order(self, order_data: OrderCreate) -> Order:
        """Business logic with transaction management."""
        try:
            order = self._build_order(order_data)
            self._validate_stock(order_data.items)
            self._create_order_items(order, order_data.items)
            self._update_stock(order_data.items)
            self._calculate_total(order)

            self.db.commit()
            self.db.refresh(order)
            return order

        except Exception:
            self.db.rollback()
            raise

    def _build_order(self, data: OrderCreate) -> Order:
        order = Order(order_number=generate_order_number(), user_id=data.user_id)
        self.db.add(order)
        self.db.flush()
        return order


# Pattern 2: Context manager for transactions
from contextlib import contextmanager

@contextmanager
def transaction(db: Session):
    """Context manager for explicit transaction boundaries."""
    try:
        yield db
        db.commit()
    except Exception:
        db.rollback()
        raise


# Usage:
def process_payment(db: Session, order_id: int, payment_data: dict):
    with transaction(db):
        order = db.get(Order, order_id)  # Session.get() replaces the legacy Query.get()
        order.status = OrderStatus.CONFIRMED
        payment = Payment(order_id=order_id, **payment_data)
        db.add(payment)
        # Commits automatically on exit, rolls back on exception

Transaction Tips:

  • Keep transactions short — long-running transactions hold locks and reduce concurrency
  • Use with_for_update() for rows that need pessimistic locking (e.g., account balances)
  • Use savepoints (begin_nested()) when partial failures are acceptable
  • Always handle IntegrityError for unique constraint violations
  • Use db.flush() to get generated IDs without committing the transaction

12. Repository Pattern

The Repository pattern abstracts data access logic behind a clean interface. Instead of scattering SQLAlchemy queries throughout your route handlers, you centralize them in repository classes. This makes your code more testable, reusable, and easier to maintain.

12.1 Why Use the Repository Pattern?

| Without Repository | With Repository |
| --- | --- |
| Queries scattered in route handlers | Queries centralized in one place |
| Hard to test (need full FastAPI context) | Easy to test (mock the repository) |
| Duplicate query logic across routes | Reusable query methods |
| Routes know about SQLAlchemy internals | Routes only know the repository interface |
| Switching ORMs requires changing every route | Switching ORMs only changes repositories |
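The testability claim is easy to demonstrate. The hypothetical FakeUserRepository below mirrors a slice of the UserRepository interface defined later in this section, so code that depends only on that interface can be exercised without a database or a FastAPI app:

```python
# Sketch (assumed interface): an in-memory fake standing in for UserRepository.
from dataclasses import dataclass, field


@dataclass
class FakeUserRepository:
    users: dict = field(default_factory=dict)
    _next_id: int = 1

    def get(self, id: int):
        """Return a stored user dict or None, like BaseRepository.get()."""
        return self.users.get(id)

    def email_exists(self, email: str) -> bool:
        return any(u["email"] == email for u in self.users.values())

    def create_user(self, username: str, email: str, password: str, **kwargs):
        user = {"id": self._next_id, "username": username, "email": email, **kwargs}
        self.users[self._next_id] = user
        self._next_id += 1
        return user


# The duplicate-email rule can now be tested in plain Python:
repo = FakeUserRepository()
repo.create_user("alice", "alice@example.com", "secret")
assert repo.email_exists("alice@example.com")
assert not repo.email_exists("bob@example.com")
```

With FastAPI, the same fake can be wired in via app.dependency_overrides on the repository dependency.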

12.2 Generic Base Repository

# app/repositories/base.py
from typing import Generic, TypeVar, Type, Optional, List, Any
from sqlalchemy.orm import Session
from sqlalchemy import select, func
from app.database import Base

# TypeVar for the model type
ModelType = TypeVar("ModelType", bound=Base)


class BaseRepository(Generic[ModelType]):
    """
    Generic repository with common CRUD operations.

    Subclass this for model-specific repositories.
    """

    def __init__(self, model: Type[ModelType], db: Session):
        self.model = model
        self.db = db

    def get(self, id: int) -> Optional[ModelType]:
        """Get a single record by ID."""
        return self.db.query(self.model).filter(self.model.id == id).first()

    def get_or_404(self, id: int) -> ModelType:
        """Get a single record by ID or raise HTTPException."""
        from fastapi import HTTPException, status
        obj = self.get(id)
        if not obj:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail=f"{self.model.__name__} with id {id} not found",
            )
        return obj

    def get_all(
        self,
        skip: int = 0,
        limit: int = 100,
        **filters: Any,
    ) -> List[ModelType]:
        """Get all records with optional filtering and pagination."""
        query = self.db.query(self.model)

        # Apply dynamic filters
        for field, value in filters.items():
            if value is not None and hasattr(self.model, field):
                query = query.filter(getattr(self.model, field) == value)

        return query.offset(skip).limit(limit).all()

    def count(self, **filters: Any) -> int:
        """Count records with optional filtering."""
        query = self.db.query(func.count(self.model.id))
        for field, value in filters.items():
            if value is not None and hasattr(self.model, field):
                query = query.filter(getattr(self.model, field) == value)
        return query.scalar()

    def create(self, obj_data: dict) -> ModelType:
        """Create a new record."""
        db_obj = self.model(**obj_data)
        self.db.add(db_obj)
        self.db.flush()
        self.db.refresh(db_obj)
        return db_obj

    def update(self, id: int, obj_data: dict) -> Optional[ModelType]:
        """Update an existing record."""
        db_obj = self.get(id)
        if not db_obj:
            return None

        for field, value in obj_data.items():
            if value is not None:
                setattr(db_obj, field, value)

        self.db.flush()
        self.db.refresh(db_obj)
        return db_obj

    def delete(self, id: int) -> bool:
        """Delete a record by ID. Returns True if deleted."""
        db_obj = self.get(id)
        if not db_obj:
            return False

        self.db.delete(db_obj)
        self.db.flush()
        return True

    def exists(self, **filters: Any) -> bool:
        """Check if a record exists with given filters."""
        query = self.db.query(self.model.id)
        for field, value in filters.items():
            if hasattr(self.model, field):
                query = query.filter(getattr(self.model, field) == value)
        return query.first() is not None

12.3 Model-Specific Repository

# app/repositories/user.py
from typing import Optional, List
from sqlalchemy.orm import Session, selectinload
from sqlalchemy import or_
from app.repositories.base import BaseRepository
from app.models.user import User
import hashlib


class UserRepository(BaseRepository[User]):
    """Repository for User-specific database operations."""

    def __init__(self, db: Session):
        super().__init__(User, db)

    def get_by_email(self, email: str) -> Optional[User]:
        """Find a user by email address."""
        return self.db.query(User).filter(User.email == email).first()

    def get_by_username(self, username: str) -> Optional[User]:
        """Find a user by username."""
        return self.db.query(User).filter(User.username == username).first()

    def search(self, query: str, skip: int = 0, limit: int = 20) -> List[User]:
        """Search users by username, email, or full name."""
        pattern = f"%{query}%"
        return (
            self.db.query(User)
            .filter(
                or_(
                    User.username.ilike(pattern),
                    User.email.ilike(pattern),
                    User.full_name.ilike(pattern),
                )
            )
            .offset(skip)
            .limit(limit)
            .all()
        )

    def get_active_users(self, skip: int = 0, limit: int = 100) -> List[User]:
        """Get only active users."""
        return (
            self.db.query(User)
            .filter(User.is_active == True)
            .order_by(User.created_at.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )

    def get_with_posts(self, user_id: int) -> Optional[User]:
        """Get a user with their posts eagerly loaded."""
        return (
            self.db.query(User)
            .options(selectinload(User.posts))
            .filter(User.id == user_id)
            .first()
        )

    def create_user(self, username: str, email: str, password: str, **kwargs) -> User:
        """Create a new user with password hashing."""
        # NOTE: unsalted SHA-256 is for demonstration only; use a proper
        # password hasher (e.g. passlib's bcrypt) in production.
        user_data = {
            "username": username,
            "email": email,
            "hashed_password": hashlib.sha256(password.encode()).hexdigest(),
            **kwargs,
        }
        return self.create(user_data)

    def deactivate(self, user_id: int) -> Optional[User]:
        """Soft-delete a user by deactivating their account."""
        return self.update(user_id, {"is_active": False})

    def email_exists(self, email: str) -> bool:
        """Check if an email is already registered."""
        return self.exists(email=email)

12.4 Product Repository

# app/repositories/product.py
from typing import Optional, List
from decimal import Decimal
from sqlalchemy.orm import Session, joinedload
from sqlalchemy import and_
from app.repositories.base import BaseRepository
from app.models.product import Product


class ProductRepository(BaseRepository[Product]):
    """Repository for Product-specific database operations."""

    def __init__(self, db: Session):
        super().__init__(Product, db)

    def get_by_sku(self, sku: str) -> Optional[Product]:
        """Find a product by SKU."""
        return self.db.query(Product).filter(Product.sku == sku).first()

    def get_by_slug(self, slug: str) -> Optional[Product]:
        """Find a product by URL slug."""
        return self.db.query(Product).filter(Product.slug == slug).first()

    def get_by_category(
        self, category_id: int, skip: int = 0, limit: int = 50
    ) -> List[Product]:
        """Get all products in a category."""
        return (
            self.db.query(Product)
            .filter(
                and_(
                    Product.category_id == category_id,
                    Product.is_active == True,
                )
            )
            .order_by(Product.name)
            .offset(skip)
            .limit(limit)
            .all()
        )

    def get_in_price_range(
        self, min_price: Decimal, max_price: Decimal
    ) -> List[Product]:
        """Get products within a price range."""
        return (
            self.db.query(Product)
            .filter(
                and_(
                    Product.price >= min_price,
                    Product.price <= max_price,
                    Product.is_active == True,
                )
            )
            .order_by(Product.price)
            .all()
        )

    def get_low_stock(self, threshold: int = 10) -> List[Product]:
        """Get products with stock below threshold."""
        return (
            self.db.query(Product)
            .filter(
                and_(
                    Product.stock_quantity <= threshold,
                    Product.is_active == True,
                )
            )
            .order_by(Product.stock_quantity)
            .all()
        )

    def update_stock(self, product_id: int, quantity_change: int) -> Optional[Product]:
        """Update product stock quantity (positive to add, negative to subtract)."""
        product = self.get(product_id)
        if not product:
            return None

        new_quantity = product.stock_quantity + quantity_change
        if new_quantity < 0:
            raise ValueError(f"Insufficient stock. Current: {product.stock_quantity}")

        product.stock_quantity = new_quantity
        self.db.flush()
        self.db.refresh(product)
        return product

    def get_with_category(self, product_id: int) -> Optional[Product]:
        """Get a product with its category eagerly loaded."""
        return (
            self.db.query(Product)
            .options(joinedload(Product.category))
            .filter(Product.id == product_id)
            .first()
        )

12.5 Dependency Injection with Repositories

# app/dependencies.py
from typing import Annotated
from fastapi import Depends
from sqlalchemy.orm import Session
from app.database import SessionLocal
from app.repositories.user import UserRepository
from app.repositories.product import ProductRepository


def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()


DbSession = Annotated[Session, Depends(get_db)]


# Repository dependencies
def get_user_repo(db: DbSession) -> UserRepository:
    return UserRepository(db)

def get_product_repo(db: DbSession) -> ProductRepository:
    return ProductRepository(db)


# Type aliases for clean route signatures
UserRepo = Annotated[UserRepository, Depends(get_user_repo)]
ProductRepo = Annotated[ProductRepository, Depends(get_product_repo)]

12.6 Using Repositories in Routes

# app/routers/users.py
from fastapi import APIRouter, HTTPException, status
from app.dependencies import UserRepo, DbSession
from app.schemas.user import UserCreate, UserResponse, UserUpdate

router = APIRouter(prefix="/users", tags=["users"])


@router.post("/", response_model=UserResponse, status_code=status.HTTP_201_CREATED)
def create_user(user_data: UserCreate, repo: UserRepo, db: DbSession):
    """Create a new user using the repository."""
    # Check for duplicates
    if repo.email_exists(user_data.email):
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Email already registered"
        )

    user = repo.create_user(
        username=user_data.username,
        email=user_data.email,
        password=user_data.password,
        full_name=user_data.full_name,
    )
    db.commit()
    return user


@router.get("/", response_model=list[UserResponse])
def get_users(repo: UserRepo, skip: int = 0, limit: int = 100):
    """Get all active users."""
    return repo.get_active_users(skip=skip, limit=limit)


@router.get("/search", response_model=list[UserResponse])
def search_users(q: str, repo: UserRepo):
    """Search users by name, email, or username."""
    return repo.search(q)


@router.get("/{user_id}", response_model=UserResponse)
def get_user(user_id: int, repo: UserRepo):
    """Get a user by ID (raises 404 if not found)."""
    return repo.get_or_404(user_id)


@router.patch("/{user_id}", response_model=UserResponse)
def update_user(user_id: int, user_data: UserUpdate, repo: UserRepo, db: DbSession):
    """Update a user."""
    user = repo.update(user_id, user_data.model_dump(exclude_unset=True))
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    db.commit()
    return user


@router.delete("/{user_id}", status_code=status.HTTP_204_NO_CONTENT)
def delete_user(user_id: int, repo: UserRepo, db: DbSession):
    """Soft-delete a user."""
    user = repo.deactivate(user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    db.commit()

13. Complete Project: E-Commerce Data Layer

Now let us bring everything together into a complete, production-ready e-commerce data layer. This project includes Products, Categories, Users, Orders, and full CRUD operations with relationships and proper architecture.

13.1 Complete Project Structure

ecommerce/
├── app/
│   ├── __init__.py
│   ├── main.py
│   ├── config.py
│   ├── database.py
│   ├── models/
│   │   ├── __init__.py          # Import all models
│   │   ├── user.py
│   │   ├── product.py
│   │   ├── category.py
│   │   └── order.py
│   ├── schemas/
│   │   ├── __init__.py
│   │   ├── user.py
│   │   ├── product.py
│   │   ├── category.py
│   │   └── order.py
│   ├── repositories/
│   │   ├── __init__.py
│   │   ├── base.py
│   │   ├── user.py
│   │   ├── product.py
│   │   └── order.py
│   ├── routers/
│   │   ├── __init__.py
│   │   ├── users.py
│   │   ├── products.py
│   │   ├── categories.py
│   │   └── orders.py
│   └── dependencies.py
├── alembic/
│   ├── versions/
│   └── env.py
├── alembic.ini
├── requirements.txt
└── .env

13.2 Application Entry Point

# app/main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from app.database import create_tables, engine
from app.routers import users, products, categories, orders


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application startup and shutdown events."""
    # Startup: create tables (use Alembic in production)
    create_tables()
    print("Database tables created successfully")
    yield
    # Shutdown: dispose engine connections
    engine.dispose()
    print("Database connections closed")


app = FastAPI(
    title="E-Commerce API",
    description="Complete e-commerce data layer with FastAPI and SQLAlchemy",
    version="1.0.0",
    lifespan=lifespan,
)

# Include routers
app.include_router(users.router)
app.include_router(products.router)
app.include_router(categories.router)
app.include_router(orders.router)


@app.get("/")
def root():
    return {
        "message": "E-Commerce API",
        "docs": "/docs",
        "version": "1.0.0",
    }


@app.get("/health")
def health_check():
    """Health check endpoint."""
    from app.database import engine

    pool = engine.pool
    return {
        "status": "healthy",
        "database": {
            "pool_size": pool.size(),
            "connections_in_use": pool.checkedout(),
            "connections_available": pool.checkedin(),
        },
    }

13.3 Complete Models Package

# app/models/__init__.py
"""
Import all models here so Alembic can detect them for migrations.
"""
from app.models.user import User
from app.models.category import Category
from app.models.product import Product
from app.models.order import Order, OrderItem

__all__ = ["User", "Category", "Product", "Order", "OrderItem"]

# app/models/category.py (complete)
from datetime import datetime
from typing import Optional, List
from sqlalchemy import String, Integer, Text, DateTime, ForeignKey, func
from sqlalchemy.orm import Mapped, mapped_column, relationship
from app.database import Base


class Category(Base):
    __tablename__ = "categories"

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    name: Mapped[str] = mapped_column(String(100), unique=True, nullable=False, index=True)
    slug: Mapped[str] = mapped_column(String(100), unique=True, nullable=False)
    description: Mapped[Optional[str]] = mapped_column(Text, nullable=True)
    parent_id: Mapped[Optional[int]] = mapped_column(
        Integer, ForeignKey("categories.id", ondelete="SET NULL"), nullable=True
    )
    created_at: Mapped[datetime] = mapped_column(
        DateTime(timezone=True), server_default=func.now()
    )

    # Self-referential relationships
    parent: Mapped[Optional["Category"]] = relationship(
        "Category", remote_side="Category.id", back_populates="children"
    )
    children: Mapped[List["Category"]] = relationship(
        "Category", back_populates="parent"
    )
    products: Mapped[List["Product"]] = relationship(
        "Product", back_populates="category"
    )

    def __repr__(self) -> str:
        return f"<Category(id={self.id}, name='{self.name}')>"

13.4 Complete Order Schemas

# app/schemas/order.py
from datetime import datetime
from typing import List, Optional
from decimal import Decimal
from pydantic import BaseModel, Field, ConfigDict


class OrderItemCreate(BaseModel):
    """Schema for adding an item to an order."""
    product_id: int
    quantity: int = Field(..., gt=0)


class OrderItemResponse(BaseModel):
    """Schema for order item in responses."""
    id: int
    product_id: int
    quantity: int
    unit_price: Decimal
    total_price: Decimal

    model_config = ConfigDict(from_attributes=True)


class OrderCreate(BaseModel):
    """Schema for creating a new order."""
    user_id: int
    items: List[OrderItemCreate] = Field(..., min_length=1)
    shipping_address: Optional[str] = None


class OrderResponse(BaseModel):
    """Schema for order API responses."""
    id: int
    order_number: str
    status: str
    total_amount: Decimal
    user_id: int
    created_at: datetime
    items: List[OrderItemResponse] = []

    model_config = ConfigDict(from_attributes=True)


class OrderSummary(BaseModel):
    """Lightweight order for list endpoints."""
    id: int
    order_number: str
    status: str
    total_amount: Decimal
    created_at: datetime

    model_config = ConfigDict(from_attributes=True)
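
The price fields above are Decimal on purpose: binary floats accumulate rounding error in money math, while Decimal arithmetic stays exact. A quick stdlib illustration of the line-total arithmetic the repository performs (a simplified sketch, not project code):

```python
from decimal import Decimal


def line_total(unit_price: Decimal, quantity: int) -> Decimal:
    """Exact line total: Decimal * int stays Decimal."""
    return unit_price * quantity


print(line_total(Decimal("79.99"), 2))  # 159.98

# Constructing a Decimal from a float smuggles the float error in:
print(Decimal(79.99))  # a long tail of digits — not exactly 79.99
```

This is why prices should travel as strings or Decimals end to end, never through an intermediate float.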

13.5 Order Repository

# app/repositories/order.py
from typing import Optional, List
from sqlalchemy.orm import Session, selectinload, joinedload
from sqlalchemy import func
from app.repositories.base import BaseRepository
from app.models.order import Order, OrderItem, OrderStatus
from app.models.product import Product
import uuid
from datetime import datetime, timezone


class OrderRepository(BaseRepository[Order]):
    """Repository for order operations."""

    def __init__(self, db: Session):
        super().__init__(Order, db)

    def generate_order_number(self) -> str:
        """Generate a unique order number."""
        # datetime.utcnow() is deprecated; use an explicit UTC timezone
        timestamp = datetime.now(timezone.utc).strftime("%Y%m%d%H%M%S")
        unique_id = uuid.uuid4().hex[:6].upper()
        return f"ORD-{timestamp}-{unique_id}"

    def get_with_items(self, order_id: int) -> Optional[Order]:
        """Get an order with its items and product details."""
        return (
            self.db.query(Order)
            .options(
                selectinload(Order.items).joinedload(OrderItem.product),
                joinedload(Order.user),
            )
            .filter(Order.id == order_id)
            .first()
        )

    def get_user_orders(
        self, user_id: int, skip: int = 0, limit: int = 20
    ) -> List[Order]:
        """Get all orders for a specific user."""
        return (
            self.db.query(Order)
            .filter(Order.user_id == user_id)
            .order_by(Order.created_at.desc())
            .offset(skip)
            .limit(limit)
            .all()
        )

    def create_order(self, user_id: int, items: list) -> Order:
        """
        Create a complete order with items.

        Validates stock, creates order items, updates stock quantities,
        and calculates the total — all in a single transaction.
        """
        # Create the order
        order = Order(
            order_number=self.generate_order_number(),
            user_id=user_id,
            status=OrderStatus.PENDING,
        )
        self.db.add(order)
        self.db.flush()  # Get order.id

        total_amount = 0

        for item_data in items:
            # Fetch and validate product
            product = (
                self.db.query(Product)
                .filter(Product.id == item_data.product_id)
                .with_for_update()  # Lock row to prevent race conditions
                .first()
            )

            if not product:
                raise ValueError(f"Product {item_data.product_id} not found")
            if not product.is_active:
                raise ValueError(f"Product '{product.name}' is not available")
            if product.stock_quantity < item_data.quantity:
                raise ValueError(
                    f"Insufficient stock for '{product.name}'. "
                    f"Available: {product.stock_quantity}, Requested: {item_data.quantity}"
                )

            # Calculate line total (Decimal * int stays Decimal — avoid float for money)
            item_total = product.price * item_data.quantity

            # Create order item
            order_item = OrderItem(
                order_id=order.id,
                product_id=product.id,
                quantity=item_data.quantity,
                unit_price=product.price,
                total_price=item_total,
            )
            self.db.add(order_item)

            # Update stock
            product.stock_quantity -= item_data.quantity
            total_amount += item_total

        # Set order total
        order.total_amount = total_amount
        self.db.flush()
        self.db.refresh(order)

        return order

    def update_status(self, order_id: int, new_status: OrderStatus) -> Optional[Order]:
        """Update order status with validation."""
        order = self.get(order_id)
        if not order:
            return None

        # Validate status transitions
        valid_transitions = {
            OrderStatus.PENDING: [OrderStatus.CONFIRMED, OrderStatus.CANCELLED],
            OrderStatus.CONFIRMED: [OrderStatus.SHIPPED, OrderStatus.CANCELLED],
            OrderStatus.SHIPPED: [OrderStatus.DELIVERED],
            OrderStatus.DELIVERED: [],
            OrderStatus.CANCELLED: [],
        }

        if new_status not in valid_transitions.get(order.status, []):
            raise ValueError(
                f"Cannot transition from {order.status} to {new_status}"
            )

        order.status = new_status

        # If cancelled, restore stock
        if new_status == OrderStatus.CANCELLED:
            self._restore_stock(order)

        self.db.flush()
        self.db.refresh(order)
        return order

    def _restore_stock(self, order: Order):
        """Restore stock quantities when an order is cancelled."""
        items = (
            self.db.query(OrderItem)
            .filter(OrderItem.order_id == order.id)
            .all()
        )
        for item in items:
            product = self.db.get(Product, item.product_id)  # Session.get — Query.get() is legacy in 2.0
            if product:
                product.stock_quantity += item.quantity

    def get_order_stats(self, user_id: Optional[int] = None) -> dict:
        """Get order statistics."""
        query = self.db.query(Order)
        if user_id is not None:
            query = query.filter(Order.user_id == user_id)

        total_orders = query.count()
        total_revenue = (
            query.filter(Order.status != OrderStatus.CANCELLED)
            .with_entities(func.sum(Order.total_amount))
            .scalar()
        ) or 0

        status_counts = (
            query.with_entities(Order.status, func.count(Order.id))
            .group_by(Order.status)
            .all()
        )

        return {
            "total_orders": total_orders,
            "total_revenue": float(total_revenue),
            "orders_by_status": {s.value: c for s, c in status_counts},
        }
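
The transition table inside update_status is a tiny state machine. Pulled out on its own — with a stand-in enum mirroring the values that app.models.order.OrderStatus is assumed to define — the rule reads:

```python
from enum import Enum


class OrderStatus(str, Enum):
    PENDING = "pending"
    CONFIRMED = "confirmed"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"


# Each status maps to the set of statuses it may move to
VALID_TRANSITIONS = {
    OrderStatus.PENDING: {OrderStatus.CONFIRMED, OrderStatus.CANCELLED},
    OrderStatus.CONFIRMED: {OrderStatus.SHIPPED, OrderStatus.CANCELLED},
    OrderStatus.SHIPPED: {OrderStatus.DELIVERED},
    OrderStatus.DELIVERED: set(),   # terminal
    OrderStatus.CANCELLED: set(),   # terminal
}


def can_transition(current: OrderStatus, new: OrderStatus) -> bool:
    return new in VALID_TRANSITIONS[current]


print(can_transition(OrderStatus.PENDING, OrderStatus.CONFIRMED))  # True
print(can_transition(OrderStatus.SHIPPED, OrderStatus.CANCELLED))  # False
```

Encoding the lifecycle as data rather than nested ifs keeps the rule auditable in one place and trivial to unit-test.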

13.6 Order Routes

# app/routers/orders.py
from fastapi import APIRouter, HTTPException, status, Depends
from typing import Annotated
from app.dependencies import DbSession
from app.repositories.order import OrderRepository
from app.schemas.order import OrderCreate, OrderResponse, OrderSummary
from app.models.order import OrderStatus

router = APIRouter(prefix="/orders", tags=["orders"])


def get_order_repo(db: DbSession) -> OrderRepository:
    return OrderRepository(db)

OrderRepo = Annotated[OrderRepository, Depends(get_order_repo)]


@router.post("/", response_model=OrderResponse, status_code=status.HTTP_201_CREATED)
def create_order(order_data: OrderCreate, repo: OrderRepo, db: DbSession):
    """Create a new order with items."""
    try:
        order = repo.create_order(
            user_id=order_data.user_id,
            items=order_data.items,
        )
        db.commit()

        # Reload with relationships for response
        order = repo.get_with_items(order.id)
        return order

    except ValueError as e:
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=str(e),
        )


@router.get("/{order_id}", response_model=OrderResponse)
def get_order(order_id: int, repo: OrderRepo):
    """Get order details with items."""
    order = repo.get_with_items(order_id)
    if not order:
        raise HTTPException(status_code=404, detail="Order not found")
    return order


@router.get("/user/{user_id}", response_model=list[OrderSummary])
def get_user_orders(user_id: int, repo: OrderRepo, skip: int = 0, limit: int = 20):
    """Get all orders for a user."""
    return repo.get_user_orders(user_id, skip=skip, limit=limit)


@router.patch("/{order_id}/status")
def update_order_status(
    order_id: int,
    new_status: OrderStatus,
    repo: OrderRepo,
    db: DbSession,
):
    """Update an order's status."""
    try:
        order = repo.update_status(order_id, new_status)
        if not order:
            raise HTTPException(status_code=404, detail="Order not found")
        db.commit()
        return {"order_id": order.id, "new_status": order.status}
    except ValueError as e:
        db.rollback()
        raise HTTPException(status_code=400, detail=str(e))


@router.get("/stats/summary")
def get_order_stats(repo: OrderRepo):
    """Get order statistics."""
    return repo.get_order_stats()
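
The get_user_orders route passes skip and limit straight from the query string into the repository; in practice you will want to clamp them so a client cannot request an unbounded page. A hypothetical helper (not part of the tutorial's code) for sanitizing pagination parameters:

```python
def clamp_pagination(skip: int, limit: int, max_limit: int = 100) -> tuple[int, int]:
    """Force skip >= 0 and 1 <= limit <= max_limit."""
    skip = max(skip, 0)
    limit = min(max(limit, 1), max_limit)
    return skip, limit


print(clamp_pagination(-5, 500))  # (0, 100)
print(clamp_pagination(20, 20))   # (20, 20)
```

The same effect can be had declaratively with FastAPI's `Query(ge=0)` and `Query(ge=1, le=100)` constraints on the route parameters.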

13.7 Running the Application

# Create requirements.txt
cat > requirements.txt << EOF
fastapi==0.109.0
uvicorn==0.27.0
sqlalchemy==2.0.25
alembic==1.13.1
pydantic==2.5.3
pydantic-settings==2.1.0
python-dotenv==1.0.0
email-validator==2.1.0
EOF

# Install dependencies
pip install -r requirements.txt

# Create .env file
cat > .env << EOF
DATABASE_URL=sqlite:///./ecommerce.db
DATABASE_ECHO=true
DEBUG=true
EOF

# Initialize Alembic
alembic init alembic

# Before autogenerating, point Alembic at your models: in alembic/env.py,
# set target_metadata = Base.metadata (and load DATABASE_URL from settings),
# otherwise --autogenerate produces an empty migration

# Generate initial migration
alembic revision --autogenerate -m "initial schema"

# Apply migration
alembic upgrade head

# Run the application
uvicorn app.main:app --reload --host 0.0.0.0 --port 8000

13.8 Testing the API

# Create a category
curl -X POST http://localhost:8000/categories/ \
  -H "Content-Type: application/json" \
  -d '{"name": "Electronics", "slug": "electronics", "description": "Electronic devices"}'

# Create a product
curl -X POST http://localhost:8000/products/ \
  -H "Content-Type: application/json" \
  -d '{
    "name": "Wireless Headphones",
    "slug": "wireless-headphones",
    "price": 79.99,
    "sku": "WH-001",
    "stock_quantity": 50,
    "category_id": 1
  }'

# Create a user
curl -X POST http://localhost:8000/users/ \
  -H "Content-Type: application/json" \
  -d '{
    "username": "johndoe",
    "email": "john@example.com",
    "password": "SecurePass123",
    "full_name": "John Doe"
  }'

# Create an order
curl -X POST http://localhost:8000/orders/ \
  -H "Content-Type: application/json" \
  -d '{
    "user_id": 1,
    "items": [
      {"product_id": 1, "quantity": 2}
    ]
  }'

# Get order details
curl http://localhost:8000/orders/1

# Update order status
curl -X PATCH "http://localhost:8000/orders/1/status?new_status=confirmed"

# Get order statistics
curl http://localhost:8000/orders/stats/summary

# Check database health
curl http://localhost:8000/health
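
The /orders/stats/summary response groups orders by status. The SQL GROUP BY inside get_order_stats is conceptually the same as running a stdlib Counter over the status column, sketched here with hypothetical data:

```python
from collections import Counter

# Hypothetical statuses as they might come back from the orders table
statuses = ["pending", "confirmed", "confirmed", "delivered", "cancelled"]

orders_by_status = dict(Counter(statuses))
print(orders_by_status)
# {'pending': 1, 'confirmed': 2, 'delivered': 1, 'cancelled': 1}
```

Doing the aggregation in SQL, as the repository does, keeps the row data in the database and returns only the counts.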

14. Key Takeaways

In this comprehensive tutorial, we covered every essential aspect of database integration in FastAPI. Here is a summary of the key concepts and recommendations:

Database Choice: Use SQLite for development, PostgreSQL for production. SQLAlchemy makes switching easy.
SQLAlchemy Setup: Use DeclarativeBase, configure the engine with pooling, and create a session factory.
Models: Use Mapped type annotations (SQLAlchemy 2.0). Define proper indexes and constraints.
Session Management: Use FastAPI’s dependency injection with yield for automatic cleanup.
CRUD Operations: Use db.flush() for intermediate IDs, db.commit() at transaction boundaries.
Schemas vs Models: Separate Pydantic schemas from SQLAlchemy models. Use from_attributes=True.
Alembic Migrations: Always use Alembic in production. Never use create_all() in production.
Async Database: Use async for high-traffic APIs. Stick with sync for simpler applications.
Connection Pooling: Configure pool_size, max_overflow, and pool_pre_ping for production.
Relationships: Use selectinload for collections, joinedload for single objects. Avoid N+1 queries.
Transactions: Keep transactions short. Use savepoints for partial failure tolerance.
Repository Pattern: Abstract queries into repositories. Inject them via FastAPI dependencies.
What’s Next?

In the next tutorial, we will dive into FastAPI Pydantic Models & Validation, where we will explore advanced validation techniques, custom validators, nested models, and how to build robust data validation layers for your API.


