FastAPI – Authentication & Authorization

Security is one of the most critical aspects of any web application. A single vulnerability can expose user data, compromise accounts, and destroy user trust. FastAPI provides excellent built-in support for implementing authentication and authorization, leveraging Python’s type system and dependency injection to create secure, maintainable auth systems.

In this comprehensive tutorial, you will learn how to build a production-ready authentication and authorization system in FastAPI. We will cover everything from password hashing and JWT tokens to role-based access control, OAuth2 scopes, refresh token rotation, and security best practices. By the end, you will have a complete, reusable auth system that you can drop into any FastAPI project.

Prerequisites: You should be familiar with FastAPI basics (routes, models, dependencies), Pydantic, and have a basic understanding of HTTP and REST APIs. Familiarity with SQL databases (SQLAlchemy) is helpful but not required.

1. Authentication & Authorization Fundamentals

Before writing any code, it is essential to understand the distinction between authentication and authorization, and the common strategies used to implement them.

1.1 Authentication vs Authorization

Authentication answers the question: “Who are you?” It is the process of verifying a user’s identity. When a user logs in with a username and password, the system authenticates them by checking those credentials against stored records.

Authorization answers the question: “What are you allowed to do?” It determines what resources and actions an authenticated user can access. A regular user might view their own profile, while an admin can manage all users.

Aspect Authentication Authorization
Question Who are you? What can you do?
Purpose Verify identity Grant/deny access
When Before authorization After authentication
Example Login with username/password Admin-only endpoint access
Failure Response 401 Unauthorized 403 Forbidden
Data Credentials (password, token, biometrics) Roles, permissions, policies

1.2 Common Authentication Strategies

Strategy How It Works Best For Drawbacks
Session-Based Server stores session data, client holds session ID cookie Traditional web apps, server-rendered pages Stateful, hard to scale horizontally
Token-Based (JWT) Server issues signed token, client sends it with each request SPAs, mobile apps, microservices Token revocation is complex
API Key Client sends a pre-shared key in header or query param Server-to-server, third-party integrations No user context, key rotation challenges
OAuth2 Delegated auth via authorization server Third-party login (Google, GitHub) Complex implementation
Basic Auth Username:password in Authorization header (Base64) Simple internal tools, development Credentials sent with every request

1.3 Security Considerations

Security is not optional. Every authentication system must address these concerns from day one, not as an afterthought.
  • Always use HTTPS — Credentials and tokens must never travel over plain HTTP
  • Hash passwords — Never store plaintext passwords; use bcrypt or argon2
  • Use short-lived tokens — Access tokens should expire quickly (15-30 minutes)
  • Implement rate limiting — Prevent brute-force attacks on login endpoints
  • Validate all input — Sanitize and validate every piece of user input
  • Use secure headers — Set HSTS, CSP, X-Frame-Options, and other security headers
  • Log security events — Track login attempts, password changes, and permission changes
  • Follow the principle of least privilege — Grant minimum permissions required

2. Password Hashing

The foundation of any password-based authentication system is secure password hashing. You must never store passwords in plaintext. Instead, you hash them using a one-way cryptographic function that makes it computationally infeasible to recover the original password.

2.1 Installing Dependencies

pip install passlib[bcrypt] bcrypt

passlib is a comprehensive password hashing library that supports multiple algorithms. bcrypt is the recommended algorithm for password hashing because it is deliberately slow (making brute-force attacks expensive), includes a built-in salt, and has a configurable work factor that can be increased as hardware gets faster.

2.2 Setting Up the Password Context

# security/password.py
from passlib.context import CryptContext

# Create a password context with bcrypt as the default scheme
pwd_context = CryptContext(
    schemes=["bcrypt"],
    deprecated="auto",       # Automatically mark old schemes as deprecated
    bcrypt__rounds=12,       # Work factor (2^12 = 4096 iterations)
)


def hash_password(plain_password: str) -> str:
    """
    Hash a plaintext password using bcrypt.
    
    Args:
        plain_password: The user's plaintext password.
        
    Returns:
        The bcrypt hash string (includes algorithm, rounds, salt, and hash).
    """
    return pwd_context.hash(plain_password)


def verify_password(plain_password: str, hashed_password: str) -> bool:
    """
    Verify a plaintext password against a stored hash.
    
    Args:
        plain_password: The password to verify.
        hashed_password: The stored bcrypt hash.
        
    Returns:
        True if the password matches, False otherwise.
    """
    return pwd_context.verify(plain_password, hashed_password)

2.3 Understanding bcrypt Hashes

A bcrypt hash looks like this:

$2b$12$LJ3m4ys3Lk0TSwMvNCH/8.VkEm8MRzIrvMnGJOgLrMwOOzcnX3iOa
│  │  │                                                        │
│  │  │  └── Hash + Salt (53 characters)                        │
│  │  └── Cost factor (12 rounds = 2^12 iterations)             │
│  └── Sub-version (2b = current)                               │
└── Algorithm identifier ($2b = bcrypt)                         │

Key points about bcrypt:

  • Automatic salt — bcrypt generates a random 128-bit salt for each hash. You never need to manage salts manually.
  • Cost factor — The rounds parameter controls how slow hashing is. Each increment doubles the time. 12 rounds takes roughly 250ms on modern hardware.
  • Deterministic verification — The salt is embedded in the hash string, so verify() can extract it and re-hash the input for comparison.

2.4 Password Validation

# security/password_validation.py
import re
from pydantic import BaseModel, field_validator


class PasswordRequirements(BaseModel):
    """Validates password strength requirements."""
    password: str

    @field_validator("password")
    @classmethod
    def validate_password_strength(cls, v: str) -> str:
        if len(v) < 8:
            raise ValueError("Password must be at least 8 characters long")
        if len(v) > 128:
            raise ValueError("Password must not exceed 128 characters")
        if not re.search(r"[A-Z]", v):
            raise ValueError("Password must contain at least one uppercase letter")
        if not re.search(r"[a-z]", v):
            raise ValueError("Password must contain at least one lowercase letter")
        if not re.search(r"\d", v):
            raise ValueError("Password must contain at least one digit")
        if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", v):
            raise ValueError(
                "Password must contain at least one special character"
            )
        return v


# Usage example
def validate_and_hash_password(plain_password: str) -> str:
    """Validate password strength, then hash it."""
    # This will raise ValidationError if password is weak
    PasswordRequirements(password=plain_password)
    return hash_password(plain_password)

2.5 Best Practices for Password Hashing

Practice Why
Use bcrypt or argon2 Purpose-built for passwords; deliberately slow
Never use MD5 or SHA-256 alone Too fast; vulnerable to brute-force and rainbow tables
Let the library handle salts bcrypt auto-generates cryptographically random salts
Set cost factor to at least 12 Balances security and performance; increase over time
Enforce password complexity Weak passwords undermine even the best hashing
Never log passwords Even hashed passwords should be treated as sensitive
Limit password length to 72 bytes bcrypt truncates input beyond 72 bytes

3. OAuth2 Password Flow

FastAPI has built-in support for OAuth2 flows. The OAuth2 “password flow” (also called “Resource Owner Password Credentials”) is the simplest OAuth2 flow where the user provides their username and password directly to your application, which then returns a token.

3.1 Understanding the Flow

The OAuth2 password flow works as follows:

  1. The client sends the user’s username and password to the /token endpoint
  2. The server validates the credentials
  3. If valid, the server returns an access token (and optionally a refresh token)
  4. The client includes the access token in the Authorization header for subsequent requests
  5. The server validates the token and processes the request

3.2 Setting Up OAuth2PasswordBearer

# security/oauth2.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

# This tells FastAPI where the token endpoint is located.
# It also enables the "Authorize" button in Swagger UI.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")


# This dependency extracts the token from the Authorization header
async def get_current_token(token: str = Depends(oauth2_scheme)) -> str:
    """
    Extract the bearer token from the Authorization header.
    
    The OAuth2PasswordBearer dependency automatically:
    1. Looks for the Authorization header
    2. Checks it starts with "Bearer "
    3. Extracts and returns the token string
    4. Returns 401 if the header is missing or malformed
    """
    return token

3.3 Creating the Token Endpoint

# routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm

router = APIRouter(prefix="/api/v1/auth", tags=["Authentication"])


@router.post("/token")
async def login_for_access_token(
    form_data: OAuth2PasswordRequestForm = Depends()
):
    """
    OAuth2-compatible token endpoint.
    
    OAuth2PasswordRequestForm provides:
    - username: str (required)
    - password: str (required)
    - scope: str (optional, space-separated scopes)
    - grant_type: str (optional, must be "password" if provided)
    
    Note: OAuth2 spec requires form data (not JSON) for this endpoint.
    The Content-Type must be application/x-www-form-urlencoded.
    """
    # Authenticate the user (we will implement this fully later)
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # Create access token
    access_token = create_access_token(data={"sub": user.username})
    
    return {
        "access_token": access_token,
        "token_type": "bearer",
    }
Why form data? The OAuth2 specification requires the token endpoint to accept application/x-www-form-urlencoded content type. FastAPI’s OAuth2PasswordRequestForm handles this automatically. When testing with Swagger UI, the “Authorize” button sends credentials in this format.

3.4 Testing with Swagger UI

One of FastAPI’s best features is automatic OpenAPI documentation with built-in OAuth2 support. When you set tokenUrl in OAuth2PasswordBearer, Swagger UI adds an “Authorize” button that lets you log in and automatically includes the token in subsequent requests.

# main.py
from fastapi import FastAPI
from routers import auth

app = FastAPI(
    title="FastAPI Auth Tutorial",
    description="Complete authentication and authorization system",
    version="1.0.0",
)

app.include_router(auth.router)

# Visit http://localhost:8000/docs to see the Authorize button
# Click it, enter credentials, and all protected endpoints
# will automatically include the Bearer token

4. JWT Tokens

JSON Web Tokens (JWT) are the industry standard for stateless authentication in modern web applications. A JWT is a compact, URL-safe token that contains claims (statements about the user) and is cryptographically signed to prevent tampering.

4.1 JWT Structure

A JWT consists of three Base64URL-encoded parts separated by dots:

eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
│                                          │                                                                          │
└── Header                                 └── Payload (Claims)                                                       └── Signature
Part Contains Example
Header Algorithm and token type {"alg": "HS256", "typ": "JWT"}
Payload Claims (user data, expiration, etc.) {"sub": "user123", "exp": 1700000000}
Signature HMAC or RSA signature for integrity Cryptographic hash of header + payload + secret

4.2 Standard JWT Claims

Claim Name Purpose
sub Subject User identifier (username, user ID)
exp Expiration When the token expires (Unix timestamp)
iat Issued At When the token was created
jti JWT ID Unique token identifier (for revocation)
iss Issuer Who issued the token
aud Audience Intended recipient
nbf Not Before Token is not valid before this time

4.3 Installing JWT Dependencies

pip install python-jose[cryptography]
# or alternatively:
pip install PyJWT

4.4 Creating and Verifying JWT Tokens

# security/jwt_handler.py
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
import uuid

from jose import JWTError, jwt
from pydantic import BaseModel


# Configuration — In production, load these from environment variables
SECRET_KEY = "your-secret-key-change-in-production-use-openssl-rand-hex-32"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7


class TokenData(BaseModel):
    """Schema for decoded token data."""
    username: Optional[str] = None
    scopes: list[str] = []
    token_type: str = "access"
    jti: Optional[str] = None


class TokenResponse(BaseModel):
    """Schema for token endpoint response."""
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_in: int  # seconds until access token expires


def create_access_token(
    data: dict[str, Any],
    expires_delta: Optional[timedelta] = None,
) -> str:
    """
    Create a JWT access token.
    
    Args:
        data: Claims to include in the token (must include 'sub').
        expires_delta: Custom expiration time. Defaults to 30 minutes.
    
    Returns:
        Encoded JWT string.
    """
    to_encode = data.copy()
    
    now = datetime.now(timezone.utc)
    expire = now + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
    
    to_encode.update({
        "exp": expire,
        "iat": now,
        "jti": str(uuid.uuid4()),  # Unique token ID for revocation
        "type": "access",
    })
    
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def create_refresh_token(
    data: dict[str, Any],
    expires_delta: Optional[timedelta] = None,
) -> str:
    """
    Create a JWT refresh token with longer expiration.
    
    Refresh tokens are used to obtain new access tokens without
    requiring the user to log in again.
    """
    to_encode = data.copy()
    
    now = datetime.now(timezone.utc)
    expire = now + (expires_delta or timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS))
    
    to_encode.update({
        "exp": expire,
        "iat": now,
        "jti": str(uuid.uuid4()),
        "type": "refresh",
    })
    
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)


def decode_token(token: str) -> TokenData:
    """
    Decode and validate a JWT token.
    
    Args:
        token: The JWT string to decode.
    
    Returns:
        TokenData with the decoded claims.
    
    Raises:
        JWTError: If the token is invalid, expired, or tampered with.
    """
    payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
    
    username: str = payload.get("sub")
    if username is None:
        raise JWTError("Token missing 'sub' claim")
    
    scopes: list[str] = payload.get("scopes", [])
    token_type: str = payload.get("type", "access")
    jti: str = payload.get("jti")
    
    return TokenData(
        username=username,
        scopes=scopes,
        token_type=token_type,
        jti=jti,
    )

4.5 Generating a Secure Secret Key

# Generate a cryptographically secure random key
openssl rand -hex 32
# Output: a1b2c3d4e5f6...  (64 hex characters = 256 bits)

# Or using Python
python -c "import secrets; print(secrets.token_hex(32))"
Never hardcode your secret key in source code. Use environment variables or a secrets manager (AWS Secrets Manager, HashiCorp Vault). If the secret key is compromised, an attacker can forge valid tokens for any user.

4.6 Token Configuration Best Practices

# config.py
from pydantic_settings import BaseSettings


class AuthSettings(BaseSettings):
    """
    Authentication configuration loaded from environment variables.
    
    Set these in your .env file or system environment:
        SECRET_KEY=your-secret-key
        ALGORITHM=HS256
        ACCESS_TOKEN_EXPIRE_MINUTES=30
        REFRESH_TOKEN_EXPIRE_DAYS=7
    """
    SECRET_KEY: str
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7
    
    # Password hashing
    BCRYPT_ROUNDS: int = 12
    
    # Rate limiting
    LOGIN_RATE_LIMIT: str = "5/minute"
    
    class Config:
        env_file = ".env"
        case_sensitive = True


auth_settings = AuthSettings()

5. User Model & Registration

With password hashing and JWT tokens in place, we need a user model to store user data and a registration endpoint to create new accounts. We will use SQLAlchemy for the database model and Pydantic for request/response schemas.

5.1 Installing Database Dependencies

pip install sqlalchemy asyncpg aiosqlite
# asyncpg for PostgreSQL, aiosqlite for SQLite (development)

5.2 Database Setup

# database.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase

DATABASE_URL = "sqlite+aiosqlite:///./auth_tutorial.db"
# For PostgreSQL: "postgresql+asyncpg://user:pass@localhost/dbname"

engine = create_async_engine(DATABASE_URL, echo=True)

async_session = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


class Base(DeclarativeBase):
    """Base class for all SQLAlchemy models."""
    pass


async def get_db() -> AsyncSession:
    """Dependency that provides a database session."""
    async with async_session() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise
        finally:
            await session.close()


async def init_db():
    """Create all tables on application startup."""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

5.3 User Database Model

# models/user.py
import enum
from datetime import datetime, timezone

from sqlalchemy import (
    Boolean, Column, DateTime, Enum, Integer, String, Text, Index
)
from sqlalchemy.orm import relationship

from database import Base


class UserRole(str, enum.Enum):
    """User role enumeration for RBAC."""
    USER = "user"
    MODERATOR = "moderator"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"


class User(Base):
    """
    User database model.
    
    Stores user credentials, profile information, and role assignments.
    """
    __tablename__ = "users"
    
    # Primary key
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    
    # Authentication fields
    username = Column(String(50), unique=True, nullable=False, index=True)
    email = Column(String(255), unique=True, nullable=False, index=True)
    hashed_password = Column(String(255), nullable=False)
    
    # Profile fields
    full_name = Column(String(100), nullable=True)
    
    # Role and permissions
    role = Column(
        Enum(UserRole),
        default=UserRole.USER,
        nullable=False,
    )
    
    # Account status
    is_active = Column(Boolean, default=True, nullable=False)
    is_verified = Column(Boolean, default=False, nullable=False)
    
    # Timestamps
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        nullable=False,
    )
    updated_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
        nullable=False,
    )
    last_login = Column(DateTime(timezone=True), nullable=True)
    
    # Security fields
    failed_login_attempts = Column(Integer, default=0)
    locked_until = Column(DateTime(timezone=True), nullable=True)
    
    # API key for programmatic access
    api_key = Column(String(64), unique=True, nullable=True, index=True)
    
    # Relationships
    refresh_tokens = relationship(
        "RefreshToken", back_populates="user", cascade="all, delete-orphan"
    )
    
    # Table indexes
    __table_args__ = (
        Index("ix_users_email_active", "email", "is_active"),
    )
    
    def __repr__(self) -> str:
        return f"<User(id={self.id}, username='{self.username}', role='{self.role}')>"


class RefreshToken(Base):
    """
    Stores refresh tokens for token rotation and revocation.
    
    Each refresh token is stored in the database so it can be:
    - Revoked individually
    - Rotated (old token invalidated when new one is issued)
    - Cleaned up when expired
    """
    __tablename__ = "refresh_tokens"
    
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    token_jti = Column(String(36), unique=True, nullable=False, index=True)
    user_id = Column(Integer, nullable=False, index=True)
    is_revoked = Column(Boolean, default=False, nullable=False)
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )
    expires_at = Column(DateTime(timezone=True), nullable=False)
    
    # Relationship back to user
    user = relationship("User", back_populates="refresh_tokens")

5.4 User Pydantic Schemas

# schemas/user.py
import re
from datetime import datetime
from typing import Optional

from pydantic import BaseModel, EmailStr, field_validator


class UserCreate(BaseModel):
    """Schema for user registration requests."""
    username: str
    email: EmailStr
    password: str
    full_name: Optional[str] = None
    
    @field_validator("username")
    @classmethod
    def validate_username(cls, v: str) -> str:
        if len(v) < 3:
            raise ValueError("Username must be at least 3 characters")
        if len(v) > 50:
            raise ValueError("Username must not exceed 50 characters")
        if not re.match(r"^[a-zA-Z0-9_-]+$", v):
            raise ValueError(
                "Username can only contain letters, numbers, hyphens, "
                "and underscores"
            )
        return v.lower()  # Normalize to lowercase
    
    @field_validator("password")
    @classmethod
    def validate_password(cls, v: str) -> str:
        if len(v) < 8:
            raise ValueError("Password must be at least 8 characters")
        if not re.search(r"[A-Z]", v):
            raise ValueError("Password must contain an uppercase letter")
        if not re.search(r"[a-z]", v):
            raise ValueError("Password must contain a lowercase letter")
        if not re.search(r"\d", v):
            raise ValueError("Password must contain a digit")
        if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", v):
            raise ValueError("Password must contain a special character")
        return v


class UserResponse(BaseModel):
    """Schema for user data in API responses (never includes password)."""
    id: int
    username: str
    email: str
    full_name: Optional[str] = None
    role: str
    is_active: bool
    is_verified: bool
    created_at: datetime
    
    model_config = {"from_attributes": True}


class UserUpdate(BaseModel):
    """Schema for updating user profile."""
    full_name: Optional[str] = None
    email: Optional[EmailStr] = None


class UserInDB(UserResponse):
    """Schema that includes the hashed password (for internal use only)."""
    hashed_password: str

5.5 User Registration Endpoint

# routers/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from models.user import User
from schemas.user import UserCreate, UserResponse
from security.password import hash_password

router = APIRouter(prefix="/api/v1/users", tags=["Users"])


@router.post(
    "/register",
    response_model=UserResponse,
    status_code=status.HTTP_201_CREATED,
    summary="Register a new user",
)
async def register_user(
    user_data: UserCreate,
    db: AsyncSession = Depends(get_db),
):
    """
    Register a new user account.
    
    This endpoint:
    1. Validates the input (username format, email format, password strength)
    2. Checks for duplicate username and email
    3. Hashes the password
    4. Creates the user record
    5. Returns the user data (without password)
    """
    # Check for duplicate username
    result = await db.execute(
        select(User).where(User.username == user_data.username)
    )
    if result.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Username already registered",
        )
    
    # Check for duplicate email
    result = await db.execute(
        select(User).where(User.email == user_data.email)
    )
    if result.scalar_one_or_none():
        raise HTTPException(
            status_code=status.HTTP_409_CONFLICT,
            detail="Email already registered",
        )
    
    # Create user with hashed password
    new_user = User(
        username=user_data.username,
        email=user_data.email,
        hashed_password=hash_password(user_data.password),
        full_name=user_data.full_name,
    )
    
    db.add(new_user)
    await db.flush()       # Flush to get the auto-generated ID
    await db.refresh(new_user)  # Refresh to load all fields
    
    return new_user
Why 409 Conflict? We return a 409 status code for duplicate username/email instead of 400 (Bad Request) because the request is valid — the conflict is with existing data. This follows REST conventions and gives the client actionable information about why the request failed.

5.6 User Service Layer

# services/user_service.py
from datetime import datetime, timezone
from typing import Optional

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from models.user import User
from security.password import hash_password, verify_password


class UserService:
    """
    Service layer for user-related business logic.
    
    Separating business logic from route handlers makes the code
    more testable and reusable.
    """
    
    def __init__(self, db: AsyncSession):
        self.db = db
    
    async def get_by_username(self, username: str) -> Optional[User]:
        """Find a user by username."""
        result = await self.db.execute(
            select(User).where(User.username == username)
        )
        return result.scalar_one_or_none()
    
    async def get_by_email(self, email: str) -> Optional[User]:
        """Find a user by email address."""
        result = await self.db.execute(
            select(User).where(User.email == email)
        )
        return result.scalar_one_or_none()
    
    async def get_by_id(self, user_id: int) -> Optional[User]:
        """Find a user by ID."""
        result = await self.db.execute(
            select(User).where(User.id == user_id)
        )
        return result.scalar_one_or_none()
    
    async def authenticate(
        self, username: str, password: str
    ) -> Optional[User]:
        """
        Authenticate a user with username and password.
        
        Returns the user if credentials are valid, None otherwise.
        Also handles account lockout after too many failed attempts.
        """
        user = await self.get_by_username(username)
        
        if not user:
            # Run password hash anyway to prevent timing attacks
            # (so the response time is the same whether user exists or not)
            hash_password("dummy-password")
            return None
        
        # Check if account is locked
        if user.locked_until and user.locked_until > datetime.now(timezone.utc):
            return None
        
        # Check if account is active
        if not user.is_active:
            return None
        
        # Verify password
        if not verify_password(password, user.hashed_password):
            # Increment failed attempts
            user.failed_login_attempts += 1
            
            # Lock account after 5 failed attempts (30 minute lockout)
            if user.failed_login_attempts >= 5:
                from datetime import timedelta
                user.locked_until = datetime.now(timezone.utc) + timedelta(minutes=30)
            
            await self.db.flush()
            return None
        
        # Successful login — reset failed attempts
        user.failed_login_attempts = 0
        user.locked_until = None
        user.last_login = datetime.now(timezone.utc)
        await self.db.flush()
        
        return user
    
    async def get_by_api_key(self, api_key: str) -> Optional[User]:
        """Find a user by API key."""
        result = await self.db.execute(
            select(User).where(
                User.api_key == api_key,
                User.is_active == True,
            )
        )
        return result.scalar_one_or_none()

6. Login System

Now let us bring everything together into a complete login system that authenticates users, generates JWT tokens, and returns them to the client.

6.1 Complete Login Endpoint

# routers/auth.py
from datetime import datetime, timedelta, timezone
from typing import Optional

from fastapi import APIRouter, Depends, HTTPException, status, Response
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from models.user import RefreshToken
from schemas.user import UserResponse
from security.jwt_handler import (
    create_access_token,
    create_refresh_token,
    decode_token,
    TokenResponse,
    ACCESS_TOKEN_EXPIRE_MINUTES,
    REFRESH_TOKEN_EXPIRE_DAYS,
)
from services.user_service import UserService

router = APIRouter(prefix="/api/v1/auth", tags=["Authentication"])


@router.post(
    "/token",
    response_model=TokenResponse,
    summary="Login and get access + refresh tokens",
)
async def login(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    """
    Authenticate user and return JWT tokens.
    
    This endpoint:
    1. Validates credentials against the database
    2. Creates a short-lived access token (30 min)
    3. Creates a long-lived refresh token (7 days)
    4. Stores the refresh token in the database for revocation
    5. Returns both tokens to the client
    """
    user_service = UserService(db)
    user = await user_service.authenticate(
        form_data.username, form_data.password
    )
    
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # Build token claims
    token_data = {
        "sub": user.username,
        "role": user.role.value,
        "scopes": form_data.scopes,  # OAuth2 scopes if requested
    }
    
    # Create tokens
    access_token = create_access_token(data=token_data)
    refresh_token = create_refresh_token(data={"sub": user.username})
    
    # Decode refresh token to get its JTI and expiration
    refresh_data = decode_token(refresh_token)
    
    # Store refresh token in database for revocation tracking
    db_refresh_token = RefreshToken(
        token_jti=refresh_data.jti,
        user_id=user.id,
        expires_at=datetime.now(timezone.utc) + timedelta(
            days=REFRESH_TOKEN_EXPIRE_DAYS
        ),
    )
    db.add(db_refresh_token)
    
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        token_type="bearer",
        expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
    )


@router.post("/logout", summary="Logout and revoke refresh token")
async def logout(
    refresh_token: str,
    db: AsyncSession = Depends(get_db),
):
    """
    Revoke the user's refresh token.
    
    Since JWTs are stateless, we cannot truly invalidate an access token.
    Instead, we revoke the refresh token so no new access tokens can be
    obtained. The current access token will expire naturally.
    """
    try:
        token_data = decode_token(refresh_token)
    except Exception:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid refresh token",
        )
    
    # Find and revoke the refresh token in the database
    from sqlalchemy import select, update
    result = await db.execute(
        update(RefreshToken)
        .where(RefreshToken.token_jti == token_data.jti)
        .values(is_revoked=True)
    )
    
    if result.rowcount == 0:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Refresh token not found",
        )
    
    return {"detail": "Successfully logged out"}

6.2 Token Storage on the Client Side

How and where you store tokens on the client side significantly impacts security. Here are the common approaches:

Storage Method Pros Cons Best For
HTTP-Only Cookie Not accessible via JavaScript (XSS-safe) Vulnerable to CSRF (mitigate with SameSite) Web applications
localStorage Easy to implement, persists across tabs Vulnerable to XSS attacks Low-security apps
sessionStorage Cleared when tab closes Vulnerable to XSS, lost on tab close Temporary sessions
In-memory variable Safest from storage attacks Lost on page refresh High-security SPAs
Recommendation: For web applications, store the access token in memory and the refresh token in an HTTP-only, Secure, SameSite=Strict cookie. This protects against both XSS (JavaScript cannot read HTTP-only cookies) and CSRF (SameSite prevents cross-origin requests).

6.3 Setting Tokens in HTTP-Only Cookies

# Alternative login endpoint that sets cookies instead of returning tokens

@router.post("/login", summary="Login with cookie-based token storage")
async def login_with_cookies(
    response: Response,
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    """Login and set tokens as HTTP-only cookies."""
    user_service = UserService(db)
    user = await user_service.authenticate(
        form_data.username, form_data.password
    )
    
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
        )
    
    access_token = create_access_token(data={"sub": user.username})
    refresh_token = create_refresh_token(data={"sub": user.username})
    
    # Set access token cookie
    response.set_cookie(
        key="access_token",
        value=f"Bearer {access_token}",
        httponly=True,       # JavaScript cannot access this cookie
        secure=True,         # Only sent over HTTPS
        samesite="strict",   # Not sent with cross-origin requests
        max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
        path="/",
    )
    
    # Set refresh token cookie
    response.set_cookie(
        key="refresh_token",
        value=refresh_token,
        httponly=True,
        secure=True,
        samesite="strict",
        max_age=REFRESH_TOKEN_EXPIRE_DAYS * 86400,
        path="/api/v1/auth/refresh",  # Only sent to refresh endpoint
    )
    
    return {"detail": "Login successful", "username": user.username}

6.4 Using Tokens with Python Requests

# Example client-side usage with the requests library
import requests

BASE_URL = "http://localhost:8000/api/v1"

# Step 1: Login to get tokens
login_response = requests.post(
    f"{BASE_URL}/auth/token",
    data={  # Note: form data, not JSON
        "username": "johndoe",
        "password": "SecurePass123!",
    },
)
tokens = login_response.json()
access_token = tokens["access_token"]
refresh_token = tokens["refresh_token"]

print(f"Access Token: {access_token[:20]}...")
print(f"Expires in: {tokens['expires_in']} seconds")

# Step 2: Access a protected endpoint
headers = {"Authorization": f"Bearer {access_token}"}
profile_response = requests.get(
    f"{BASE_URL}/users/me",
    headers=headers,
)
print(f"Profile: {profile_response.json()}")

# Step 3: Refresh the access token when it expires
refresh_response = requests.post(
    f"{BASE_URL}/auth/refresh",
    json={"refresh_token": refresh_token},
)
new_tokens = refresh_response.json()
print(f"New Access Token: {new_tokens['access_token'][:20]}...")

7. Protected Endpoints

The real power of FastAPI’s dependency injection system shines when protecting endpoints. We create reusable dependencies that validate tokens and retrieve the current user, then inject them into any endpoint that requires authentication.

7.1 The get_current_user Dependency

# security/dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from models.user import User
from security.jwt_handler import decode_token
from services.user_service import UserService

oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")


async def get_current_user(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """
    Dependency that validates the JWT token and returns the current user.
    
    This dependency:
    1. Extracts the Bearer token from the Authorization header
    2. Decodes and validates the JWT (checks signature, expiration)
    3. Extracts the username from the 'sub' claim
    4. Looks up the user in the database
    5. Verifies the user account is active
    6. Returns the User object
    
    If any step fails, it raises a 401 Unauthorized error.
    
    Usage:
        @router.get("/protected")
        async def protected_route(user: User = Depends(get_current_user)):
            return {"message": f"Hello, {user.username}"}
    """
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    
    try:
        # Decode the JWT token
        token_data = decode_token(token)
        
        if token_data.username is None:
            raise credentials_exception
        
        # Ensure this is an access token, not a refresh token
        if token_data.token_type != "access":
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Invalid token type. Use an access token.",
                headers={"WWW-Authenticate": "Bearer"},
            )
    except JWTError:
        raise credentials_exception
    
    # Look up the user in the database
    user_service = UserService(db)
    user = await user_service.get_by_username(token_data.username)
    
    if user is None:
        raise credentials_exception
    
    return user


async def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """
    Dependency that ensures the user is active.
    
    Builds on get_current_user by adding an active check.
    Use this for most protected endpoints.
    """
    if not current_user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account is deactivated",
        )
    return current_user

7.2 Using Protected Endpoints

# routers/users.py
from fastapi import APIRouter, Depends
from models.user import User
from schemas.user import UserResponse, UserUpdate
from security.dependencies import get_current_active_user

router = APIRouter(prefix="/api/v1/users", tags=["Users"])


@router.get("/me", response_model=UserResponse, summary="Get current user profile")
async def get_my_profile(
    current_user: User = Depends(get_current_active_user),
):
    """
    Return the profile of the currently authenticated user.
    
    This endpoint requires a valid Bearer token in the Authorization header.
    The token is automatically validated by the get_current_active_user
    dependency, which also retrieves the full user object from the database.
    """
    return current_user


@router.put("/me", response_model=UserResponse, summary="Update current user profile")
async def update_my_profile(
    updates: UserUpdate,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Update the authenticated user's profile."""
    if updates.full_name is not None:
        current_user.full_name = updates.full_name
    if updates.email is not None:
        # Check if new email is already taken
        existing = await UserService(db).get_by_email(updates.email)
        if existing and existing.id != current_user.id:
            raise HTTPException(
                status_code=status.HTTP_409_CONFLICT,
                detail="Email already in use",
            )
        current_user.email = updates.email
    
    await db.flush()
    await db.refresh(current_user)
    return current_user


@router.delete("/me", summary="Delete current user account")
async def delete_my_account(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Soft-delete the authenticated user's account."""
    current_user.is_active = False
    await db.flush()
    return {"detail": "Account deactivated successfully"}

7.3 Dependency Chain Visualization

FastAPI’s dependency injection creates a clean chain of responsibilities. Understanding this chain helps you design your auth system:

# The dependency chain for a protected endpoint:
#
# HTTP Request
#   |
#   v
# OAuth2PasswordBearer  -->  Extracts "Bearer <token>" from header
#   |
#   v
# get_current_user      -->  Decodes JWT, looks up user in DB
#   |
#   v
# get_current_active_user -->  Checks user.is_active
#   |
#   v
# require_role("admin") -->  Checks user.role (RBAC - next section)
#   |
#   v
# Your endpoint handler  -->  Receives the validated User object


# Each dependency in the chain can:
# 1. Raise HTTPException to short-circuit the request
# 2. Pass data to the next dependency via return values
# 3. Access other dependencies (like the database session)

# Example: Stacking dependencies
@router.get("/admin/dashboard")
async def admin_dashboard(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Both dependencies are resolved:
    - get_current_active_user validates the token AND checks is_active
    - get_db provides a database session
    
    FastAPI resolves the entire dependency tree automatically.
    """
    pass

7.4 Optional Authentication

# Sometimes you want endpoints that work for both anonymous and
# authenticated users (e.g., a public profile that shows extra
# data to the profile owner).

from fastapi.security import OAuth2PasswordBearer

# auto_error=False makes the dependency return None instead of
# raising 401 when no token is provided
oauth2_scheme_optional = OAuth2PasswordBearer(
    tokenUrl="/api/v1/auth/token",
    auto_error=False,
)


async def get_current_user_optional(
    token: str | None = Depends(oauth2_scheme_optional),
    db: AsyncSession = Depends(get_db),
) -> User | None:
    """
    Optional authentication dependency.
    
    Returns the User if a valid token is provided, None otherwise.
    Does NOT raise 401 for missing or invalid tokens.
    """
    if token is None:
        return None
    
    try:
        token_data = decode_token(token)
        user_service = UserService(db)
        return await user_service.get_by_username(token_data.username)
    except Exception:
        return None


# Usage
@router.get("/posts/{post_id}")
async def get_post(
    post_id: int,
    current_user: User | None = Depends(get_current_user_optional),
):
    """
    Public endpoint that shows extra data for authenticated users.
    """
    post = await get_post_by_id(post_id)
    response = {"title": post.title, "content": post.content}
    
    if current_user and current_user.id == post.author_id:
        response["edit_url"] = f"/posts/{post_id}/edit"
        response["analytics"] = await get_post_analytics(post_id)
    
    return response

8. Role-Based Access Control (RBAC)

Role-Based Access Control restricts system access based on the roles assigned to users. Instead of checking individual permissions for each user, you assign users to roles, and roles have predefined sets of permissions. This simplifies permission management significantly.

8.1 Defining Roles and Permissions

# security/rbac.py
import enum
from typing import Optional

from fastapi import Depends, HTTPException, status

from models.user import User, UserRole
from security.dependencies import get_current_active_user


class Permission(str, enum.Enum):
    """Fine-grained permissions for the application."""
    # User permissions
    READ_OWN_PROFILE = "read:own_profile"
    UPDATE_OWN_PROFILE = "update:own_profile"
    DELETE_OWN_ACCOUNT = "delete:own_account"
    
    # Post permissions
    CREATE_POST = "create:post"
    READ_POST = "read:post"
    UPDATE_OWN_POST = "update:own_post"
    DELETE_OWN_POST = "delete:own_post"
    
    # Admin permissions
    READ_ALL_USERS = "read:all_users"
    UPDATE_ANY_USER = "update:any_user"
    DELETE_ANY_USER = "delete:any_user"
    UPDATE_ANY_POST = "update:any_post"
    DELETE_ANY_POST = "delete:any_post"
    MANAGE_ROLES = "manage:roles"
    VIEW_AUDIT_LOG = "view:audit_log"
    
    # Super admin
    MANAGE_SYSTEM = "manage:system"


# Map roles to their permissions
ROLE_PERMISSIONS: dict[UserRole, set[Permission]] = {
    UserRole.USER: {
        Permission.READ_OWN_PROFILE,
        Permission.UPDATE_OWN_PROFILE,
        Permission.DELETE_OWN_ACCOUNT,
        Permission.CREATE_POST,
        Permission.READ_POST,
        Permission.UPDATE_OWN_POST,
        Permission.DELETE_OWN_POST,
    },
    UserRole.MODERATOR: {
        # Inherits all USER permissions plus:
        Permission.READ_OWN_PROFILE,
        Permission.UPDATE_OWN_PROFILE,
        Permission.DELETE_OWN_ACCOUNT,
        Permission.CREATE_POST,
        Permission.READ_POST,
        Permission.UPDATE_OWN_POST,
        Permission.DELETE_OWN_POST,
        # Moderator-specific
        Permission.UPDATE_ANY_POST,
        Permission.DELETE_ANY_POST,
        Permission.READ_ALL_USERS,
    },
    UserRole.ADMIN: {
        # Inherits all MODERATOR permissions plus:
        Permission.READ_OWN_PROFILE,
        Permission.UPDATE_OWN_PROFILE,
        Permission.DELETE_OWN_ACCOUNT,
        Permission.CREATE_POST,
        Permission.READ_POST,
        Permission.UPDATE_OWN_POST,
        Permission.DELETE_OWN_POST,
        Permission.UPDATE_ANY_POST,
        Permission.DELETE_ANY_POST,
        Permission.READ_ALL_USERS,
        # Admin-specific
        Permission.UPDATE_ANY_USER,
        Permission.DELETE_ANY_USER,
        Permission.MANAGE_ROLES,
        Permission.VIEW_AUDIT_LOG,
    },
    UserRole.SUPER_ADMIN: {
        perm for perm in Permission  # All permissions
    },
}

8.2 Role-Checking Dependencies

# security/rbac.py (continued)

def require_role(*allowed_roles: UserRole):
    """
    Dependency factory that restricts access to users with specific roles.
    
    Usage:
        @router.get("/admin/users")
        async def list_users(
            user: User = Depends(require_role(UserRole.ADMIN, UserRole.SUPER_ADMIN))
        ):
            ...
    """
    async def role_checker(
        current_user: User = Depends(get_current_active_user),
    ) -> User:
        if current_user.role not in allowed_roles:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Insufficient permissions. Required role: "
                       f"{', '.join(r.value for r in allowed_roles)}",
            )
        return current_user
    
    return role_checker


def require_permission(*required_permissions: Permission):
    """
    Dependency factory that checks for specific permissions.
    
    More granular than role checking — checks if the user's role
    grants the required permissions.
    
    Usage:
        @router.delete("/posts/{post_id}")
        async def delete_post(
            post_id: int,
            user: User = Depends(require_permission(Permission.DELETE_ANY_POST))
        ):
            ...
    """
    async def permission_checker(
        current_user: User = Depends(get_current_active_user),
    ) -> User:
        user_permissions = ROLE_PERMISSIONS.get(current_user.role, set())
        
        missing = set(required_permissions) - user_permissions
        if missing:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Missing permissions: "
                       f"{', '.join(p.value for p in missing)}",
            )
        return current_user
    
    return permission_checker

8.3 Using RBAC in Endpoints

# routers/admin.py
from fastapi import APIRouter, Depends
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from models.user import User, UserRole
from schemas.user import UserResponse
from security.rbac import require_role, require_permission, Permission

router = APIRouter(prefix="/api/v1/admin", tags=["Admin"])


# Role-based access: only admins and super admins
@router.get(
    "/users",
    response_model=list[UserResponse],
    summary="List all users (admin only)",
)
async def list_all_users(
    skip: int = 0,
    limit: int = 100,
    current_user: User = Depends(
        require_role(UserRole.ADMIN, UserRole.SUPER_ADMIN)
    ),
    db: AsyncSession = Depends(get_db),
):
    """List all users. Requires ADMIN or SUPER_ADMIN role."""
    result = await db.execute(
        select(User).offset(skip).limit(limit)
    )
    return result.scalars().all()


# Permission-based access: more granular
@router.put(
    "/users/{user_id}/role",
    response_model=UserResponse,
    summary="Change a user's role",
)
async def change_user_role(
    user_id: int,
    new_role: UserRole,
    current_user: User = Depends(
        require_permission(Permission.MANAGE_ROLES)
    ),
    db: AsyncSession = Depends(get_db),
):
    """
    Change a user's role. Requires MANAGE_ROLES permission.
    
    Security rules:
    - Cannot change your own role
    - Cannot assign SUPER_ADMIN role (only direct DB change)
    - Cannot change the role of a SUPER_ADMIN
    """
    if user_id == current_user.id:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Cannot change your own role",
        )
    
    if new_role == UserRole.SUPER_ADMIN:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="SUPER_ADMIN role can only be assigned via database",
        )
    
    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    
    if user.role == UserRole.SUPER_ADMIN:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Cannot modify a SUPER_ADMIN user",
        )
    
    user.role = new_role
    await db.flush()
    await db.refresh(user)
    return user


# Dashboard with statistics
@router.get("/dashboard", summary="Admin dashboard statistics")
async def admin_dashboard(
    current_user: User = Depends(
        require_role(UserRole.ADMIN, UserRole.SUPER_ADMIN)
    ),
    db: AsyncSession = Depends(get_db),
):
    """Get system statistics for the admin dashboard."""
    total_users = await db.scalar(select(func.count(User.id)))
    active_users = await db.scalar(
        select(func.count(User.id)).where(User.is_active == True)
    )
    
    role_counts = {}
    for role in UserRole:
        count = await db.scalar(
            select(func.count(User.id)).where(User.role == role)
        )
        role_counts[role.value] = count
    
    return {
        "total_users": total_users,
        "active_users": active_users,
        "inactive_users": total_users - active_users,
        "users_by_role": role_counts,
    }

8.4 Role Hierarchy Pattern

# A cleaner approach using role hierarchy
# Higher roles automatically inherit all lower role permissions

ROLE_HIERARCHY: dict[UserRole, int] = {
    UserRole.USER: 1,
    UserRole.MODERATOR: 2,
    UserRole.ADMIN: 3,
    UserRole.SUPER_ADMIN: 4,
}


def require_minimum_role(minimum_role: UserRole):
    """
    Require a minimum role level using hierarchy.
    
    A user with ADMIN role automatically passes a check for
    MODERATOR or USER level, because ADMIN is higher in the hierarchy.
    
    Usage:
        @router.get("/mod/reports")
        async def view_reports(
            user: User = Depends(require_minimum_role(UserRole.MODERATOR))
        ):
            ...  # Accessible by MODERATOR, ADMIN, and SUPER_ADMIN
    """
    required_level = ROLE_HIERARCHY[minimum_role]
    
    async def check_role(
        current_user: User = Depends(get_current_active_user),
    ) -> User:
        user_level = ROLE_HIERARCHY.get(current_user.role, 0)
        if user_level < required_level:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Minimum role required: {minimum_role.value}",
            )
        return current_user
    
    return check_role

9. API Key Authentication

While JWT tokens are ideal for user-facing authentication, API keys are better suited for server-to-server communication, third-party integrations, and programmatic access. FastAPI provides built-in support for API key authentication via headers, query parameters, or cookies.

9.1 API Key Security Scheme

# security/api_key.py
import secrets
from datetime import datetime, timezone
from typing import Optional

from fastapi import Depends, HTTPException, Security, status
from fastapi.security import APIKeyHeader
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from models.user import User
from services.user_service import UserService

# Define where to look for the API key
api_key_header = APIKeyHeader(
    name="X-API-Key",
    auto_error=False,  # Return None instead of 403 if missing
)


def generate_api_key() -> str:
    """
    Generate a cryptographically secure API key.
    
    Format: prefix_randomhex
    The prefix makes it easy to identify and rotate keys.
    """
    return f"lmsc_{secrets.token_hex(32)}"


async def get_user_from_api_key(
    api_key: Optional[str] = Security(api_key_header),
    db: AsyncSession = Depends(get_db),
) -> Optional[User]:
    """
    Validate an API key and return the associated user.
    
    Returns None if no API key is provided (allowing fallback
    to other auth methods).
    """
    if api_key is None:
        return None
    
    user_service = UserService(db)
    user = await user_service.get_by_api_key(api_key)
    
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
        )
    
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="API key owner account is deactivated",
        )
    
    return user

9.2 Combined Authentication (JWT or API Key)

# security/combined_auth.py
from fastapi import Depends, HTTPException, status
from models.user import User
from security.dependencies import get_current_user_optional
from security.api_key import get_user_from_api_key


async def get_current_user_flexible(
    jwt_user: User | None = Depends(get_current_user_optional),
    api_key_user: User | None = Depends(get_user_from_api_key),
) -> User:
    """
    Accept either JWT token OR API key authentication.
    
    This allows endpoints to work for both:
    - Browser users (JWT from login)
    - API clients (API key in header)
    
    JWT takes priority if both are provided.
    """
    user = jwt_user or api_key_user
    
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required. Provide a Bearer token or X-API-Key.",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    return user


# Usage
@router.get("/data")
async def get_data(
    current_user: User = Depends(get_current_user_flexible),
):
    """This endpoint accepts both JWT and API key auth."""
    return {"user": current_user.username, "data": "..."}

9.3 API Key Management Endpoints

# routers/api_keys.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession

from database import get_db
from models.user import User
from security.api_key import generate_api_key
from security.dependencies import get_current_active_user

router = APIRouter(prefix="/api/v1/api-keys", tags=["API Keys"])


@router.post("/generate", summary="Generate a new API key")
async def generate_new_api_key(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """
    Generate a new API key for the authenticated user.
    
    WARNING: The API key is only shown once. Store it securely.
    Generating a new key invalidates the previous one.
    """
    new_key = generate_api_key()
    current_user.api_key = new_key
    await db.flush()
    
    return {
        "api_key": new_key,
        "message": "Store this key securely. It will not be shown again.",
    }


@router.delete("/revoke", summary="Revoke current API key")
async def revoke_api_key(
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Revoke the current user's API key."""
    if current_user.api_key is None:
        raise HTTPException(
            status_code=status.HTTP_404_NOT_FOUND,
            detail="No API key to revoke",
        )
    
    current_user.api_key = None
    await db.flush()
    
    return {"detail": "API key revoked successfully"}

9.4 Rate Limiting per API Key

# security/rate_limit.py
import time
from collections import defaultdict
from typing import Optional

from fastapi import Depends, HTTPException, Request, status


class RateLimiter:
    """
    In-memory rate limiter using the sliding window algorithm.
    
    For production, use Redis-based rate limiting (e.g., with
    the slowapi library or a custom Redis implementation).
    """
    
    def __init__(self):
        # {key: [(timestamp, count), ...]}
        self.requests: dict[str, list[float]] = defaultdict(list)
    
    def is_rate_limited(
        self,
        key: str,
        max_requests: int,
        window_seconds: int,
    ) -> tuple[bool, dict]:
        """
        Check if a key has exceeded its rate limit.
        
        Returns:
            (is_limited, info_dict) where info_dict contains
            remaining requests and reset time.
        """
        now = time.time()
        window_start = now - window_seconds
        
        # Remove expired entries
        self.requests[key] = [
            ts for ts in self.requests[key]
            if ts > window_start
        ]
        
        current_count = len(self.requests[key])
        
        if current_count >= max_requests:
            reset_time = self.requests[key][0] + window_seconds
            return True, {
                "limit": max_requests,
                "remaining": 0,
                "reset": int(reset_time),
            }
        
        # Record this request
        self.requests[key].append(now)
        
        return False, {
            "limit": max_requests,
            "remaining": max_requests - current_count - 1,
            "reset": int(now + window_seconds),
        }


# Global rate limiter instance
rate_limiter = RateLimiter()


def rate_limit(max_requests: int = 60, window_seconds: int = 60):
    """
    Rate limiting dependency factory.
    
    Usage:
        @router.get("/data", dependencies=[Depends(rate_limit(100, 60))])
        async def get_data():
            ...
    """
    async def check_rate_limit(request: Request):
        # Use API key or IP address as the rate limit key
        api_key = request.headers.get("X-API-Key")
        client_key = api_key or request.client.host
        
        is_limited, info = rate_limiter.is_rate_limited(
            client_key, max_requests, window_seconds
        )
        
        if is_limited:
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Rate limit exceeded",
                headers={
                    "X-RateLimit-Limit": str(info["limit"]),
                    "X-RateLimit-Remaining": str(info["remaining"]),
                    "X-RateLimit-Reset": str(info["reset"]),
                    "Retry-After": str(info["reset"] - int(time.time())),
                },
            )
    
    return check_rate_limit


# Usage example
@router.get(
    "/api/v1/search",
    dependencies=[Depends(rate_limit(max_requests=30, window_seconds=60))],
)
async def search(q: str):
    """Rate-limited search endpoint: 30 requests per minute."""
    return {"query": q, "results": []}

10. OAuth2 with Scopes

OAuth2 scopes provide a fine-grained permission system at the token level. Unlike role-based access which checks the user’s role, scopes define what a specific token is authorized to do. This is particularly useful when users want to grant limited access to third-party applications.

10.1 Defining OAuth2 Scopes

# security/scopes.py
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from jose import JWTError

from security.jwt_handler import decode_token
from models.user import User
from services.user_service import UserService
from database import get_db
from sqlalchemy.ext.asyncio import AsyncSession

# Define available scopes with descriptions
# These appear in Swagger UI's Authorize dialog
OAUTH2_SCOPES = {
    "profile:read": "Read your profile information",
    "profile:write": "Update your profile information",
    "posts:read": "Read posts",
    "posts:write": "Create and edit posts",
    "posts:delete": "Delete posts",
    "users:read": "Read user information (admin)",
    "users:write": "Modify user accounts (admin)",
    "admin": "Full administrative access",
}

# OAuth2 scheme with scopes
oauth2_scheme_scoped = OAuth2PasswordBearer(
    tokenUrl="/api/v1/auth/token",
    scopes=OAUTH2_SCOPES,
)

10.2 Scope-Based User Dependency

# security/scopes.py (continued)

async def get_current_user_with_scopes(
    security_scopes: SecurityScopes,
    token: str = Depends(oauth2_scheme_scoped),
    db: AsyncSession = Depends(get_db),
) -> User:
    """
    Validate token AND check that it has the required scopes.
    
    FastAPI's SecurityScopes automatically collects the scopes
    required by the endpoint and all its dependencies.
    
    Args:
        security_scopes: Automatically populated by FastAPI with
            the scopes required by the endpoint chain.
        token: The JWT bearer token.
        db: Database session.
    
    Returns:
        The authenticated User if token is valid and has required scopes.
    """
    # Build the authenticate header value with required scopes
    if security_scopes.scopes:
        authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
    else:
        authenticate_value = "Bearer"
    
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": authenticate_value},
    )
    
    try:
        token_data = decode_token(token)
        if token_data.username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    
    # Look up the user
    user_service = UserService(db)
    user = await user_service.get_by_username(token_data.username)
    
    if user is None:
        raise credentials_exception
    
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account is deactivated",
        )
    
    # Check that the token has all required scopes
    for scope in security_scopes.scopes:
        if scope not in token_data.scopes:
            raise HTTPException(
                status_code=status.HTTP_403_FORBIDDEN,
                detail=f"Token missing required scope: {scope}",
                headers={"WWW-Authenticate": authenticate_value},
            )
    
    return user

10.3 Using Scopes in Endpoints

# routers/posts.py
from fastapi import APIRouter, Depends, Security
from models.user import User
from security.scopes import get_current_user_with_scopes

router = APIRouter(prefix="/api/v1/posts", tags=["Posts"])


@router.get("/")
async def list_posts(
    current_user: User = Security(
        get_current_user_with_scopes,
        scopes=["posts:read"],
    ),
):
    """
    List all posts. Requires 'posts:read' scope.
    
    Note: We use Security() instead of Depends() when working with
    scopes. Security() is a subclass of Depends() that also passes
    the required scopes to the dependency.
    """
    return {"posts": [], "user": current_user.username}


@router.post("/")
async def create_post(
    title: str,
    content: str,
    current_user: User = Security(
        get_current_user_with_scopes,
        scopes=["posts:read", "posts:write"],
    ),
):
    """Create a post. Requires both 'posts:read' and 'posts:write' scopes."""
    return {
        "title": title,
        "content": content,
        "author": current_user.username,
    }


@router.delete("/{post_id}")
async def delete_post(
    post_id: int,
    current_user: User = Security(
        get_current_user_with_scopes,
        scopes=["posts:delete"],
    ),
):
    """Delete a post. Requires 'posts:delete' scope."""
    return {"detail": f"Post {post_id} deleted"}


# Admin endpoint requiring admin scope
@router.get("/admin/all")
async def admin_list_all_posts(
    current_user: User = Security(
        get_current_user_with_scopes,
        scopes=["admin"],
    ),
):
    """List all posts with admin details. Requires 'admin' scope."""
    return {"posts": [], "total": 0, "admin_view": True}

10.4 Issuing Tokens with Scopes

# Updated login endpoint that respects requested scopes

@router.post("/token")
async def login_with_scopes(
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    """
    Login endpoint that issues tokens with requested scopes.
    
    The client can request specific scopes during login.
    The server validates that the user is allowed to have
    those scopes based on their role.
    """
    user_service = UserService(db)
    user = await user_service.authenticate(
        form_data.username, form_data.password
    )
    
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
        )
    
    # Determine allowed scopes based on user role
    allowed_scopes = get_allowed_scopes_for_role(user.role)
    
    # Filter requested scopes to only include allowed ones
    requested_scopes = form_data.scopes  # list of scope strings
    granted_scopes = [s for s in requested_scopes if s in allowed_scopes]
    
    # If no scopes requested, grant default scopes for the role
    if not requested_scopes:
        granted_scopes = list(allowed_scopes)
    
    token_data = {
        "sub": user.username,
        "scopes": granted_scopes,
        "role": user.role.value,
    }
    
    access_token = create_access_token(data=token_data)
    refresh_token = create_refresh_token(data={"sub": user.username})
    
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
        "scope": " ".join(granted_scopes),  # OAuth2 spec: space-separated
    }


def get_allowed_scopes_for_role(role: UserRole) -> set[str]:
    """Map user roles to allowed OAuth2 scopes."""
    base_scopes = {"profile:read", "profile:write", "posts:read"}
    
    role_scope_map = {
        UserRole.USER: base_scopes | {"posts:write"},
        UserRole.MODERATOR: base_scopes | {
            "posts:write", "posts:delete", "users:read"
        },
        UserRole.ADMIN: base_scopes | {
            "posts:write", "posts:delete",
            "users:read", "users:write", "admin"
        },
        UserRole.SUPER_ADMIN: set(OAUTH2_SCOPES.keys()),  # All scopes
    }
    
    return role_scope_map.get(role, base_scopes)

11. Refresh Token Rotation

Refresh token rotation is a security technique where a new refresh token is issued every time the old one is used. This limits the window of vulnerability if a refresh token is compromised, because the stolen token becomes invalid after its first use.

11.1 How Refresh Token Rotation Works

  1. User logs in and receives an access token + refresh token (RT1)
  2. Access token expires after 30 minutes
  3. Client sends RT1 to the refresh endpoint
  4. Server validates RT1, marks it as used, issues new access token + new refresh token (RT2)
  5. RT1 is now invalid — if anyone tries to use RT1 again, it is a sign of token theft
  6. When RT1 reuse is detected, the server revokes ALL tokens in the family for safety

11.2 Implementing Token Rotation

# routers/auth.py — refresh endpoint with rotation

from datetime import datetime, timedelta, timezone
from pydantic import BaseModel
from sqlalchemy import select

from models.user import RefreshToken


class RefreshRequest(BaseModel):
    refresh_token: str


@router.post(
    "/refresh",
    response_model=TokenResponse,
    summary="Refresh access token with rotation",
)
async def refresh_access_token(
    request: RefreshRequest,
    db: AsyncSession = Depends(get_db),
):
    """
    Exchange a refresh token for a new access token + refresh token.
    
    Implements refresh token rotation:
    1. Validates the refresh token (signature, expiration, type)
    2. Checks if the token has been revoked
    3. Detects token reuse (potential theft)
    4. Issues new access + refresh tokens
    5. Revokes the old refresh token
    
    If token reuse is detected, ALL refresh tokens for the user
    are revoked as a security measure.
    """
    # Step 1: Decode and validate the refresh token
    try:
        token_data = decode_token(request.refresh_token)
    except JWTError as e:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=f"Invalid refresh token: {str(e)}",
        )
    
    # Step 2: Ensure this is actually a refresh token
    if token_data.token_type != "refresh":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token type. Expected refresh token.",
        )
    
    # Step 3: Look up the refresh token in the database
    result = await db.execute(
        select(RefreshToken).where(
            RefreshToken.token_jti == token_data.jti
        )
    )
    stored_token = result.scalar_one_or_none()
    
    if stored_token is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Refresh token not found",
        )
    
    # Step 4: CRITICAL — Check for token reuse (theft detection)
    if stored_token.is_revoked:
        # This token was already used! Someone may have stolen it.
        # Revoke ALL refresh tokens for this user as a safety measure.
        await _revoke_all_user_tokens(stored_token.user_id, db)
        
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token reuse detected. All sessions have been revoked. "
                   "Please log in again.",
        )
    
    # Step 5: Revoke the current refresh token (it is now used)
    stored_token.is_revoked = True
    
    # Step 6: Look up the user
    user_service = UserService(db)
    user = await user_service.get_by_id(stored_token.user_id)
    
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User account not found or deactivated",
        )
    
    # Step 7: Issue new tokens
    new_token_data = {
        "sub": user.username,
        "role": user.role.value,
    }
    
    new_access_token = create_access_token(data=new_token_data)
    new_refresh_token = create_refresh_token(data={"sub": user.username})
    
    # Step 8: Store the new refresh token
    new_refresh_data = decode_token(new_refresh_token)
    db_new_token = RefreshToken(
        token_jti=new_refresh_data.jti,
        user_id=user.id,
        expires_at=datetime.now(timezone.utc) + timedelta(
            days=REFRESH_TOKEN_EXPIRE_DAYS
        ),
    )
    db.add(db_new_token)
    
    return TokenResponse(
        access_token=new_access_token,
        refresh_token=new_refresh_token,
        token_type="bearer",
        expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
    )


async def _revoke_all_user_tokens(user_id: int, db: AsyncSession):
    """
    Revoke all refresh tokens for a user.
    
    Called when token reuse is detected as a security measure.
    This forces the user (and any attacker) to log in again.
    """
    from sqlalchemy import update
    await db.execute(
        update(RefreshToken)
        .where(
            RefreshToken.user_id == user_id,
            RefreshToken.is_revoked == False,
        )
        .values(is_revoked=True)
    )
    await db.flush()

11.3 Token Cleanup

# services/token_cleanup.py
from datetime import datetime, timezone

from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession

from models.user import RefreshToken


async def cleanup_expired_tokens(db: AsyncSession) -> int:
    """
    Remove expired and revoked refresh tokens from the database.
    
    Run this periodically (e.g., daily via a scheduled task)
    to keep the refresh_tokens table from growing indefinitely.
    
    Returns:
        Number of tokens removed.
    """
    now = datetime.now(timezone.utc)
    
    result = await db.execute(
        delete(RefreshToken).where(
            (RefreshToken.expires_at < now) |
            (RefreshToken.is_revoked == True)
        )
    )
    
    await db.commit()
    return result.rowcount


# Schedule cleanup on application startup
from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan manager with periodic token cleanup."""
    # Start background cleanup task
    cleanup_task = asyncio.create_task(periodic_cleanup())
    
    yield  # Application runs here
    
    # Shutdown: cancel the cleanup task
    cleanup_task.cancel()
    try:
        await cleanup_task
    except asyncio.CancelledError:
        pass


async def periodic_cleanup():
    """Run token cleanup every 24 hours."""
    while True:
        try:
            async with async_session() as db:
                removed = await cleanup_expired_tokens(db)
                print(f"Token cleanup: removed {removed} expired/revoked tokens")
        except Exception as e:
            print(f"Token cleanup error: {e}")
        
        await asyncio.sleep(86400)  # 24 hours


# Use the lifespan in your app
app = FastAPI(lifespan=lifespan)

11.4 Token Blacklisting (Alternative Approach)

# security/token_blacklist.py
"""
Token blacklisting for immediate access token revocation.

While refresh token rotation handles refresh tokens, sometimes
you need to immediately revoke an access token (e.g., when a user
changes their password or an admin disables an account).

For production, use Redis for O(1) lookups and automatic expiry:
    pip install redis
"""
from datetime import datetime, timezone
from typing import Optional

import redis.asyncio as redis

# Redis connection for token blacklist
redis_client = redis.Redis(host="localhost", port=6379, db=0)


async def blacklist_token(jti: str, expires_at: datetime) -> None:
    """
    Add a token's JTI to the blacklist.
    
    The entry automatically expires when the token would have expired,
    so the blacklist stays clean without manual cleanup.
    """
    ttl = int((expires_at - datetime.now(timezone.utc)).total_seconds())
    if ttl > 0:
        await redis_client.setex(f"blacklist:{jti}", ttl, "revoked")


async def is_token_blacklisted(jti: str) -> bool:
    """Check if a token has been blacklisted."""
    result = await redis_client.get(f"blacklist:{jti}")
    return result is not None


# Updated get_current_user with blacklist check
async def get_current_user_with_blacklist(
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Validates token and checks blacklist."""
    try:
        token_data = decode_token(token)
    except JWTError:
        raise HTTPException(status_code=401, detail="Invalid token")
    
    # Check blacklist
    if token_data.jti and await is_token_blacklisted(token_data.jti):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token has been revoked",
        )
    
    user_service = UserService(db)
    user = await user_service.get_by_username(token_data.username)
    if not user or not user.is_active:
        raise HTTPException(status_code=401, detail="User not found")
    
    return user

12. Security Best Practices

A secure authentication system requires more than just correct logic. You need to configure transport security, prevent cross-origin attacks, rate-limit sensitive endpoints, sanitize input, and set appropriate security headers. This section covers the essential security hardening steps for a production FastAPI application.

12.1 HTTPS Configuration

# middleware/https_redirect.py
from fastapi import FastAPI, Request
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from starlette.middleware.base import BaseHTTPMiddleware


def configure_https(app: FastAPI, environment: str = "production"):
    """
    Configure HTTPS enforcement.
    
    In production, all HTTP requests are redirected to HTTPS.
    In development, HTTPS is not enforced.
    """
    if environment == "production":
        # Redirect all HTTP requests to HTTPS
        app.add_middleware(HTTPSRedirectMiddleware)


class HSTSMiddleware(BaseHTTPMiddleware):
    """
    Add HTTP Strict Transport Security header.
    
    Tells browsers to only access the site over HTTPS for the
    specified duration, preventing protocol downgrade attacks.
    """
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        response.headers["Strict-Transport-Security"] = (
            "max-age=31536000; includeSubDomains; preload"
        )
        return response

12.2 CORS Configuration

# middleware/cors.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware


def configure_cors(app: FastAPI):
    """
    Configure Cross-Origin Resource Sharing.
    
    CORS controls which origins (domains) can make requests to your API.
    This is crucial for security when your frontend and backend are on
    different domains.
    """
    # Allowed origins — be specific, never use ["*"] in production
    allowed_origins = [
        "https://yourdomain.com",
        "https://app.yourdomain.com",
        "http://localhost:3000",  # React development server
        "http://localhost:5173",  # Vite development server
    ]
    
    app.add_middleware(
        CORSMiddleware,
        allow_origins=allowed_origins,
        allow_credentials=True,  # Allow cookies (for HTTP-only token cookies)
        allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
        allow_headers=[
            "Authorization",
            "Content-Type",
            "X-API-Key",
            "X-Request-ID",
        ],
        expose_headers=[
            "X-RateLimit-Limit",
            "X-RateLimit-Remaining",
            "X-RateLimit-Reset",
        ],
        max_age=600,  # Cache preflight requests for 10 minutes
    )
Never use allow_origins=["*"] with allow_credentials=True. This combination is explicitly forbidden by the CORS specification. If you need credentials support, you must list specific origins.

12.3 Security Headers Middleware

# middleware/security_headers.py
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import Request


class SecurityHeadersMiddleware(BaseHTTPMiddleware):
    """
    Add security headers to all responses.
    
    These headers protect against common web vulnerabilities:
    - XSS (Cross-Site Scripting)
    - Clickjacking
    - MIME-type sniffing
    - Information leakage
    """
    
    async def dispatch(self, request: Request, call_next):
        response = await call_next(request)
        
        # Prevent XSS: Don't execute scripts from inline sources
        response.headers["Content-Security-Policy"] = (
            "default-src 'self'; "
            "script-src 'self'; "
            "style-src 'self' 'unsafe-inline'; "
            "img-src 'self' data:; "
            "font-src 'self'; "
            "connect-src 'self'"
        )
        
        # Prevent clickjacking: Don't allow embedding in iframes
        response.headers["X-Frame-Options"] = "DENY"
        
        # Prevent MIME-type sniffing
        response.headers["X-Content-Type-Options"] = "nosniff"
        
        # Control referrer information
        response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
        
        # Enable browser XSS filter (legacy, but still useful)
        response.headers["X-XSS-Protection"] = "1; mode=block"
        
        # Control browser features
        response.headers["Permissions-Policy"] = (
            "camera=(), microphone=(), geolocation=(), "
            "payment=(), usb=()"
        )
        
        # Don't leak server information
        response.headers.pop("server", None)
        
        return response

12.4 Input Sanitization

# security/sanitization.py
import re
import html
from typing import Any

from pydantic import field_validator


def sanitize_string(value: str) -> str:
    """
    Sanitize a string input to prevent injection attacks.
    
    This function:
    1. Strips leading/trailing whitespace
    2. Escapes HTML entities (prevents XSS)
    3. Removes null bytes (prevents null byte injection)
    4. Limits length to prevent DoS
    """
    if not isinstance(value, str):
        return value
    
    # Strip whitespace
    value = value.strip()
    
    # Remove null bytes
    value = value.replace("\x00", "")
    
    # Escape HTML entities
    value = html.escape(value, quote=True)
    
    return value


def sanitize_search_query(query: str) -> str:
    """
    Sanitize search queries to prevent SQL injection and XSS.
    
    For SQL queries, always use parameterized queries (SQLAlchemy
    does this automatically). This function handles the display layer.
    """
    # Remove SQL special characters
    query = re.sub(r"[;'\"\-\-\/\*]", "", query)
    
    # Escape HTML
    query = html.escape(query)
    
    # Limit length
    return query[:500]


# Pydantic model with built-in sanitization
from pydantic import BaseModel


class SanitizedInput(BaseModel):
    """Base model that automatically sanitizes string fields."""
    
    @field_validator("*", mode="before")
    @classmethod
    def sanitize_strings(cls, v: Any) -> Any:
        if isinstance(v, str):
            return sanitize_string(v)
        return v


# Usage: inherit from SanitizedInput instead of BaseModel
class CommentCreate(SanitizedInput):
    content: str
    post_id: int
    
    # content will be automatically sanitized

12.5 Comprehensive Rate Limiting with slowapi

pip install slowapi
# middleware/rate_limiting.py
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import FastAPI, Request


def get_rate_limit_key(request: Request) -> str:
    """
    Determine the rate limit key for a request.
    
    Uses the API key if present, otherwise falls back to IP address.
    This ensures API key users get their own rate limit bucket.
    """
    api_key = request.headers.get("X-API-Key")
    if api_key:
        return f"apikey:{api_key}"
    return get_remote_address(request)


# Create limiter instance
limiter = Limiter(key_func=get_rate_limit_key)


def configure_rate_limiting(app: FastAPI):
    """Configure rate limiting for the application."""
    app.state.limiter = limiter
    app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)


# Usage in endpoints
from slowapi import limiter as _limiter


@router.post("/auth/token")
@limiter.limit("5/minute")  # Strict limit on login attempts
async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
    """Login with rate limiting: 5 attempts per minute."""
    ...


@router.post("/users/register")
@limiter.limit("3/hour")  # Very strict for registration
async def register(request: Request, user_data: UserCreate):
    """Registration with rate limiting: 3 per hour per IP."""
    ...


@router.get("/api/data")
@limiter.limit("100/minute")  # Higher limit for data endpoints
async def get_data(request: Request):
    """Data endpoint: 100 requests per minute."""
    ...

12.6 Audit Logging

# security/audit.py
import logging
from datetime import datetime, timezone
from enum import Enum
from typing import Optional

from fastapi import Request


class AuditEvent(str, Enum):
    """Types of security events to audit."""
    LOGIN_SUCCESS = "login_success"
    LOGIN_FAILURE = "login_failure"
    LOGOUT = "logout"
    TOKEN_REFRESH = "token_refresh"
    TOKEN_REVOKED = "token_revoked"
    TOKEN_REUSE_DETECTED = "token_reuse_detected"
    PASSWORD_CHANGED = "password_changed"
    ROLE_CHANGED = "role_changed"
    ACCOUNT_LOCKED = "account_locked"
    ACCOUNT_DEACTIVATED = "account_deactivated"
    API_KEY_GENERATED = "api_key_generated"
    API_KEY_REVOKED = "api_key_revoked"
    UNAUTHORIZED_ACCESS = "unauthorized_access"
    RATE_LIMIT_EXCEEDED = "rate_limit_exceeded"


# Configure audit logger
audit_logger = logging.getLogger("security.audit")
audit_logger.setLevel(logging.INFO)

# File handler for persistent audit log
handler = logging.FileHandler("security_audit.log")
handler.setFormatter(
    logging.Formatter(
        "%(asctime)s | %(levelname)s | %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S",
    )
)
audit_logger.addHandler(handler)


def log_security_event(
    event: AuditEvent,
    request: Optional[Request] = None,
    user_id: Optional[int] = None,
    username: Optional[str] = None,
    details: Optional[str] = None,
):
    """
    Log a security event for auditing.
    
    In production, consider sending these to a SIEM system
    (Splunk, ELK, etc.) or a dedicated audit database.
    """
    client_ip = request.client.host if request else "unknown"
    user_agent = (
        request.headers.get("user-agent", "unknown")
        if request else "unknown"
    )
    
    log_entry = (
        f"event={event.value} | "
        f"ip={client_ip} | "
        f"user_id={user_id} | "
        f"username={username} | "
        f"user_agent={user_agent} | "
        f"details={details}"
    )
    
    # Use WARNING level for security-critical events
    if event in (
        AuditEvent.TOKEN_REUSE_DETECTED,
        AuditEvent.ACCOUNT_LOCKED,
        AuditEvent.UNAUTHORIZED_ACCESS,
        AuditEvent.LOGIN_FAILURE,
    ):
        audit_logger.warning(log_entry)
    else:
        audit_logger.info(log_entry)


# Usage in login endpoint
async def login(request: Request, ...):
    user = await authenticate(username, password)
    if not user:
        log_security_event(
            AuditEvent.LOGIN_FAILURE,
            request=request,
            username=username,
            details="Invalid credentials",
        )
        raise HTTPException(...)
    
    log_security_event(
        AuditEvent.LOGIN_SUCCESS,
        request=request,
        user_id=user.id,
        username=user.username,
    )
    ...

12.7 Security Checklist

Category Requirement Status
Transport HTTPS enforced with HSTS Required
Transport TLS 1.2+ only Required
Passwords bcrypt or argon2 hashing Required
Passwords Minimum 8 characters with complexity Required
Tokens Short-lived access tokens (15-30 min) Required
Tokens Refresh token rotation Recommended
Tokens Token blacklisting capability Recommended
Headers CORS properly configured Required
Headers Security headers set (CSP, X-Frame, etc.) Required
Rate Limiting Login endpoint rate limited Required
Rate Limiting Registration endpoint rate limited Required
Input All input validated and sanitized Required
Input Parameterized SQL queries Required
Logging Security events audited Required
Logging No sensitive data in logs Required
Account Account lockout after failed attempts Recommended
Account Email verification for new accounts Recommended

13. Complete Auth System

Now let us bring everything together into a complete, production-ready authentication system. This section combines all the concepts we have covered into a cohesive application structure that you can use as a template for your own projects.

13.1 Project Structure

fastapi-auth/
├── main.py                    # Application entry point
├── config.py                  # Configuration settings
├── database.py                # Database connection and session
├── requirements.txt           # Python dependencies
├── .env                       # Environment variables (not in git)
├── models/
│   ├── __init__.py
│   └── user.py               # User and RefreshToken models
├── schemas/
│   ├── __init__.py
│   ├── user.py               # User Pydantic schemas
│   └── auth.py               # Auth request/response schemas
├── routers/
│   ├── __init__.py
│   ├── auth.py               # Login, logout, refresh endpoints
│   ├── users.py              # User registration and profile
│   └── admin.py              # Admin-only endpoints
├── security/
│   ├── __init__.py
│   ├── password.py           # Password hashing
│   ├── jwt_handler.py        # JWT creation and validation
│   ├── dependencies.py       # Auth dependencies (get_current_user)
│   ├── rbac.py               # Role-based access control
│   ├── api_key.py            # API key authentication
│   ├── scopes.py             # OAuth2 scopes
│   └── audit.py              # Security audit logging
├── services/
│   ├── __init__.py
│   └── user_service.py       # User business logic
└── middleware/
    ├── __init__.py
    ├── cors.py               # CORS configuration
    ├── security_headers.py   # Security headers
    └── rate_limiting.py      # Rate limiting

13.2 Requirements File

# requirements.txt
fastapi==0.115.0
uvicorn[standard]==0.30.0
sqlalchemy[asyncio]==2.0.35
aiosqlite==0.20.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
bcrypt==4.2.0
python-multipart==0.0.9
pydantic[email]==2.9.0
pydantic-settings==2.5.0
slowapi==0.1.9
python-dotenv==1.0.1

13.3 Environment Configuration

# .env
SECRET_KEY=your-generated-secret-key-from-openssl-rand-hex-32
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
DATABASE_URL=sqlite+aiosqlite:///./auth_app.db
ENVIRONMENT=development
BCRYPT_ROUNDS=12
# config.py
from pydantic_settings import BaseSettings


class Settings(BaseSettings):
    """Application settings loaded from environment variables."""
    
    # Application
    APP_NAME: str = "FastAPI Auth System"
    ENVIRONMENT: str = "development"
    DEBUG: bool = False
    
    # Database
    DATABASE_URL: str = "sqlite+aiosqlite:///./auth_app.db"
    
    # JWT
    SECRET_KEY: str
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7
    
    # Security
    BCRYPT_ROUNDS: int = 12
    MAX_LOGIN_ATTEMPTS: int = 5
    LOCKOUT_DURATION_MINUTES: int = 30
    
    # CORS
    ALLOWED_ORIGINS: list[str] = ["http://localhost:3000"]
    
    class Config:
        env_file = ".env"
        case_sensitive = True


settings = Settings()

13.4 Complete Database Setup

# database.py
from sqlalchemy.ext.asyncio import (
    AsyncSession,
    create_async_engine,
    async_sessionmaker,
)
from sqlalchemy.orm import DeclarativeBase

from config import settings

engine = create_async_engine(
    settings.DATABASE_URL,
    echo=settings.DEBUG,
    pool_pre_ping=True,          # Verify connections are alive
    pool_size=5,                  # Connection pool size
    max_overflow=10,              # Extra connections allowed
)

async_session = async_sessionmaker(
    engine,
    class_=AsyncSession,
    expire_on_commit=False,
)


class Base(DeclarativeBase):
    pass


async def get_db():
    """Database session dependency with automatic cleanup."""
    async with async_session() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise


async def init_db():
    """Create all database tables."""
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)

13.5 Complete Auth Schemas

# schemas/auth.py
from pydantic import BaseModel


class TokenResponse(BaseModel):
    """Response schema for token endpoints."""
    access_token: str
    refresh_token: str
    token_type: str = "bearer"
    expires_in: int
    scope: str = ""


class RefreshRequest(BaseModel):
    """Request schema for token refresh."""
    refresh_token: str


class PasswordChangeRequest(BaseModel):
    """Request schema for password changes."""
    current_password: str
    new_password: str


class MessageResponse(BaseModel):
    """Generic message response."""
    detail: str

13.6 Complete Auth Router

# routers/auth.py
from datetime import datetime, timedelta, timezone

from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordRequestForm
from jose import JWTError
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession

from config import settings
from database import get_db
from models.user import RefreshToken, User
from schemas.auth import (
    MessageResponse,
    PasswordChangeRequest,
    RefreshRequest,
    TokenResponse,
)
from security.audit import AuditEvent, log_security_event
from security.dependencies import get_current_active_user
from security.jwt_handler import (
    create_access_token,
    create_refresh_token,
    decode_token,
)
from security.password import hash_password, verify_password
from services.user_service import UserService

router = APIRouter(prefix="/api/v1/auth", tags=["Authentication"])


@router.post("/token", response_model=TokenResponse)
async def login(
    request: Request,
    form_data: OAuth2PasswordRequestForm = Depends(),
    db: AsyncSession = Depends(get_db),
):
    """
    Authenticate user and return JWT access + refresh tokens.
    
    Accepts OAuth2 password flow (form data with username and password).
    """
    user_service = UserService(db)
    user = await user_service.authenticate(
        form_data.username, form_data.password
    )
    
    if not user:
        log_security_event(
            AuditEvent.LOGIN_FAILURE,
            request=request,
            username=form_data.username,
            details="Invalid credentials",
        )
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )
    
    # Determine scopes based on role
    from security.scopes import get_allowed_scopes_for_role
    granted_scopes = list(get_allowed_scopes_for_role(user.role))
    
    # Create tokens
    token_data = {
        "sub": user.username,
        "role": user.role.value,
        "scopes": granted_scopes,
    }
    access_token = create_access_token(data=token_data)
    refresh_token = create_refresh_token(data={"sub": user.username})
    
    # Store refresh token
    refresh_data = decode_token(refresh_token)
    db_token = RefreshToken(
        token_jti=refresh_data.jti,
        user_id=user.id,
        expires_at=datetime.now(timezone.utc) + timedelta(
            days=settings.REFRESH_TOKEN_EXPIRE_DAYS
        ),
    )
    db.add(db_token)
    
    log_security_event(
        AuditEvent.LOGIN_SUCCESS,
        request=request,
        user_id=user.id,
        username=user.username,
    )
    
    return TokenResponse(
        access_token=access_token,
        refresh_token=refresh_token,
        expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
        scope=" ".join(granted_scopes),
    )


@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(
    request: Request,
    body: RefreshRequest,
    db: AsyncSession = Depends(get_db),
):
    """Refresh access token using refresh token rotation."""
    try:
        token_data = decode_token(body.refresh_token)
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
        )
    
    if token_data.token_type != "refresh":
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token type",
        )
    
    # Find stored token
    result = await db.execute(
        select(RefreshToken).where(
            RefreshToken.token_jti == token_data.jti
        )
    )
    stored_token = result.scalar_one_or_none()
    
    if not stored_token:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token not found",
        )
    
    # Detect reuse
    if stored_token.is_revoked:
        log_security_event(
            AuditEvent.TOKEN_REUSE_DETECTED,
            request=request,
            user_id=stored_token.user_id,
            details=f"Reused JTI: {token_data.jti}",
        )
        await db.execute(
            update(RefreshToken)
            .where(RefreshToken.user_id == stored_token.user_id)
            .values(is_revoked=True)
        )
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token reuse detected. All sessions revoked.",
        )
    
    # Revoke old token
    stored_token.is_revoked = True
    
    # Get user
    user_service = UserService(db)
    user = await user_service.get_by_id(stored_token.user_id)
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found or inactive",
        )
    
    # Issue new tokens
    from security.scopes import get_allowed_scopes_for_role
    granted_scopes = list(get_allowed_scopes_for_role(user.role))
    
    new_access = create_access_token(data={
        "sub": user.username,
        "role": user.role.value,
        "scopes": granted_scopes,
    })
    new_refresh = create_refresh_token(data={"sub": user.username})
    
    # Store new refresh token
    new_data = decode_token(new_refresh)
    db.add(RefreshToken(
        token_jti=new_data.jti,
        user_id=user.id,
        expires_at=datetime.now(timezone.utc) + timedelta(
            days=settings.REFRESH_TOKEN_EXPIRE_DAYS
        ),
    ))
    
    log_security_event(
        AuditEvent.TOKEN_REFRESH,
        request=request,
        user_id=user.id,
        username=user.username,
    )
    
    return TokenResponse(
        access_token=new_access,
        refresh_token=new_refresh,
        expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
        scope=" ".join(granted_scopes),
    )


@router.post("/logout", response_model=MessageResponse)
async def logout(
    request: Request,
    body: RefreshRequest,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Logout by revoking the refresh token."""
    try:
        token_data = decode_token(body.refresh_token)
    except JWTError:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid token",
        )
    
    result = await db.execute(
        update(RefreshToken)
        .where(RefreshToken.token_jti == token_data.jti)
        .values(is_revoked=True)
    )
    
    log_security_event(
        AuditEvent.LOGOUT,
        request=request,
        user_id=current_user.id,
        username=current_user.username,
    )
    
    return MessageResponse(detail="Successfully logged out")


@router.post("/change-password", response_model=MessageResponse)
async def change_password(
    request: Request,
    body: PasswordChangeRequest,
    current_user: User = Depends(get_current_active_user),
    db: AsyncSession = Depends(get_db),
):
    """Change the authenticated user's password."""
    if not verify_password(body.current_password, current_user.hashed_password):
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Current password is incorrect",
        )
    
    current_user.hashed_password = hash_password(body.new_password)
    
    # Revoke all refresh tokens (force re-login on all devices)
    await db.execute(
        update(RefreshToken)
        .where(
            RefreshToken.user_id == current_user.id,
            RefreshToken.is_revoked == False,
        )
        .values(is_revoked=True)
    )
    
    log_security_event(
        AuditEvent.PASSWORD_CHANGED,
        request=request,
        user_id=current_user.id,
        username=current_user.username,
    )
    
    return MessageResponse(
        detail="Password changed. Please log in again on all devices."
    )

13.7 Complete Application Entry Point

# main.py
from contextlib import asynccontextmanager

from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware

from config import settings
from database import init_db
from middleware.security_headers import SecurityHeadersMiddleware
from routers import admin, auth, users


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application startup and shutdown events."""
    # Startup
    await init_db()
    print(f"Database initialized. Environment: {settings.ENVIRONMENT}")
    
    yield
    
    # Shutdown
    print("Application shutting down.")


app = FastAPI(
    title=settings.APP_NAME,
    description=(
        "Complete authentication and authorization system with JWT, "
        "refresh token rotation, RBAC, OAuth2 scopes, and API keys."
    ),
    version="1.0.0",
    lifespan=lifespan,
    docs_url="/docs" if settings.ENVIRONMENT != "production" else None,
    redoc_url="/redoc" if settings.ENVIRONMENT != "production" else None,
)

# Middleware (order matters — last added runs first)
app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(
    CORSMiddleware,
    allow_origins=settings.ALLOWED_ORIGINS,
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
    allow_headers=["Authorization", "Content-Type", "X-API-Key"],
)

# Routers
app.include_router(auth.router)
app.include_router(users.router)
app.include_router(admin.router)


@app.get("/health")
async def health_check():
    """Health check endpoint for load balancers and monitoring."""
    return {"status": "healthy", "version": "1.0.0"}

13.8 Running the Application

# Install dependencies
pip install -r requirements.txt

# Generate a secret key
export SECRET_KEY=$(openssl rand -hex 32)

# Run the development server
uvicorn main:app --reload --host 0.0.0.0 --port 8000

# Open Swagger UI
# http://localhost:8000/docs

13.9 Testing the Complete System

# test_auth_system.py
"""
Integration tests for the complete auth system.
Run with: pytest test_auth_system.py -v
"""
import pytest
from httpx import AsyncClient, ASGITransport
from main import app


@pytest.fixture
async def client():
    """Create an async test client."""
    transport = ASGITransport(app=app)
    async with AsyncClient(transport=transport, base_url="http://test") as ac:
        yield ac


@pytest.fixture
async def registered_user(client: AsyncClient) -> dict:
    """Register a test user and return credentials."""
    user_data = {
        "username": "testuser",
        "email": "test@example.com",
        "password": "TestPass123!",
        "full_name": "Test User",
    }
    response = await client.post("/api/v1/users/register", json=user_data)
    assert response.status_code == 201
    return user_data


@pytest.fixture
async def auth_tokens(client: AsyncClient, registered_user: dict) -> dict:
    """Login and return access + refresh tokens."""
    response = await client.post(
        "/api/v1/auth/token",
        data={
            "username": registered_user["username"],
            "password": registered_user["password"],
        },
    )
    assert response.status_code == 200
    return response.json()


class TestRegistration:
    """Test user registration."""
    
    async def test_register_success(self, client: AsyncClient):
        response = await client.post(
            "/api/v1/users/register",
            json={
                "username": "newuser",
                "email": "new@example.com",
                "password": "SecurePass123!",
            },
        )
        assert response.status_code == 201
        data = response.json()
        assert data["username"] == "newuser"
        assert "hashed_password" not in data
    
    async def test_register_duplicate_username(
        self, client: AsyncClient, registered_user
    ):
        response = await client.post(
            "/api/v1/users/register",
            json={
                "username": registered_user["username"],
                "email": "different@example.com",
                "password": "SecurePass123!",
            },
        )
        assert response.status_code == 409
    
    async def test_register_weak_password(self, client: AsyncClient):
        response = await client.post(
            "/api/v1/users/register",
            json={
                "username": "weakuser",
                "email": "weak@example.com",
                "password": "weak",
            },
        )
        assert response.status_code == 422


class TestLogin:
    """Test login and token generation."""
    
    async def test_login_success(
        self, client: AsyncClient, registered_user
    ):
        response = await client.post(
            "/api/v1/auth/token",
            data={
                "username": registered_user["username"],
                "password": registered_user["password"],
            },
        )
        assert response.status_code == 200
        data = response.json()
        assert "access_token" in data
        assert "refresh_token" in data
        assert data["token_type"] == "bearer"
    
    async def test_login_wrong_password(
        self, client: AsyncClient, registered_user
    ):
        response = await client.post(
            "/api/v1/auth/token",
            data={
                "username": registered_user["username"],
                "password": "WrongPass123!",
            },
        )
        assert response.status_code == 401


class TestProtectedEndpoints:
    """Test protected endpoint access."""
    
    async def test_get_profile_authenticated(
        self, client: AsyncClient, auth_tokens
    ):
        response = await client.get(
            "/api/v1/users/me",
            headers={
                "Authorization": f"Bearer {auth_tokens['access_token']}"
            },
        )
        assert response.status_code == 200
    
    async def test_get_profile_no_token(self, client: AsyncClient):
        response = await client.get("/api/v1/users/me")
        assert response.status_code == 401
    
    async def test_get_profile_invalid_token(self, client: AsyncClient):
        response = await client.get(
            "/api/v1/users/me",
            headers={"Authorization": "Bearer invalid-token"},
        )
        assert response.status_code == 401


class TestTokenRefresh:
    """Test refresh token rotation."""
    
    async def test_refresh_success(
        self, client: AsyncClient, auth_tokens
    ):
        response = await client.post(
            "/api/v1/auth/refresh",
            json={"refresh_token": auth_tokens["refresh_token"]},
        )
        assert response.status_code == 200
        data = response.json()
        assert data["access_token"] != auth_tokens["access_token"]
        assert data["refresh_token"] != auth_tokens["refresh_token"]
    
    async def test_refresh_reuse_detection(
        self, client: AsyncClient, auth_tokens
    ):
        # First refresh: should succeed
        response1 = await client.post(
            "/api/v1/auth/refresh",
            json={"refresh_token": auth_tokens["refresh_token"]},
        )
        assert response1.status_code == 200
        
        # Second refresh with same token: should fail (reuse detected)
        response2 = await client.post(
            "/api/v1/auth/refresh",
            json={"refresh_token": auth_tokens["refresh_token"]},
        )
        assert response2.status_code == 401
        assert "reuse" in response2.json()["detail"].lower()

13.10 API Endpoint Summary

Method Endpoint Auth Description
POST /api/v1/users/register None Register a new user
POST /api/v1/auth/token None Login (get tokens)
POST /api/v1/auth/refresh None Refresh access token
POST /api/v1/auth/logout Bearer Revoke refresh token
POST /api/v1/auth/change-password Bearer Change password
GET /api/v1/users/me Bearer Get current user profile
PUT /api/v1/users/me Bearer Update profile
DELETE /api/v1/users/me Bearer Deactivate account
POST /api/v1/api-keys/generate Bearer Generate API key
DELETE /api/v1/api-keys/revoke Bearer Revoke API key
GET /api/v1/admin/users Admin List all users
PUT /api/v1/admin/users/{id}/role Admin Change user role
GET /api/v1/admin/dashboard Admin Admin statistics
GET /health None Health check

13.11 Authentication Flow Diagram

┌──────────┐     1. POST /register      ┌──────────┐
│          │ ──────────────────────────> │          │
│  Client  │ <────────────────────────── │  Server  │
│          │     User created (201)      │          │
│          │                             │          │
│          │     2. POST /token          │          │
│          │     (username + password)   │          │
│          │ ──────────────────────────> │          │
│          │ <────────────────────────── │          │
│          │     access_token +          │          │
│          │     refresh_token           │          │
│          │                             │          │
│          │     3. GET /users/me        │          │
│          │     Authorization: Bearer   │          │
│          │ ──────────────────────────> │          │
│          │ <────────────────────────── │          │
│          │     User profile (200)      │          │
│          │                             │          │
│          │     4. POST /refresh        │          │
│          │     (when token expires)    │          │
│          │ ──────────────────────────> │          │
│          │ <────────────────────────── │          │
│          │     new access_token +      │          │
│          │     new refresh_token       │          │
│          │                             │          │
│          │     5. POST /logout         │          │
│          │ ──────────────────────────> │          │
│          │ <────────────────────────── │          │
│          │     Tokens revoked (200)    │          │
└──────────┘                             └──────────┘

14. Key Takeaways

This tutorial covered a comprehensive set of authentication and authorization patterns in FastAPI. Here is a summary of the key concepts and techniques you learned:

Topic Key Concept Implementation
Authentication vs Authorization Authentication verifies identity; authorization controls access Return 401 for auth failures, 403 for permission failures
Password Hashing Never store plaintext passwords; use bcrypt with cost factor 12+ passlib.context.CryptContext with bcrypt scheme
OAuth2 Password Flow Standard flow for username/password login returning tokens OAuth2PasswordBearer + OAuth2PasswordRequestForm
JWT Tokens Stateless, signed tokens with claims (sub, exp, iat, jti) python-jose for creating and verifying tokens
User Registration Validate input, check duplicates, hash password, store user Pydantic validators + SQLAlchemy models
Login System Authenticate credentials, issue access + refresh tokens Token endpoint returning TokenResponse
Protected Endpoints Dependency injection validates tokens automatically Depends(get_current_active_user)
RBAC Roles map to permissions; check before granting access require_role() and require_permission() dependencies
API Keys Pre-shared keys for programmatic access APIKeyHeader + database lookup
OAuth2 Scopes Token-level permissions for fine-grained access control Security(dep, scopes=[...]) + SecurityScopes
Refresh Token Rotation New refresh token on each use; detect reuse as theft Database-tracked JTIs with revocation flags
Security Best Practices Defense in depth: HTTPS, CORS, headers, rate limiting, audit Middleware stack + security header configuration

Quick Reference: Essential Dependencies

# Import and use these in your endpoints:

from security.dependencies import (
    get_current_user,           # Basic: validates token, returns User
    get_current_active_user,    # + checks user.is_active
    get_current_user_optional,  # Returns User or None (no 401)
)
from security.rbac import (
    require_role,               # require_role(UserRole.ADMIN)
    require_minimum_role,       # require_minimum_role(UserRole.MODERATOR)
    require_permission,         # require_permission(Permission.DELETE_ANY_POST)
)
from security.scopes import (
    get_current_user_with_scopes,  # Security(dep, scopes=["posts:read"])
)
from security.api_key import (
    get_user_from_api_key,      # X-API-Key header auth
)
from security.combined_auth import (
    get_current_user_flexible,  # JWT OR API key
)

What to Learn Next

  • OAuth2 with External Providers — Integrate Google, GitHub, or Facebook login using libraries like authlib
  • Two-Factor Authentication (2FA) — Add TOTP-based 2FA using pyotp
  • Email Verification — Send verification emails with time-limited tokens
  • Password Reset Flow — Implement secure password reset via email
  • Session Management — View and revoke active sessions across devices
  • Advanced Rate Limiting — Redis-based distributed rate limiting with sliding windows
  • API Gateway Integration — Offload auth to Kong, AWS API Gateway, or Traefik
Remember: Security is not a feature — it is a requirement. Every endpoint that handles sensitive data must be properly authenticated and authorized. Test your auth system thoroughly, including edge cases like expired tokens, revoked accounts, and concurrent requests. When in doubt, deny access and log the event.



Subscribe To Our Newsletter
You will receive our latest post and tutorial.
Thank you for subscribing!

required
required


Leave a Reply

Your email address will not be published. Required fields are marked *