Security is one of the most critical aspects of any web application. A single vulnerability can expose user data, compromise accounts, and destroy user trust. FastAPI provides excellent built-in support for implementing authentication and authorization, leveraging Python’s type system and dependency injection to create secure, maintainable auth systems.
In this comprehensive tutorial, you will learn how to build a production-ready authentication and authorization system in FastAPI. We will cover everything from password hashing and JWT tokens to role-based access control, OAuth2 scopes, refresh token rotation, and security best practices. By the end, you will have a complete, reusable auth system that you can drop into any FastAPI project.
Before writing any code, it is essential to understand the distinction between authentication and authorization, and the common strategies used to implement them.
Authentication answers the question: “Who are you?” It is the process of verifying a user’s identity. When a user logs in with a username and password, the system authenticates them by checking those credentials against stored records.
Authorization answers the question: “What are you allowed to do?” It determines what resources and actions an authenticated user can access. A regular user might view their own profile, while an admin can manage all users.
| Aspect | Authentication | Authorization |
|---|---|---|
| Question | Who are you? | What can you do? |
| Purpose | Verify identity | Grant/deny access |
| When | Before authorization | After authentication |
| Example | Login with username/password | Admin-only endpoint access |
| Failure Response | 401 Unauthorized | 403 Forbidden |
| Data | Credentials (password, token, biometrics) | Roles, permissions, policies |
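The "Failure Response" row can be illustrated with a small, framework-free sketch. The function name `access_status` and its two boolean parameters are hypothetical, chosen only to show which status code matches which kind of failure.

```python
from http import HTTPStatus


def access_status(is_authenticated: bool, is_permitted: bool) -> HTTPStatus:
    """Pick the response status for a protected resource (illustrative only)."""
    if not is_authenticated:
        # Identity could not be verified: authentication failed.
        return HTTPStatus.UNAUTHORIZED
    if not is_permitted:
        # Identity is known, but the action is not allowed: authorization failed.
        return HTTPStatus.FORBIDDEN
    return HTTPStatus.OK


print(access_status(False, False).value)  # missing or invalid credentials
print(access_status(True, False).value)   # regular user on an admin-only endpoint
print(access_status(True, True).value)    # authenticated and permitted
```

Note that authentication is checked first: a request with no valid identity never reaches the permission check.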
The most common authentication strategies compare as follows:
| Strategy | How It Works | Best For | Drawbacks |
|---|---|---|---|
| Session-Based | Server stores session data, client holds session ID cookie | Traditional web apps, server-rendered pages | Stateful, hard to scale horizontally |
| Token-Based (JWT) | Server issues signed token, client sends it with each request | SPAs, mobile apps, microservices | Token revocation is complex |
| API Key | Client sends a pre-shared key in header or query param | Server-to-server, third-party integrations | No user context, key rotation challenges |
| OAuth2 | Delegated auth via authorization server | Third-party login (Google, GitHub) | Complex implementation |
| Basic Auth | Username:password in Authorization header (Base64) | Simple internal tools, development | Credentials sent with every request |
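To make the Basic Auth row concrete, here is a minimal sketch using only the standard library. The helper names `encode_basic_auth` and `decode_basic_auth` are hypothetical. It shows that Base64 is an encoding, not encryption: anyone who sees the header can recover the credentials, which is why Basic Auth should only travel over HTTPS.

```python
import base64


def encode_basic_auth(username: str, password: str) -> str:
    """Build the value of an HTTP Basic Authorization header."""
    raw = f"{username}:{password}".encode("utf-8")
    return "Basic " + base64.b64encode(raw).decode("ascii")


def decode_basic_auth(header_value: str) -> tuple[str, str]:
    """Recover the credentials from a Basic Authorization header value."""
    scheme, _, encoded = header_value.partition(" ")
    if scheme.lower() != "basic":
        raise ValueError("Not a Basic auth header")
    decoded = base64.b64decode(encoded).decode("utf-8")
    # Split on the first colon only; the password itself may contain colons.
    username, _, password = decoded.partition(":")
    return username, password


header = encode_basic_auth("johndoe", "SecurePass123!")
print(header)
print(decode_basic_auth(header))
```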
The foundation of any password-based authentication system is secure password hashing. You must never store passwords in plaintext. Instead, you hash them using a one-way cryptographic function that makes it computationally infeasible to recover the original password.
pip install passlib[bcrypt] bcrypt
passlib is a comprehensive password hashing library that supports multiple algorithms. bcrypt is the recommended algorithm for password hashing because it is deliberately slow (making brute-force attacks expensive), includes a built-in salt, and has a configurable work factor that can be increased as hardware gets faster.
# security/password.py
from passlib.context import CryptContext
# Create a password context with bcrypt as the default scheme
pwd_context = CryptContext(
schemes=["bcrypt"],
deprecated="auto", # Automatically mark old schemes as deprecated
bcrypt__rounds=12, # Work factor (2^12 = 4096 iterations)
)
def hash_password(plain_password: str) -> str:
"""
Hash a plaintext password using bcrypt.
Args:
plain_password: The user's plaintext password.
Returns:
The bcrypt hash string (includes algorithm, rounds, salt, and hash).
"""
return pwd_context.hash(plain_password)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""
Verify a plaintext password against a stored hash.
Args:
plain_password: The password to verify.
hashed_password: The stored bcrypt hash.
Returns:
True if the password matches, False otherwise.
"""
return pwd_context.verify(plain_password, hashed_password)
A bcrypt hash looks like this:
$2b$12$LJ3m4ys3Lk0TSwMvNCH/8.VkEm8MRzIrvMnGJOgLrMwOOzcnX3iOa
│ │ │  └── Hash + Salt (53 characters)
│ │ └── Cost factor (12 rounds = 2^12 iterations)
│ └── Sub-version (2b = current)
└── Algorithm identifier ($2b = bcrypt)
Key points about bcrypt:
The salt is stored inside the hash string itself, so verify() can extract it and re-hash the input for comparison.
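To illustrate why a hash string that carries its own salt and cost can be verified later, here is a minimal standard-library sketch. It deliberately uses PBKDF2 as a stand-in rather than bcrypt, and the `iterations$salt$digest` format and the names `toy_hash` and `toy_verify` are hypothetical. In the real application, passlib does all of this for you.

```python
import hashlib
import hmac
import os


def toy_hash(password: str, iterations: int = 100_000) -> str:
    """Return 'iterations$salt_hex$digest_hex' (illustrative format, not bcrypt)."""
    salt = os.urandom(16)  # fresh random salt for every password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, iterations)
    return f"{iterations}${salt.hex()}${digest.hex()}"


def toy_verify(password: str, stored: str) -> bool:
    """Re-hash the candidate with the stored salt and cost, then compare."""
    iterations_str, salt_hex, digest_hex = stored.split("$")
    candidate = hashlib.pbkdf2_hmac(
        "sha256",
        password.encode("utf-8"),
        bytes.fromhex(salt_hex),
        int(iterations_str),
    )
    # Constant-time comparison avoids leaking how many bytes matched.
    return hmac.compare_digest(candidate, bytes.fromhex(digest_hex))


stored = toy_hash("SecurePass123!")
print(toy_verify("SecurePass123!", stored))
print(toy_verify("wrong-password", stored))
```

Because the salt is random, hashing the same password twice yields two different strings, yet both verify correctly.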
# security/password_validation.py
import re
from pydantic import BaseModel, field_validator
class PasswordRequirements(BaseModel):
"""Validates password strength requirements."""
password: str
@field_validator("password")
@classmethod
def validate_password_strength(cls, v: str) -> str:
if len(v) < 8:
raise ValueError("Password must be at least 8 characters long")
if len(v.encode("utf-8")) > 72:
raise ValueError("Password must not exceed 72 bytes (bcrypt truncates longer input)")
if not re.search(r"[A-Z]", v):
raise ValueError("Password must contain at least one uppercase letter")
if not re.search(r"[a-z]", v):
raise ValueError("Password must contain at least one lowercase letter")
if not re.search(r"\d", v):
raise ValueError("Password must contain at least one digit")
if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", v):
raise ValueError(
"Password must contain at least one special character"
)
return v
# Usage example
def validate_and_hash_password(plain_password: str) -> str:
"""Validate password strength, then hash it."""
# This will raise ValidationError if password is weak
PasswordRequirements(password=plain_password)
return hash_password(plain_password)
| Practice | Why |
|---|---|
| Use bcrypt or argon2 | Purpose-built for passwords; deliberately slow |
| Never use MD5 or SHA-256 alone | Too fast; vulnerable to brute-force and rainbow tables |
| Let the library handle salts | bcrypt auto-generates cryptographically random salts |
| Set cost factor to at least 12 | Balances security and performance; increase over time |
| Enforce password complexity | Weak passwords undermine even the best hashing |
| Never log passwords | Even hashed passwords should be treated as sensitive |
| Limit password length to 72 bytes | bcrypt truncates input beyond 72 bytes |
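The 72-byte limit applies to bytes, not characters, so a length check on the string alone can be misleading. The sketch below shows the difference; `BCRYPT_MAX_BYTES` and `fits_bcrypt_limit` are hypothetical names introduced for illustration.

```python
BCRYPT_MAX_BYTES = 72  # bcrypt only considers the first 72 bytes of input


def fits_bcrypt_limit(password: str) -> bool:
    """Return True if the UTF-8 encoding of the password is within 72 bytes."""
    return len(password.encode("utf-8")) <= BCRYPT_MAX_BYTES


ascii_password = "a" * 72
accented_password = "\u00e9" * 40  # 40 characters, 2 bytes each in UTF-8

print(len(accented_password), len(accented_password.encode("utf-8")))
print(fits_bcrypt_limit(ascii_password))
print(fits_bcrypt_limit(accented_password))
```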
FastAPI has built-in support for OAuth2 flows. The OAuth2 “password flow” (also called “Resource Owner Password Credentials”) is the simplest OAuth2 flow where the user provides their username and password directly to your application, which then returns a token.
The OAuth2 password flow works as follows:
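1. The user submits their username and password to the /token endpoint.
2. The server verifies the credentials and returns an access token.
3. The client sends the token in the Authorization header for subsequent requests.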
# security/oauth2.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
# This tells FastAPI where the token endpoint is located.
# It also enables the "Authorize" button in Swagger UI.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")
# This dependency extracts the token from the Authorization header
async def get_current_token(token: str = Depends(oauth2_scheme)) -> str:
"""
Extract the bearer token from the Authorization header.
The OAuth2PasswordBearer dependency automatically:
1. Looks for the Authorization header
2. Checks it starts with "Bearer "
3. Extracts and returns the token string
4. Returns 401 if the header is missing or malformed
"""
return token
# routers/auth.py
from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
router = APIRouter(prefix="/api/v1/auth", tags=["Authentication"])
@router.post("/token")
async def login_for_access_token(
form_data: OAuth2PasswordRequestForm = Depends()
):
"""
OAuth2-compatible token endpoint.
OAuth2PasswordRequestForm provides:
- username: str (required)
- password: str (required)
- scope: str (optional, space-separated scopes)
- grant_type: str (optional, must be "password" if provided)
Note: OAuth2 spec requires form data (not JSON) for this endpoint.
The Content-Type must be application/x-www-form-urlencoded.
"""
# Authenticate the user (we will implement this fully later)
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Create access token
access_token = create_access_token(data={"sub": user.username})
return {
"access_token": access_token,
"token_type": "bearer",
}
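Note: The OAuth2 specification requires the token endpoint to accept credentials as form data with the application/x-www-form-urlencoded content type, not JSON. FastAPI’s OAuth2PasswordRequestForm handles this automatically. When testing with Swagger UI, the “Authorize” button sends credentials in this format.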
One of FastAPI’s best features is automatic OpenAPI documentation with built-in OAuth2 support. When you set tokenUrl in OAuth2PasswordBearer, Swagger UI adds an “Authorize” button that lets you log in and automatically includes the token in subsequent requests.
# main.py
from fastapi import FastAPI
from routers import auth
app = FastAPI(
title="FastAPI Auth Tutorial",
description="Complete authentication and authorization system",
version="1.0.0",
)
app.include_router(auth.router)
# Visit http://localhost:8000/docs to see the Authorize button
# Click it, enter credentials, and all protected endpoints
# will automatically include the Bearer token
JSON Web Tokens (JWT) are the industry standard for stateless authentication in modern web applications. A JWT is a compact, URL-safe token that contains claims (statements about the user) and is cryptographically signed to prevent tampering.
A JWT consists of three Base64URL-encoded parts separated by dots:
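eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ.SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c
Header: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9
Payload (Claims): eyJzdWIiOiIxMjM0NTY3ODkwIiwibmFtZSI6IkpvaG4gRG9lIiwiaWF0IjoxNTE2MjM5MDIyfQ
Signature: SflKxwRJSMeKKF2QT4fwpMeJf36POk6yJV_adQssw5c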
| Part | Contains | Example |
|---|---|---|
| Header | Algorithm and token type | {"alg": "HS256", "typ": "JWT"} |
| Payload | Claims (user data, expiration, etc.) | {"sub": "user123", "exp": 1700000000} |
| Signature | HMAC or RSA signature for integrity | HMAC-SHA256 over base64url(header) + "." + base64url(payload), keyed with the secret |
| Claim | Name | Purpose |
|---|---|---|
| sub | Subject | User identifier (username, user ID) |
| exp | Expiration | When the token expires (Unix timestamp) |
| iat | Issued At | When the token was created |
| jti | JWT ID | Unique token identifier (for revocation) |
| iss | Issuer | Who issued the token |
| aud | Audience | Intended recipient |
| nbf | Not Before | Token is not valid before this time |
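To see how the three parts and the exp claim fit together, here is a minimal HS256 sketch built only on the standard library. The helper names (`b64url_encode`, `b64url_decode`, `sign_hs256`, `verify_hs256`) and the secret `demo-secret` are hypothetical. This is meant for understanding the format, not as a replacement for a maintained JWT library, which also validates the header and the other registered claims.

```python
import base64
import hashlib
import hmac
import json
import time


def b64url_encode(raw: bytes) -> str:
    """Base64URL-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(raw).rstrip(b"=").decode("ascii")


def b64url_decode(text: str) -> bytes:
    """Decode Base64URL text, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def sign_hs256(payload: dict, secret: str) -> str:
    """Build header.payload.signature using HMAC-SHA256."""
    header = {"alg": "HS256", "typ": "JWT"}
    signing_input = ".".join(
        b64url_encode(json.dumps(part, separators=(",", ":")).encode("utf-8"))
        for part in (header, payload)
    )
    signature = hmac.new(
        secret.encode("utf-8"), signing_input.encode("ascii"), hashlib.sha256
    ).digest()
    return f"{signing_input}.{b64url_encode(signature)}"


def verify_hs256(token: str, secret: str) -> dict:
    """Check the signature and the 'exp' claim, then return the payload."""
    header_b64, payload_b64, signature_b64 = token.split(".")
    expected = hmac.new(
        secret.encode("utf-8"),
        f"{header_b64}.{payload_b64}".encode("ascii"),
        hashlib.sha256,
    ).digest()
    if not hmac.compare_digest(expected, b64url_decode(signature_b64)):
        raise ValueError("Signature mismatch")
    payload = json.loads(b64url_decode(payload_b64))
    if "exp" in payload and payload["exp"] < time.time():
        raise ValueError("Token expired")
    return payload


token = sign_hs256({"sub": "user123", "exp": int(time.time()) + 60}, "demo-secret")
print(token.count("."))  # two dots separate the three parts
print(verify_hs256(token, "demo-secret")["sub"])
```

The payload is only encoded, not encrypted, so anyone holding the token can read the claims. The signature only guarantees that they have not been modified.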
pip install python-jose[cryptography] # or alternatively: pip install PyJWT
# security/jwt_handler.py
from datetime import datetime, timedelta, timezone
from typing import Any, Optional
import uuid
from jose import JWTError, jwt
from pydantic import BaseModel
# Configuration — In production, load these from environment variables
SECRET_KEY = "your-secret-key-change-in-production-use-openssl-rand-hex-32"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
REFRESH_TOKEN_EXPIRE_DAYS = 7
class TokenData(BaseModel):
"""Schema for decoded token data."""
username: Optional[str] = None
scopes: list[str] = []
token_type: str = "access"
jti: Optional[str] = None
class TokenResponse(BaseModel):
"""Schema for token endpoint response."""
access_token: str
refresh_token: str
token_type: str = "bearer"
expires_in: int # seconds until access token expires
def create_access_token(
data: dict[str, Any],
expires_delta: Optional[timedelta] = None,
) -> str:
"""
Create a JWT access token.
Args:
data: Claims to include in the token (must include 'sub').
expires_delta: Custom expiration time. Defaults to 30 minutes.
Returns:
Encoded JWT string.
"""
to_encode = data.copy()
now = datetime.now(timezone.utc)
expire = now + (expires_delta or timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES))
to_encode.update({
"exp": expire,
"iat": now,
"jti": str(uuid.uuid4()), # Unique token ID for revocation
"type": "access",
})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def create_refresh_token(
data: dict[str, Any],
expires_delta: Optional[timedelta] = None,
) -> str:
"""
Create a JWT refresh token with longer expiration.
Refresh tokens are used to obtain new access tokens without
requiring the user to log in again.
"""
to_encode = data.copy()
now = datetime.now(timezone.utc)
expire = now + (expires_delta or timedelta(days=REFRESH_TOKEN_EXPIRE_DAYS))
to_encode.update({
"exp": expire,
"iat": now,
"jti": str(uuid.uuid4()),
"type": "refresh",
})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
def decode_token(token: str) -> TokenData:
"""
Decode and validate a JWT token.
Args:
token: The JWT string to decode.
Returns:
TokenData with the decoded claims.
Raises:
JWTError: If the token is invalid, expired, or tampered with.
"""
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise JWTError("Token missing 'sub' claim")
scopes: list[str] = payload.get("scopes", [])
token_type: str = payload.get("type", "access")
jti: str = payload.get("jti")
return TokenData(
username=username,
scopes=scopes,
token_type=token_type,
jti=jti,
)
# Generate a cryptographically secure random key
openssl rand -hex 32
# Output: a1b2c3d4e5f6... (64 hex characters = 256 bits)
# Or using Python
python -c "import secrets; print(secrets.token_hex(32))"
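As a framework-free illustration of the same idea, the sketch below generates a key and reads it back from the environment, refusing values that are missing or too short. The function names and the 64-character minimum are assumptions made for this example, not requirements of any library.

```python
import os
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return a random hex key; 32 bytes gives 64 hex characters (256 bits)."""
    return secrets.token_hex(num_bytes)


def load_secret_key(env_var: str = "SECRET_KEY", min_chars: int = 64) -> str:
    """Read the signing key from the environment and refuse weak values."""
    key = os.environ.get(env_var)
    if key is None:
        raise RuntimeError(f"{env_var} is not set")
    if len(key) < min_chars:
        raise RuntimeError(f"{env_var} must be at least {min_chars} characters")
    return key


# Demo only: in a real deployment the variable is set outside the program.
os.environ["SECRET_KEY"] = generate_secret_key()
print(len(load_secret_key()))
```

Failing at startup when the key is missing is usually preferable to silently falling back to a hard-coded default.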
# config.py
from pydantic_settings import BaseSettings, SettingsConfigDict
class AuthSettings(BaseSettings):
    """
    Authentication configuration loaded from environment variables.
    Set these in your .env file or system environment:
        SECRET_KEY=your-secret-key
        ALGORITHM=HS256
        ACCESS_TOKEN_EXPIRE_MINUTES=30
        REFRESH_TOKEN_EXPIRE_DAYS=7
    """
    # pydantic-settings v2 configuration (replaces the deprecated nested Config class)
    model_config = SettingsConfigDict(env_file=".env", case_sensitive=True)
    # No default: startup fails if SECRET_KEY is not provided
    SECRET_KEY: str
    ALGORITHM: str = "HS256"
    ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
    REFRESH_TOKEN_EXPIRE_DAYS: int = 7
    # Password hashing
    BCRYPT_ROUNDS: int = 12
    # Rate limiting
    LOGIN_RATE_LIMIT: str = "5/minute"
auth_settings = AuthSettings()
With password hashing and JWT tokens in place, we need a user model to store user data and a registration endpoint to create new accounts. We will use SQLAlchemy for the database model and Pydantic for request/response schemas.
pip install sqlalchemy asyncpg aiosqlite "pydantic[email]" # asyncpg for PostgreSQL, aiosqlite for SQLite (development), pydantic[email] for the EmailStr type used in the schemas
# database.py
from sqlalchemy.ext.asyncio import AsyncSession, create_async_engine, async_sessionmaker
from sqlalchemy.orm import DeclarativeBase
DATABASE_URL = "sqlite+aiosqlite:///./auth_tutorial.db"
# For PostgreSQL: "postgresql+asyncpg://user:pass@localhost/dbname"
engine = create_async_engine(DATABASE_URL, echo=True)
async_session = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
class Base(DeclarativeBase):
"""Base class for all SQLAlchemy models."""
pass
async def get_db() -> AsyncSession:
"""Dependency that provides a database session."""
async with async_session() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
finally:
await session.close()
async def init_db():
"""Create all tables on application startup."""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# models/user.py
import enum
from datetime import datetime, timezone
from sqlalchemy import (
    Boolean, Column, DateTime, Enum, ForeignKey, Index, Integer, String, Text
)
from sqlalchemy.orm import relationship
from database import Base
class UserRole(str, enum.Enum):
    """User role enumeration for RBAC."""
    USER = "user"
    MODERATOR = "moderator"
    ADMIN = "admin"
    SUPER_ADMIN = "super_admin"
class User(Base):
    """
    User database model.
    Stores user credentials, profile information, and role assignments.
    """
    __tablename__ = "users"
    # Primary key
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    # Authentication fields
    username = Column(String(50), unique=True, nullable=False, index=True)
    email = Column(String(255), unique=True, nullable=False, index=True)
    hashed_password = Column(String(255), nullable=False)
    # Profile fields
    full_name = Column(String(100), nullable=True)
    # Role and permissions
    role = Column(
        Enum(UserRole),
        default=UserRole.USER,
        nullable=False,
    )
    # Account status
    is_active = Column(Boolean, default=True, nullable=False)
    is_verified = Column(Boolean, default=False, nullable=False)
    # Timestamps
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        nullable=False,
    )
    updated_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
        nullable=False,
    )
    last_login = Column(DateTime(timezone=True), nullable=True)
    # Security fields
    failed_login_attempts = Column(Integer, default=0)
    locked_until = Column(DateTime(timezone=True), nullable=True)
    # API key for programmatic access
    api_key = Column(String(64), unique=True, nullable=True, index=True)
    # Relationships
    refresh_tokens = relationship(
        "RefreshToken", back_populates="user", cascade="all, delete-orphan"
    )
    # Table indexes
    __table_args__ = (
        Index("ix_users_email_active", "email", "is_active"),
    )
    def __repr__(self) -> str:
        return f"<User(id={self.id}, username='{self.username}', role='{self.role}')>"
class RefreshToken(Base):
    """
    Stores refresh tokens for token rotation and revocation.
    Each refresh token is stored in the database so it can be:
    - Revoked individually
    - Rotated (old token invalidated when new one is issued)
    - Cleaned up when expired
    """
    __tablename__ = "refresh_tokens"
    id = Column(Integer, primary_key=True, index=True, autoincrement=True)
    token_jti = Column(String(36), unique=True, nullable=False, index=True)
    # The foreign key is required: without it SQLAlchemy cannot work out
    # how to join users and refresh_tokens for the relationships below.
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False, index=True)
    is_revoked = Column(Boolean, default=False, nullable=False)
    created_at = Column(
        DateTime(timezone=True),
        default=lambda: datetime.now(timezone.utc),
    )
    expires_at = Column(DateTime(timezone=True), nullable=False)
    # Relationship back to user
    user = relationship("User", back_populates="refresh_tokens")
# schemas/user.py
import re
from datetime import datetime
from typing import Optional
from pydantic import BaseModel, EmailStr, field_validator
class UserCreate(BaseModel):
"""Schema for user registration requests."""
username: str
email: EmailStr
password: str
full_name: Optional[str] = None
@field_validator("username")
@classmethod
def validate_username(cls, v: str) -> str:
if len(v) < 3:
raise ValueError("Username must be at least 3 characters")
if len(v) > 50:
raise ValueError("Username must not exceed 50 characters")
if not re.match(r"^[a-zA-Z0-9_-]+$", v):
raise ValueError(
"Username can only contain letters, numbers, hyphens, "
"and underscores"
)
return v.lower() # Normalize to lowercase
@field_validator("password")
@classmethod
def validate_password(cls, v: str) -> str:
if len(v) < 8:
raise ValueError("Password must be at least 8 characters")
if not re.search(r"[A-Z]", v):
raise ValueError("Password must contain an uppercase letter")
if not re.search(r"[a-z]", v):
raise ValueError("Password must contain a lowercase letter")
if not re.search(r"\d", v):
raise ValueError("Password must contain a digit")
if not re.search(r"[!@#$%^&*(),.?\":{}|<>]", v):
raise ValueError("Password must contain a special character")
return v
class UserResponse(BaseModel):
"""Schema for user data in API responses (never includes password)."""
id: int
username: str
email: str
full_name: Optional[str] = None
role: str
is_active: bool
is_verified: bool
created_at: datetime
model_config = {"from_attributes": True}
class UserUpdate(BaseModel):
"""Schema for updating user profile."""
full_name: Optional[str] = None
email: Optional[EmailStr] = None
class UserInDB(UserResponse):
"""Schema that includes the hashed password (for internal use only)."""
hashed_password: str
# routers/users.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from models.user import User
from schemas.user import UserCreate, UserResponse
from security.password import hash_password
router = APIRouter(prefix="/api/v1/users", tags=["Users"])
@router.post(
"/register",
response_model=UserResponse,
status_code=status.HTTP_201_CREATED,
summary="Register a new user",
)
async def register_user(
user_data: UserCreate,
db: AsyncSession = Depends(get_db),
):
"""
Register a new user account.
This endpoint:
1. Validates the input (username format, email format, password strength)
2. Checks for duplicate username and email
3. Hashes the password
4. Creates the user record
5. Returns the user data (without password)
"""
# Check for duplicate username
result = await db.execute(
select(User).where(User.username == user_data.username)
)
if result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Username already registered",
)
# Check for duplicate email
result = await db.execute(
select(User).where(User.email == user_data.email)
)
if result.scalar_one_or_none():
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Email already registered",
)
# Create user with hashed password
new_user = User(
username=user_data.username,
email=user_data.email,
hashed_password=hash_password(user_data.password),
full_name=user_data.full_name,
)
db.add(new_user)
await db.flush() # Flush to get the auto-generated ID
await db.refresh(new_user) # Refresh to load all fields
return new_user
# services/user_service.py
from datetime import datetime, timezone
from typing import Optional
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession
from models.user import User
from security.password import hash_password, verify_password
class UserService:
"""
Service layer for user-related business logic.
Separating business logic from route handlers makes the code
more testable and reusable.
"""
def __init__(self, db: AsyncSession):
self.db = db
async def get_by_username(self, username: str) -> Optional[User]:
"""Find a user by username."""
result = await self.db.execute(
select(User).where(User.username == username)
)
return result.scalar_one_or_none()
async def get_by_email(self, email: str) -> Optional[User]:
"""Find a user by email address."""
result = await self.db.execute(
select(User).where(User.email == email)
)
return result.scalar_one_or_none()
async def get_by_id(self, user_id: int) -> Optional[User]:
"""Find a user by ID."""
result = await self.db.execute(
select(User).where(User.id == user_id)
)
return result.scalar_one_or_none()
async def authenticate(
self, username: str, password: str
) -> Optional[User]:
"""
Authenticate a user with username and password.
Returns the user if credentials are valid, None otherwise.
Also handles account lockout after too many failed attempts.
"""
user = await self.get_by_username(username)
if not user:
# Run password hash anyway to prevent timing attacks
# (so the response time is the same whether user exists or not)
hash_password("dummy-password")
return None
# Check if account is locked
if user.locked_until and user.locked_until > datetime.now(timezone.utc):
return None
# Check if account is active
if not user.is_active:
return None
# Verify password
if not verify_password(password, user.hashed_password):
# Increment failed attempts
user.failed_login_attempts += 1
# Lock account after 5 failed attempts (30 minute lockout)
if user.failed_login_attempts >= 5:
from datetime import timedelta
user.locked_until = datetime.now(timezone.utc) + timedelta(minutes=30)
await self.db.flush()
return None
# Successful login — reset failed attempts
user.failed_login_attempts = 0
user.locked_until = None
user.last_login = datetime.now(timezone.utc)
await self.db.flush()
return user
async def get_by_api_key(self, api_key: str) -> Optional[User]:
"""Find a user by API key."""
result = await self.db.execute(
select(User).where(
User.api_key == api_key,
User.is_active.is_(True),
)
)
return result.scalar_one_or_none()
Now let us bring everything together into a complete login system that authenticates users, generates JWT tokens, and returns them to the client.
# routers/auth.py
from datetime import datetime, timedelta, timezone
from typing import Optional
from fastapi import APIRouter, Depends, HTTPException, status, Response
from fastapi.security import OAuth2PasswordRequestForm
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from models.user import RefreshToken
from schemas.user import UserResponse
from security.jwt_handler import (
create_access_token,
create_refresh_token,
decode_token,
TokenResponse,
ACCESS_TOKEN_EXPIRE_MINUTES,
REFRESH_TOKEN_EXPIRE_DAYS,
)
from services.user_service import UserService
router = APIRouter(prefix="/api/v1/auth", tags=["Authentication"])
@router.post(
"/token",
response_model=TokenResponse,
summary="Login and get access + refresh tokens",
)
async def login(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db),
):
"""
Authenticate user and return JWT tokens.
This endpoint:
1. Validates credentials against the database
2. Creates a short-lived access token (30 min)
3. Creates a long-lived refresh token (7 days)
4. Stores the refresh token in the database for revocation
5. Returns both tokens to the client
"""
user_service = UserService(db)
user = await user_service.authenticate(
form_data.username, form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Build token claims
token_data = {
"sub": user.username,
"role": user.role.value,
"scopes": form_data.scopes, # OAuth2 scopes if requested
}
# Create tokens
access_token = create_access_token(data=token_data)
refresh_token = create_refresh_token(data={"sub": user.username})
# Decode refresh token to get its JTI and expiration
refresh_data = decode_token(refresh_token)
# Store refresh token in database for revocation tracking
db_refresh_token = RefreshToken(
token_jti=refresh_data.jti,
user_id=user.id,
expires_at=datetime.now(timezone.utc) + timedelta(
days=REFRESH_TOKEN_EXPIRE_DAYS
),
)
db.add(db_refresh_token)
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
token_type="bearer",
expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
)
@router.post("/logout", summary="Logout and revoke refresh token")
async def logout(
refresh_token: str,
db: AsyncSession = Depends(get_db),
):
"""
Revoke the user's refresh token.
Since JWTs are stateless, we cannot truly invalidate an access token.
Instead, we revoke the refresh token so no new access tokens can be
obtained. The current access token will expire naturally.
"""
try:
token_data = decode_token(refresh_token)
except Exception:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid refresh token",
)
# Find and revoke the refresh token in the database
from sqlalchemy import select, update
result = await db.execute(
update(RefreshToken)
.where(RefreshToken.token_jti == token_data.jti)
.values(is_revoked=True)
)
if result.rowcount == 0:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Refresh token not found",
)
return {"detail": "Successfully logged out"}
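Storing each refresh token's jti also makes rotation possible: when a refresh token is used, it is revoked and a new one is issued in its place. The sketch below models that idea with an in-memory store instead of the refresh_tokens table. The class `InMemoryRefreshStore` and its methods are hypothetical, and the "revoke everything on reuse" policy is a common hardening choice assumed here rather than something the code above implements.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class InMemoryRefreshStore:
    """Hypothetical stand-in for the refresh_tokens table, keyed by jti."""

    owner_by_jti: dict[str, str] = field(default_factory=dict)
    revoked_by_jti: dict[str, bool] = field(default_factory=dict)

    def issue(self, username: str) -> str:
        """Record a new, unrevoked refresh token and return its jti."""
        jti = str(uuid.uuid4())
        self.owner_by_jti[jti] = username
        self.revoked_by_jti[jti] = False
        return jti

    def revoke_all(self, username: str) -> None:
        """Revoke every refresh token that belongs to the user."""
        for jti, owner in self.owner_by_jti.items():
            if owner == username:
                self.revoked_by_jti[jti] = True

    def rotate(self, old_jti: str) -> str:
        """Revoke old_jti and issue a replacement for the same user."""
        if old_jti not in self.owner_by_jti:
            raise ValueError("Unknown refresh token")
        username = self.owner_by_jti[old_jti]
        if self.revoked_by_jti[old_jti]:
            # A revoked token being replayed suggests it was stolen.
            self.revoke_all(username)
            raise ValueError("Refresh token reuse detected")
        self.revoked_by_jti[old_jti] = True
        return self.issue(username)


store = InMemoryRefreshStore()
first = store.issue("johndoe")
second = store.rotate(first)
print(store.revoked_by_jti[first], store.revoked_by_jti[second])
```

In the database-backed version, `rotate` would correspond to an UPDATE on the old row plus an INSERT of the new one inside a single transaction.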
How and where you store tokens on the client side significantly impacts security. Here are the common approaches:
| Storage Method | Pros | Cons | Best For |
|---|---|---|---|
| HTTP-Only Cookie | Not accessible via JavaScript (XSS-safe) | Vulnerable to CSRF (mitigate with SameSite) | Web applications |
| localStorage | Easy to implement, persists across tabs | Vulnerable to XSS attacks | Low-security apps |
| sessionStorage | Cleared when tab closes | Vulnerable to XSS, lost on tab close | Temporary sessions |
| In-memory variable | Safest from storage attacks | Lost on page refresh | High-security SPAs |
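To show what the HTTP-only cookie option looks like on the wire, here is a small standard-library sketch that builds the Set-Cookie header value. The function name `build_token_cookie` and the token value are made up for illustration. In the application itself the framework's cookie helper does this for you.

```python
from http.cookies import SimpleCookie


def build_token_cookie(
    name: str, token: str, max_age_seconds: int, path: str = "/"
) -> str:
    """Return the Set-Cookie header value for a hardened token cookie."""
    cookie = SimpleCookie()
    cookie[name] = token
    cookie[name]["httponly"] = True      # hidden from JavaScript
    cookie[name]["secure"] = True        # only sent over HTTPS
    cookie[name]["samesite"] = "Strict"  # not sent on cross-site requests
    cookie[name]["max-age"] = max_age_seconds
    cookie[name]["path"] = path
    return cookie[name].OutputString()


print(build_token_cookie("access_token", "abc123", 30 * 60))
```

The HttpOnly flag addresses the XSS concern from the table, while SameSite reduces the CSRF exposure listed under "Cons".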
# Alternative login endpoint that sets cookies instead of returning tokens
@router.post("/login", summary="Login with cookie-based token storage")
async def login_with_cookies(
response: Response,
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db),
):
"""Login and set tokens as HTTP-only cookies."""
user_service = UserService(db)
user = await user_service.authenticate(
form_data.username, form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
access_token = create_access_token(data={"sub": user.username})
refresh_token = create_refresh_token(data={"sub": user.username})
# Set access token cookie
response.set_cookie(
key="access_token",
value=f"Bearer {access_token}",
httponly=True, # JavaScript cannot access this cookie
secure=True, # Only sent over HTTPS
samesite="strict", # Not sent with cross-origin requests
max_age=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
path="/",
)
# Set refresh token cookie
response.set_cookie(
key="refresh_token",
value=refresh_token,
httponly=True,
secure=True,
samesite="strict",
max_age=REFRESH_TOKEN_EXPIRE_DAYS * 86400,
path="/api/v1/auth/refresh", # Only sent to refresh endpoint
)
return {"detail": "Login successful", "username": user.username}
# Example client-side usage with the requests library
import requests
BASE_URL = "http://localhost:8000/api/v1"
# Step 1: Login to get tokens
login_response = requests.post(
f"{BASE_URL}/auth/token",
data={ # Note: form data, not JSON
"username": "johndoe",
"password": "SecurePass123!",
},
)
tokens = login_response.json()
access_token = tokens["access_token"]
refresh_token = tokens["refresh_token"]
print(f"Access Token: {access_token[:20]}...")
print(f"Expires in: {tokens['expires_in']} seconds")
# Step 2: Access a protected endpoint
headers = {"Authorization": f"Bearer {access_token}"}
profile_response = requests.get(
f"{BASE_URL}/users/me",
headers=headers,
)
print(f"Profile: {profile_response.json()}")
# Step 3: Refresh the access token when it expires
refresh_response = requests.post(
f"{BASE_URL}/auth/refresh",
json={"refresh_token": refresh_token},
)
new_tokens = refresh_response.json()
print(f"New Access Token: {new_tokens['access_token'][:20]}...")
The real power of FastAPI’s dependency injection system shines when protecting endpoints. We create reusable dependencies that validate tokens and retrieve the current user, then inject them into any endpoint that requires authentication.
# security/dependencies.py
from fastapi import Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from models.user import User
from security.jwt_handler import decode_token
from services.user_service import UserService
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/v1/auth/token")
async def get_current_user(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
) -> User:
"""
Dependency that validates the JWT token and returns the current user.
This dependency:
1. Extracts the Bearer token from the Authorization header
2. Decodes and validates the JWT (checks signature, expiration)
3. Extracts the username from the 'sub' claim
4. Looks up the user in the database
5. Returns the User object (the active-account check is done by get_current_active_user)
If any step fails, it raises a 401 Unauthorized error.
Usage:
@router.get("/protected")
async def protected_route(user: User = Depends(get_current_user)):
return {"message": f"Hello, {user.username}"}
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
# Decode the JWT token
token_data = decode_token(token)
if token_data.username is None:
raise credentials_exception
# Ensure this is an access token, not a refresh token
if token_data.token_type != "access":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token type. Use an access token.",
headers={"WWW-Authenticate": "Bearer"},
)
except JWTError:
raise credentials_exception
# Look up the user in the database
user_service = UserService(db)
user = await user_service.get_by_username(token_data.username)
if user is None:
raise credentials_exception
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user),
) -> User:
"""
Dependency that ensures the user is active.
Builds on get_current_user by adding an active check.
Use this for most protected endpoints.
"""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Account is deactivated",
)
return current_user
# routers/users.py
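from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from models.user import User
from schemas.user import UserResponse, UserUpdate
from security.dependencies import get_current_active_user
from services.user_service import UserService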
router = APIRouter(prefix="/api/v1/users", tags=["Users"])
@router.get("/me", response_model=UserResponse, summary="Get current user profile")
async def get_my_profile(
current_user: User = Depends(get_current_active_user),
):
"""
Return the profile of the currently authenticated user.
This endpoint requires a valid Bearer token in the Authorization header.
The token is automatically validated by the get_current_active_user
dependency, which also retrieves the full user object from the database.
"""
return current_user
@router.put("/me", response_model=UserResponse, summary="Update current user profile")
async def update_my_profile(
updates: UserUpdate,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db),
):
"""Update the authenticated user's profile."""
if updates.full_name is not None:
current_user.full_name = updates.full_name
if updates.email is not None:
# Check if new email is already taken
existing = await UserService(db).get_by_email(updates.email)
if existing and existing.id != current_user.id:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail="Email already in use",
)
current_user.email = updates.email
await db.flush()
await db.refresh(current_user)
return current_user
@router.delete("/me", summary="Delete current user account")
async def delete_my_account(
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db),
):
"""Soft-delete the authenticated user's account."""
current_user.is_active = False
await db.flush()
return {"detail": "Account deactivated successfully"}
FastAPI’s dependency injection creates a clean chain of responsibilities. Understanding this chain helps you design your auth system:
# The dependency chain for a protected endpoint:
#
# HTTP Request
# |
# v
# OAuth2PasswordBearer --> Extracts "Bearer <token>" from header
# |
# v
# get_current_user --> Decodes JWT, looks up user in DB
# |
# v
# get_current_active_user --> Checks user.is_active
# |
# v
# require_role("admin") --> Checks user.role (RBAC - next section)
# |
# v
# Your endpoint handler --> Receives the validated User object
# Each dependency in the chain can:
# 1. Raise HTTPException to short-circuit the request
# 2. Pass data to the next dependency via return values
# 3. Access other dependencies (like the database session)
# Example: Stacking dependencies
@router.get("/admin/dashboard")
async def admin_dashboard(
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db),
):
"""
Both dependencies are resolved:
- get_current_active_user validates the token AND checks is_active
- get_db provides a database session
FastAPI resolves the entire dependency tree automatically.
"""
pass
# Sometimes you want endpoints that work for both anonymous and
# authenticated users (e.g., a public profile that shows extra
# data to the profile owner).
from fastapi.security import OAuth2PasswordBearer
# auto_error=False makes the dependency return None instead of
# raising 401 when no token is provided
oauth2_scheme_optional = OAuth2PasswordBearer(
tokenUrl="/api/v1/auth/token",
auto_error=False,
)
async def get_current_user_optional(
    token: str | None = Depends(oauth2_scheme_optional),
    db: AsyncSession = Depends(get_db),
) -> User | None:
    """
    Optional authentication dependency.
    Returns the User if a valid access token is provided, None otherwise.
    Does NOT raise 401 for missing or invalid tokens.
    """
    if token is None:
        return None
    try:
        token_data = decode_token(token)
    except JWTError:
        # Invalid or expired token: treat the caller as anonymous
        return None
    # Refresh tokens and tokens without a subject must not authenticate
    if token_data.token_type != "access" or token_data.username is None:
        return None
    user_service = UserService(db)
    return await user_service.get_by_username(token_data.username)
# Usage
@router.get("/posts/{post_id}")
async def get_post(
post_id: int,
current_user: User | None = Depends(get_current_user_optional),
):
"""
Public endpoint that shows extra data for authenticated users.
"""
post = await get_post_by_id(post_id)
response = {"title": post.title, "content": post.content}
if current_user and current_user.id == post.author_id:
response["edit_url"] = f"/posts/{post_id}/edit"
response["analytics"] = await get_post_analytics(post_id)
return response
Role-Based Access Control restricts system access based on the roles assigned to users. Instead of checking individual permissions for each user, you assign users to roles, and roles have predefined sets of permissions. This simplifies permission management significantly.
# security/rbac.py
import enum
from typing import Optional
from fastapi import Depends, HTTPException, status
from models.user import User, UserRole
from security.dependencies import get_current_active_user
class Permission(str, enum.Enum):
"""Fine-grained permissions for the application."""
# User permissions
READ_OWN_PROFILE = "read:own_profile"
UPDATE_OWN_PROFILE = "update:own_profile"
DELETE_OWN_ACCOUNT = "delete:own_account"
# Post permissions
CREATE_POST = "create:post"
READ_POST = "read:post"
UPDATE_OWN_POST = "update:own_post"
DELETE_OWN_POST = "delete:own_post"
# Admin permissions
READ_ALL_USERS = "read:all_users"
UPDATE_ANY_USER = "update:any_user"
DELETE_ANY_USER = "delete:any_user"
UPDATE_ANY_POST = "update:any_post"
DELETE_ANY_POST = "delete:any_post"
MANAGE_ROLES = "manage:roles"
VIEW_AUDIT_LOG = "view:audit_log"
# Super admin
MANAGE_SYSTEM = "manage:system"
# Map roles to their permissions
ROLE_PERMISSIONS: dict[UserRole, set[Permission]] = {
UserRole.USER: {
Permission.READ_OWN_PROFILE,
Permission.UPDATE_OWN_PROFILE,
Permission.DELETE_OWN_ACCOUNT,
Permission.CREATE_POST,
Permission.READ_POST,
Permission.UPDATE_OWN_POST,
Permission.DELETE_OWN_POST,
},
UserRole.MODERATOR: {
# Inherits all USER permissions plus:
Permission.READ_OWN_PROFILE,
Permission.UPDATE_OWN_PROFILE,
Permission.DELETE_OWN_ACCOUNT,
Permission.CREATE_POST,
Permission.READ_POST,
Permission.UPDATE_OWN_POST,
Permission.DELETE_OWN_POST,
# Moderator-specific
Permission.UPDATE_ANY_POST,
Permission.DELETE_ANY_POST,
Permission.READ_ALL_USERS,
},
UserRole.ADMIN: {
# Inherits all MODERATOR permissions plus:
Permission.READ_OWN_PROFILE,
Permission.UPDATE_OWN_PROFILE,
Permission.DELETE_OWN_ACCOUNT,
Permission.CREATE_POST,
Permission.READ_POST,
Permission.UPDATE_OWN_POST,
Permission.DELETE_OWN_POST,
Permission.UPDATE_ANY_POST,
Permission.DELETE_ANY_POST,
Permission.READ_ALL_USERS,
# Admin-specific
Permission.UPDATE_ANY_USER,
Permission.DELETE_ANY_USER,
Permission.MANAGE_ROLES,
Permission.VIEW_AUDIT_LOG,
},
UserRole.SUPER_ADMIN: {
perm for perm in Permission # All permissions
},
}
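The mapping above repeats each lower role's permissions by hand, which is easy to let drift out of sync. One way to express the inheritance directly is to accumulate the sets with a union. The sketch below uses trimmed-down, hypothetical `Role` and `Perm` enums rather than the full `UserRole` and `Permission` definitions.

```python
import enum


class Role(str, enum.Enum):
    USER = "user"
    MODERATOR = "moderator"
    ADMIN = "admin"


class Perm(str, enum.Enum):
    READ_POST = "read:post"
    CREATE_POST = "create:post"
    DELETE_ANY_POST = "delete:any_post"
    MANAGE_ROLES = "manage:roles"


# Permissions each role adds on top of the role below it, lowest role first
OWN_PERMISSIONS: list[tuple[Role, set[Perm]]] = [
    (Role.USER, {Perm.READ_POST, Perm.CREATE_POST}),
    (Role.MODERATOR, {Perm.DELETE_ANY_POST}),
    (Role.ADMIN, {Perm.MANAGE_ROLES}),
]


def build_role_permissions() -> dict[Role, frozenset[Perm]]:
    """Accumulate permissions so every role includes all lower roles."""
    result: dict[Role, frozenset[Perm]] = {}
    inherited: set[Perm] = set()
    for role, own in OWN_PERMISSIONS:
        inherited = inherited | own
        result[role] = frozenset(inherited)
    return result


ROLE_PERMS = build_role_permissions()


def has_permissions(role: Role, *required: Perm) -> bool:
    """True when the role grants every required permission."""
    return set(required) <= ROLE_PERMS.get(role, frozenset())
```

With this layout, adding a permission to a lower role automatically extends every role above it.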
# security/rbac.py (continued)
def require_role(*allowed_roles: UserRole):
"""
Dependency factory that restricts access to users with specific roles.
Usage:
@router.get("/admin/users")
async def list_users(
user: User = Depends(require_role(UserRole.ADMIN, UserRole.SUPER_ADMIN))
):
...
"""
async def role_checker(
current_user: User = Depends(get_current_active_user),
) -> User:
if current_user.role not in allowed_roles:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Insufficient permissions. Required role: "
f"{', '.join(r.value for r in allowed_roles)}",
)
return current_user
return role_checker
def require_permission(*required_permissions: Permission):
"""
Dependency factory that checks for specific permissions.
More granular than role checking — checks if the user's role
grants the required permissions.
Usage:
@router.delete("/posts/{post_id}")
async def delete_post(
post_id: int,
user: User = Depends(require_permission(Permission.DELETE_ANY_POST))
):
...
"""
async def permission_checker(
current_user: User = Depends(get_current_active_user),
) -> User:
user_permissions = ROLE_PERMISSIONS.get(current_user.role, set())
missing = set(required_permissions) - user_permissions
if missing:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Missing permissions: "
f"{', '.join(p.value for p in missing)}",
)
return current_user
return permission_checker
# routers/admin.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy import select, func
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from models.user import User, UserRole
from schemas.user import UserResponse
from security.rbac import require_role, require_permission, Permission
router = APIRouter(prefix="/api/v1/admin", tags=["Admin"])
# Role-based access: only admins and super admins
@router.get(
"/users",
response_model=list[UserResponse],
summary="List all users (admin only)",
)
async def list_all_users(
skip: int = 0,
limit: int = 100,
current_user: User = Depends(
require_role(UserRole.ADMIN, UserRole.SUPER_ADMIN)
),
db: AsyncSession = Depends(get_db),
):
"""List all users. Requires ADMIN or SUPER_ADMIN role."""
result = await db.execute(
select(User).offset(skip).limit(limit)
)
return result.scalars().all()
# Permission-based access: more granular
@router.put(
"/users/{user_id}/role",
response_model=UserResponse,
summary="Change a user's role",
)
async def change_user_role(
user_id: int,
new_role: UserRole,
current_user: User = Depends(
require_permission(Permission.MANAGE_ROLES)
),
db: AsyncSession = Depends(get_db),
):
"""
Change a user's role. Requires MANAGE_ROLES permission.
Security rules:
- Cannot change your own role
- Cannot assign SUPER_ADMIN role (only direct DB change)
- Cannot change the role of a SUPER_ADMIN
"""
if user_id == current_user.id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Cannot change your own role",
)
if new_role == UserRole.SUPER_ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="SUPER_ADMIN role can only be assigned via database",
)
user = await db.get(User, user_id)
if not user:
raise HTTPException(status_code=404, detail="User not found")
if user.role == UserRole.SUPER_ADMIN:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Cannot modify a SUPER_ADMIN user",
)
user.role = new_role
await db.flush()
await db.refresh(user)
return user
# Dashboard with statistics
@router.get("/dashboard", summary="Admin dashboard statistics")
async def admin_dashboard(
current_user: User = Depends(
require_role(UserRole.ADMIN, UserRole.SUPER_ADMIN)
),
db: AsyncSession = Depends(get_db),
):
"""Get system statistics for the admin dashboard."""
    total_users = await db.scalar(select(func.count(User.id)))
    active_users = await db.scalar(
        select(func.count(User.id)).where(User.is_active.is_(True))
    )
    # One grouped query instead of one COUNT query per role
    rows = await db.execute(
        select(User.role, func.count(User.id)).group_by(User.role)
    )
    role_counts = {role.value: 0 for role in UserRole}
    for role, count in rows.all():
        role_counts[role.value] = count
return {
"total_users": total_users,
"active_users": active_users,
"inactive_users": total_users - active_users,
"users_by_role": role_counts,
}
# A cleaner approach using role hierarchy
# Higher roles automatically inherit all lower role permissions
ROLE_HIERARCHY: dict[UserRole, int] = {
UserRole.USER: 1,
UserRole.MODERATOR: 2,
UserRole.ADMIN: 3,
UserRole.SUPER_ADMIN: 4,
}
def require_minimum_role(minimum_role: UserRole):
"""
Require a minimum role level using hierarchy.
A user with ADMIN role automatically passes a check for
MODERATOR or USER level, because ADMIN is higher in the hierarchy.
Usage:
@router.get("/mod/reports")
async def view_reports(
user: User = Depends(require_minimum_role(UserRole.MODERATOR))
):
... # Accessible by MODERATOR, ADMIN, and SUPER_ADMIN
"""
required_level = ROLE_HIERARCHY[minimum_role]
async def check_role(
current_user: User = Depends(get_current_active_user),
) -> User:
user_level = ROLE_HIERARCHY.get(current_user.role, 0)
if user_level < required_level:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Minimum role required: {minimum_role.value}",
)
return current_user
return check_role
While JWT tokens are ideal for user-facing authentication, API keys are better suited for server-to-server communication, third-party integrations, and programmatic access. FastAPI provides built-in support for API key authentication via headers, query parameters, or cookies.
# security/api_key.py
import secrets
from datetime import datetime, timezone
from typing import Optional
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import APIKeyHeader
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from models.user import User
from services.user_service import UserService
# Define where to look for the API key
api_key_header = APIKeyHeader(
name="X-API-Key",
    auto_error=False,  # Return None instead of raising an error if the header is missing
)
def generate_api_key() -> str:
"""
Generate a cryptographically secure API key.
Format: prefix_randomhex
The prefix makes it easy to identify and rotate keys.
"""
return f"lmsc_{secrets.token_hex(32)}"
async def get_user_from_api_key(
api_key: Optional[str] = Security(api_key_header),
db: AsyncSession = Depends(get_db),
) -> Optional[User]:
"""
Validate an API key and return the associated user.
Returns None if no API key is provided (allowing fallback
to other auth methods).
"""
if api_key is None:
return None
user_service = UserService(db)
user = await user_service.get_by_api_key(api_key)
if user is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid API key",
)
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="API key owner account is deactivated",
)
return user
# security/combined_auth.py
from fastapi import Depends, HTTPException, status
from models.user import User
from security.dependencies import get_current_user_optional
from security.api_key import get_user_from_api_key
async def get_current_user_flexible(
    jwt_user: User | None = Depends(get_current_user_optional),
    api_key_user: User | None = Depends(get_user_from_api_key),
) -> User:
    """
    Accept either JWT token OR API key authentication.
    This allows endpoints to work for both:
    - Browser users (JWT from login)
    - API clients (API key in header)
    JWT takes priority if both are provided.
    """
    user = jwt_user or api_key_user
    if user is None:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required. Provide a Bearer token or X-API-Key.",
            headers={"WWW-Authenticate": "Bearer"},
        )
    # get_current_user_optional does not check is_active, so enforce it here
    if not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account is deactivated",
        )
    return user
# Usage
@router.get("/data")
async def get_data(
current_user: User = Depends(get_current_user_flexible),
):
"""This endpoint accepts both JWT and API key auth."""
return {"user": current_user.username, "data": "..."}
# routers/api_keys.py
from fastapi import APIRouter, Depends, HTTPException, status
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from models.user import User
from security.api_key import generate_api_key
from security.dependencies import get_current_active_user
router = APIRouter(prefix="/api/v1/api-keys", tags=["API Keys"])
@router.post("/generate", summary="Generate a new API key")
async def generate_new_api_key(
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db),
):
"""
Generate a new API key for the authenticated user.
WARNING: The API key is only shown once. Store it securely.
Generating a new key invalidates the previous one.
"""
new_key = generate_api_key()
current_user.api_key = new_key
await db.flush()
return {
"api_key": new_key,
"message": "Store this key securely. It will not be shown again.",
}
@router.delete("/revoke", summary="Revoke current API key")
async def revoke_api_key(
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db),
):
"""Revoke the current user's API key."""
if current_user.api_key is None:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="No API key to revoke",
)
current_user.api_key = None
await db.flush()
return {"detail": "API key revoked successfully"}
# security/rate_limit.py
import time
from collections import defaultdict
from typing import Optional
from fastapi import Depends, HTTPException, Request, status
class RateLimiter:
"""
In-memory rate limiter using the sliding window algorithm.
For production, use Redis-based rate limiting (e.g., with
the slowapi library or a custom Redis implementation).
"""
    def __init__(self):
        # {key: [timestamp, ...]} with one timestamp per accepted request
        self.requests: dict[str, list[float]] = defaultdict(list)
def is_rate_limited(
self,
key: str,
max_requests: int,
window_seconds: int,
) -> tuple[bool, dict]:
"""
Check if a key has exceeded its rate limit.
Returns:
(is_limited, info_dict) where info_dict contains
remaining requests and reset time.
"""
now = time.time()
window_start = now - window_seconds
# Remove expired entries
self.requests[key] = [
ts for ts in self.requests[key]
if ts > window_start
]
current_count = len(self.requests[key])
if current_count >= max_requests:
reset_time = self.requests[key][0] + window_seconds
return True, {
"limit": max_requests,
"remaining": 0,
"reset": int(reset_time),
}
# Record this request
self.requests[key].append(now)
return False, {
"limit": max_requests,
"remaining": max_requests - current_count - 1,
"reset": int(now + window_seconds),
}
# Global rate limiter instance
rate_limiter = RateLimiter()
def rate_limit(max_requests: int = 60, window_seconds: int = 60):
"""
Rate limiting dependency factory.
Usage:
@router.get("/data", dependencies=[Depends(rate_limit(100, 60))])
async def get_data():
...
"""
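    async def check_rate_limit(request: Request):
        # Use API key or IP address as the rate limit key.
        # The header is not validated here, so a caller can get a fresh
        # bucket by sending a made-up key; run this after authentication
        # (or key on the client address only) where that matters.
        api_key = request.headers.get("X-API-Key")
        # request.client can be None (e.g., under some test clients)
        client_host = request.client.host if request.client else "unknown"
        client_key = api_key or client_host
        is_limited, info = rate_limiter.is_rate_limited(
            client_key, max_requests, window_seconds
        )
        if is_limited:
            # Never advertise a zero or negative wait
            retry_after = max(1, info["reset"] - int(time.time()))
            raise HTTPException(
                status_code=status.HTTP_429_TOO_MANY_REQUESTS,
                detail="Rate limit exceeded",
                headers={
                    "X-RateLimit-Limit": str(info["limit"]),
                    "X-RateLimit-Remaining": str(info["remaining"]),
                    "X-RateLimit-Reset": str(info["reset"]),
                    "Retry-After": str(retry_after),
                },
            )
    return check_rate_limit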
# Usage example
@router.get(
"/api/v1/search",
dependencies=[Depends(rate_limit(max_requests=30, window_seconds=60))],
)
async def search(q: str):
"""Rate-limited search endpoint: 30 requests per minute."""
return {"query": q, "results": []}
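To see how a sliding window behaves over time, the sketch below re-implements the same idea with a `deque` and takes the current time as an argument, so the outcome can be traced by hand. `SlidingWindow` and its `allow` method are hypothetical names used only for this illustration.

```python
from collections import defaultdict, deque


class SlidingWindow:
    """Allow at most max_requests per key within any window_seconds span."""

    def __init__(self, max_requests: int, window_seconds: float):
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self.hits: dict[str, deque[float]] = defaultdict(deque)

    def allow(self, key: str, now: float) -> bool:
        hits = self.hits[key]
        # Drop timestamps that have slid out of the window
        while hits and hits[0] <= now - self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True


# Two requests per 10 seconds, probed at hand-picked timestamps
limiter = SlidingWindow(max_requests=2, window_seconds=10)
results = [limiter.allow("client", t) for t in (0, 1, 2, 10.5, 11.5, 12)]
# results == [True, True, False, True, True, False]
```

The third request is rejected because two requests already sit inside the window. At t=10.5 the request from t=0 has expired, which frees one slot, and the same happens at t=11.5 for the request from t=1. Rejected requests are not recorded, so they do not extend the wait.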
OAuth2 scopes provide a fine-grained permission system at the token level. Unlike role-based access control, which checks the user’s role, scopes define what a specific token is authorized to do. This is particularly useful when users want to grant limited access to third-party applications.
# security/scopes.py
from fastapi import Depends, HTTPException, Security, status
from fastapi.security import OAuth2PasswordBearer, SecurityScopes
from jose import JWTError
from security.jwt_handler import decode_token
from models.user import User
from services.user_service import UserService
from database import get_db
from sqlalchemy.ext.asyncio import AsyncSession
# Define available scopes with descriptions
# These appear in Swagger UI's Authorize dialog
OAUTH2_SCOPES = {
"profile:read": "Read your profile information",
"profile:write": "Update your profile information",
"posts:read": "Read posts",
"posts:write": "Create and edit posts",
"posts:delete": "Delete posts",
"users:read": "Read user information (admin)",
"users:write": "Modify user accounts (admin)",
"admin": "Full administrative access",
}
# OAuth2 scheme with scopes
oauth2_scheme_scoped = OAuth2PasswordBearer(
tokenUrl="/api/v1/auth/token",
scopes=OAUTH2_SCOPES,
)
# security/scopes.py (continued)
async def get_current_user_with_scopes(
security_scopes: SecurityScopes,
token: str = Depends(oauth2_scheme_scoped),
db: AsyncSession = Depends(get_db),
) -> User:
"""
Validate token AND check that it has the required scopes.
FastAPI's SecurityScopes automatically collects the scopes
required by the endpoint and all its dependencies.
Args:
security_scopes: Automatically populated by FastAPI with
the scopes required by the endpoint chain.
token: The JWT bearer token.
db: Database session.
Returns:
The authenticated User if token is valid and has required scopes.
"""
# Build the authenticate header value with required scopes
if security_scopes.scopes:
authenticate_value = f'Bearer scope="{security_scopes.scope_str}"'
else:
authenticate_value = "Bearer"
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": authenticate_value},
)
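    try:
        token_data = decode_token(token)
        if token_data.username is None:
            raise credentials_exception
        # Refresh tokens must not be accepted in place of access tokens
        if token_data.token_type != "access":
            raise credentials_exception
    except JWTError:
        raise credentials_exception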
# Look up the user
user_service = UserService(db)
user = await user_service.get_by_username(token_data.username)
if user is None:
raise credentials_exception
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="Account is deactivated",
)
# Check that the token has all required scopes
for scope in security_scopes.scopes:
if scope not in token_data.scopes:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail=f"Token missing required scope: {scope}",
headers={"WWW-Authenticate": authenticate_value},
)
return user
# routers/posts.py
from fastapi import APIRouter, Depends, Security
from models.user import User
from security.scopes import get_current_user_with_scopes
router = APIRouter(prefix="/api/v1/posts", tags=["Posts"])
@router.get("/")
async def list_posts(
current_user: User = Security(
get_current_user_with_scopes,
scopes=["posts:read"],
),
):
"""
List all posts. Requires 'posts:read' scope.
Note: We use Security() instead of Depends() when working with
scopes. Security() is a subclass of Depends() that also passes
the required scopes to the dependency.
"""
return {"posts": [], "user": current_user.username}
@router.post("/")
async def create_post(
title: str,
content: str,
current_user: User = Security(
get_current_user_with_scopes,
scopes=["posts:read", "posts:write"],
),
):
"""Create a post. Requires both 'posts:read' and 'posts:write' scopes."""
return {
"title": title,
"content": content,
"author": current_user.username,
}
@router.delete("/{post_id}")
async def delete_post(
post_id: int,
current_user: User = Security(
get_current_user_with_scopes,
scopes=["posts:delete"],
),
):
"""Delete a post. Requires 'posts:delete' scope."""
return {"detail": f"Post {post_id} deleted"}
# Admin endpoint requiring admin scope
@router.get("/admin/all")
async def admin_list_all_posts(
current_user: User = Security(
get_current_user_with_scopes,
scopes=["admin"],
),
):
"""List all posts with admin details. Requires 'admin' scope."""
return {"posts": [], "total": 0, "admin_view": True}
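# routers/auth.py: updated login endpoint that respects requested scopes
# (register it on the auth router so the path matches tokenUrl="/api/v1/auth/token")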
@router.post("/token")
async def login_with_scopes(
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db),
):
"""
Login endpoint that issues tokens with requested scopes.
The client can request specific scopes during login.
The server validates that the user is allowed to have
those scopes based on their role.
"""
user_service = UserService(db)
user = await user_service.authenticate(
form_data.username, form_data.password
)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
# Determine allowed scopes based on user role
allowed_scopes = get_allowed_scopes_for_role(user.role)
# Filter requested scopes to only include allowed ones
requested_scopes = form_data.scopes # list of scope strings
granted_scopes = [s for s in requested_scopes if s in allowed_scopes]
# If no scopes requested, grant default scopes for the role
if not requested_scopes:
granted_scopes = list(allowed_scopes)
token_data = {
"sub": user.username,
"scopes": granted_scopes,
"role": user.role.value,
}
access_token = create_access_token(data=token_data)
refresh_token = create_refresh_token(data={"sub": user.username})
return {
"access_token": access_token,
"refresh_token": refresh_token,
"token_type": "bearer",
"scope": " ".join(granted_scopes), # OAuth2 spec: space-separated
}
def get_allowed_scopes_for_role(role: UserRole) -> set[str]:
"""Map user roles to allowed OAuth2 scopes."""
base_scopes = {"profile:read", "profile:write", "posts:read"}
role_scope_map = {
UserRole.USER: base_scopes | {"posts:write"},
UserRole.MODERATOR: base_scopes | {
"posts:write", "posts:delete", "users:read"
},
UserRole.ADMIN: base_scopes | {
"posts:write", "posts:delete",
"users:read", "users:write", "admin"
},
UserRole.SUPER_ADMIN: set(OAUTH2_SCOPES.keys()), # All scopes
}
return role_scope_map.get(role, base_scopes)
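Stripped of the framework, scope handling reduces to set operations on space-separated scope strings. As a minimal sketch, with hypothetical helper names `grant_scopes` and `missing_scopes` and a trimmed-down allow-list keyed by plain role names, granting filters the requested scopes against what the role permits, and enforcement is a set difference:

```python
# Hypothetical, shortened allow-list keyed by role name
ALLOWED_BY_ROLE: dict[str, set[str]] = {
    "user": {"profile:read", "profile:write", "posts:read", "posts:write"},
    "moderator": {
        "profile:read", "profile:write",
        "posts:read", "posts:write", "posts:delete",
    },
}


def grant_scopes(role: str, requested: str) -> list[str]:
    """Return the requested scopes the role may have, or all if none requested."""
    allowed = ALLOWED_BY_ROLE.get(role, set())
    wanted = requested.split()
    if not wanted:
        return sorted(allowed)
    # dict.fromkeys removes duplicates while keeping the request order
    return [scope for scope in dict.fromkeys(wanted) if scope in allowed]


def missing_scopes(token_scope: str, required: list[str]) -> set[str]:
    """Scopes the endpoint requires that the token does not carry."""
    return set(required) - set(token_scope.split())
```

For example, a `user` asking for `"posts:read posts:delete"` is granted only `posts:read`, and an endpoint requiring `posts:write` against a token carrying `"posts:read posts:delete"` reports `{"posts:write"}` as missing.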
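Refresh token rotation is a security technique where a new refresh token is issued every time the old one is used. This limits the window of vulnerability if a refresh token is compromised: each refresh token is valid for a single use, so a stolen token stops working as soon as either party redeems it, and a second attempt to redeem the same token is a signal of possible theft.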
# routers/auth.py — refresh endpoint with rotation
from datetime import datetime, timedelta, timezone
from pydantic import BaseModel
from sqlalchemy import select
from models.user import RefreshToken
class RefreshRequest(BaseModel):
refresh_token: str
@router.post(
"/refresh",
response_model=TokenResponse,
summary="Refresh access token with rotation",
)
async def refresh_access_token(
request: RefreshRequest,
db: AsyncSession = Depends(get_db),
):
"""
Exchange a refresh token for a new access token + refresh token.
Implements refresh token rotation:
1. Validates the refresh token (signature, expiration, type)
2. Checks if the token has been revoked
3. Detects token reuse (potential theft)
4. Issues new access + refresh tokens
5. Revokes the old refresh token
If token reuse is detected, ALL refresh tokens for the user
are revoked as a security measure.
"""
# Step 1: Decode and validate the refresh token
try:
token_data = decode_token(request.refresh_token)
    except JWTError:
        # Do not echo decoder error details back to the client
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid refresh token",
        )
# Step 2: Ensure this is actually a refresh token
if token_data.token_type != "refresh":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token type. Expected refresh token.",
)
# Step 3: Look up the refresh token in the database
result = await db.execute(
select(RefreshToken).where(
RefreshToken.token_jti == token_data.jti
)
)
stored_token = result.scalar_one_or_none()
if stored_token is None:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Refresh token not found",
)
    # Step 4: CRITICAL: check for token reuse (theft detection)
    if stored_token.is_revoked:
        # This token was already used! Someone may have stolen it.
        # Revoke ALL refresh tokens for this user as a safety measure.
        await _revoke_all_user_tokens(stored_token.user_id, db)
        # Commit before raising: if get_db rolls back when the request
        # fails, the revocation would otherwise be undone.
        await db.commit()
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Token reuse detected. All sessions have been revoked. "
            "Please log in again.",
        )
# Step 5: Revoke the current refresh token (it is now used)
stored_token.is_revoked = True
# Step 6: Look up the user
user_service = UserService(db)
user = await user_service.get_by_id(stored_token.user_id)
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User account not found or deactivated",
)
# Step 7: Issue new tokens
new_token_data = {
"sub": user.username,
"role": user.role.value,
}
new_access_token = create_access_token(data=new_token_data)
new_refresh_token = create_refresh_token(data={"sub": user.username})
# Step 8: Store the new refresh token
new_refresh_data = decode_token(new_refresh_token)
db_new_token = RefreshToken(
token_jti=new_refresh_data.jti,
user_id=user.id,
expires_at=datetime.now(timezone.utc) + timedelta(
days=REFRESH_TOKEN_EXPIRE_DAYS
),
)
db.add(db_new_token)
return TokenResponse(
access_token=new_access_token,
refresh_token=new_refresh_token,
token_type="bearer",
expires_in=ACCESS_TOKEN_EXPIRE_MINUTES * 60,
)
async def _revoke_all_user_tokens(user_id: int, db: AsyncSession):
"""
Revoke all refresh tokens for a user.
Called when token reuse is detected as a security measure.
This forces the user (and any attacker) to log in again.
"""
from sqlalchemy import update
await db.execute(
update(RefreshToken)
.where(
RefreshToken.user_id == user_id,
RefreshToken.is_revoked == False,
)
.values(is_revoked=True)
)
await db.flush()
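The control flow is easier to follow with the JWT and database details stripped away. The sketch below models rotation with opaque random tokens and an in-memory dict. `TokenStore`, `Record`, and `ReuseDetected` are hypothetical names; a real implementation would persist the records and sign the tokens as in the endpoint above.

```python
import secrets
from dataclasses import dataclass


class ReuseDetected(Exception):
    """Raised when an already-used refresh token is presented again."""


@dataclass
class Record:
    user_id: int
    revoked: bool = False


class TokenStore:
    def __init__(self) -> None:
        self.records: dict[str, Record] = {}

    def login(self, user_id: int) -> str:
        """Issue a fresh refresh token for the user."""
        token = secrets.token_urlsafe(32)
        self.records[token] = Record(user_id)
        return token

    def rotate(self, token: str) -> str:
        """Exchange a refresh token for a new one, revoking the old one."""
        record = self.records.get(token)
        if record is None:
            raise KeyError("unknown refresh token")
        if record.revoked:
            # Replay of a used token: revoke every token of that user
            for other in self.records.values():
                if other.user_id == record.user_id:
                    other.revoked = True
            raise ReuseDetected("refresh token reuse detected")
        record.revoked = True
        return self.login(record.user_id)

    def is_valid(self, token: str) -> bool:
        record = self.records.get(token)
        return record is not None and not record.revoked
```

If `t1` is rotated into `t2` and `t1` is later presented again, `rotate` raises `ReuseDetected` and `t2` is revoked as well, while tokens belonging to other users are untouched.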
# services/token_cleanup.py
from datetime import datetime, timezone
from sqlalchemy import delete, select
from sqlalchemy.ext.asyncio import AsyncSession
from models.user import RefreshToken
async def cleanup_expired_tokens(db: AsyncSession) -> int:
    """
    Remove expired refresh tokens from the database.
    Revoked tokens are kept until they expire so that the reuse check in
    the refresh endpoint can still recognize a replayed token.
    Run this periodically (e.g., daily via a scheduled task)
    to keep the refresh_tokens table from growing indefinitely.
    Returns:
        Number of tokens removed.
    """
    now = datetime.now(timezone.utc)
    result = await db.execute(
        delete(RefreshToken).where(RefreshToken.expires_at < now)
    )
    await db.commit()
    return result.rowcount
# Schedule cleanup on application startup
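from contextlib import asynccontextmanager
from fastapi import FastAPI
import asyncio
# Assumption: the session factory is exposed by the database module under this name
from database import async_session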
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application lifespan manager with periodic token cleanup."""
# Start background cleanup task
cleanup_task = asyncio.create_task(periodic_cleanup())
yield # Application runs here
# Shutdown: cancel the cleanup task
cleanup_task.cancel()
try:
await cleanup_task
except asyncio.CancelledError:
pass
async def periodic_cleanup():
"""Run token cleanup every 24 hours."""
while True:
try:
async with async_session() as db:
removed = await cleanup_expired_tokens(db)
            print(f"Token cleanup: removed {removed} tokens")
except Exception as e:
print(f"Token cleanup error: {e}")
await asyncio.sleep(86400) # 24 hours
# Use the lifespan in your app
app = FastAPI(lifespan=lifespan)
# security/token_blacklist.py
"""
Token blacklisting for immediate access token revocation.
While refresh token rotation handles refresh tokens, sometimes
you need to immediately revoke an access token (e.g., when a user
changes their password or an admin disables an account).
For production, use Redis for O(1) lookups and automatic expiry:
pip install redis
"""
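from datetime import datetime, timezone
import redis.asyncio as redis
from fastapi import Depends, HTTPException, status
from jose import JWTError
from sqlalchemy.ext.asyncio import AsyncSession
from database import get_db
from models.user import User
from security.dependencies import oauth2_scheme
from security.jwt_handler import decode_token
from services.user_service import UserService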
# Redis connection for token blacklist
redis_client = redis.Redis(host="localhost", port=6379, db=0)
async def blacklist_token(jti: str, expires_at: datetime) -> None:
"""
Add a token's JTI to the blacklist.
The entry automatically expires when the token would have expired,
so the blacklist stays clean without manual cleanup.
"""
ttl = int((expires_at - datetime.now(timezone.utc)).total_seconds())
if ttl > 0:
await redis_client.setex(f"blacklist:{jti}", ttl, "revoked")
async def is_token_blacklisted(jti: str) -> bool:
"""Check if a token has been blacklisted."""
result = await redis_client.get(f"blacklist:{jti}")
return result is not None
# Updated get_current_user with blacklist check
async def get_current_user_with_blacklist(
token: str = Depends(oauth2_scheme),
db: AsyncSession = Depends(get_db),
) -> User:
"""Validates token and checks blacklist."""
try:
token_data = decode_token(token)
except JWTError:
raise HTTPException(status_code=401, detail="Invalid token")
# Check blacklist
if token_data.jti and await is_token_blacklisted(token_data.jti):
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token has been revoked",
)
user_service = UserService(db)
user = await user_service.get_by_username(token_data.username)
if not user or not user.is_active:
raise HTTPException(status_code=401, detail="User not found")
return user
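For local development or unit tests where Redis is not available, the same expiring-entry idea can be sketched with a plain dictionary. This is a minimal, single-process illustration and not a replacement for the Redis version; the class name `InMemoryTokenBlacklist` and the injectable `now` callable are assumptions made for this example.

```python
from datetime import datetime, timedelta, timezone
from typing import Callable, Dict


def _utc_now() -> datetime:
    return datetime.now(timezone.utc)


class InMemoryTokenBlacklist:
    """Dict-backed stand-in for the Redis blacklist (single process only)."""

    def __init__(self, now: Callable[[], datetime] = _utc_now) -> None:
        self._now = now
        self._expiry_by_jti: Dict[str, datetime] = {}

    def add(self, jti: str, expires_at: datetime) -> None:
        # Mirror the TTL check: a token that already expired needs no entry.
        if expires_at > self._now():
            self._expiry_by_jti[jti] = expires_at

    def contains(self, jti: str) -> bool:
        expires_at = self._expiry_by_jti.get(jti)
        if expires_at is None:
            return False
        if expires_at <= self._now():
            # Lazily drop the entry, imitating Redis key expiry.
            del self._expiry_by_jti[jti]
            return False
        return True
```

Passing a fake clock through `now` lets a test move time forward without sleeping, which is the main reason the clock is injectable here.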
A secure authentication system requires more than just correct logic. You need to configure transport security, prevent cross-origin attacks, rate-limit sensitive endpoints, sanitize input, and set appropriate security headers. This section covers the essential security hardening steps for a production FastAPI application.
# middleware/https_redirect.py
from fastapi import FastAPI, Request
from fastapi.middleware.httpsredirect import HTTPSRedirectMiddleware
from starlette.middleware.base import BaseHTTPMiddleware
def configure_https(app: FastAPI, environment: str = "production"):
"""
Configure HTTPS enforcement.
In production, all HTTP requests are redirected to HTTPS.
In development, HTTPS is not enforced.
"""
if environment == "production":
# Redirect all HTTP requests to HTTPS
app.add_middleware(HTTPSRedirectMiddleware)
class HSTSMiddleware(BaseHTTPMiddleware):
"""
Add HTTP Strict Transport Security header.
Tells browsers to only access the site over HTTPS for the
specified duration, preventing protocol downgrade attacks.
"""
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
response.headers["Strict-Transport-Security"] = (
"max-age=31536000; includeSubDomains; preload"
)
return response
# middleware/cors.py
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
def configure_cors(app: FastAPI):
"""
Configure Cross-Origin Resource Sharing.
CORS controls which origins (domains) can make requests to your API.
This is crucial for security when your frontend and backend are on
different domains.
"""
# Allowed origins — be specific, never use ["*"] in production
allowed_origins = [
"https://yourdomain.com",
"https://app.yourdomain.com",
"http://localhost:3000", # React development server
"http://localhost:5173", # Vite development server
]
app.add_middleware(
CORSMiddleware,
allow_origins=allowed_origins,
allow_credentials=True, # Allow cookies (for HTTP-only token cookies)
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
allow_headers=[
"Authorization",
"Content-Type",
"X-API-Key",
"X-Request-ID",
],
expose_headers=[
"X-RateLimit-Limit",
"X-RateLimit-Remaining",
"X-RateLimit-Reset",
],
max_age=600, # Cache preflight requests for 10 minutes
)
Never combine allow_origins=["*"] with allow_credentials=True. The CORS specification forbids this combination: browsers reject a wildcard Access-Control-Allow-Origin on credentialed requests. If you need credentials support, you must list specific origins.
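One way to keep this rule from being broken by a configuration change is to check the settings once at startup. The helper below is a small sketch; `validate_cors_settings` is a hypothetical name, not part of FastAPI or Starlette.

```python
from typing import Sequence


def validate_cors_settings(
    allow_origins: Sequence[str], allow_credentials: bool
) -> None:
    """Raise ValueError for a wildcard origin combined with credentials."""
    if allow_credentials and "*" in allow_origins:
        raise ValueError(
            "allow_origins=['*'] cannot be combined with "
            "allow_credentials=True; list each trusted origin explicitly."
        )
```

Calling it just before `app.add_middleware(CORSMiddleware, ...)` makes a misconfiguration fail fast instead of surfacing later as a browser error.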
# middleware/security_headers.py
from starlette.middleware.base import BaseHTTPMiddleware
from fastapi import Request
class SecurityHeadersMiddleware(BaseHTTPMiddleware):
"""
Add security headers to all responses.
These headers protect against common web vulnerabilities:
- XSS (Cross-Site Scripting)
- Clickjacking
- MIME-type sniffing
- Information leakage
"""
async def dispatch(self, request: Request, call_next):
response = await call_next(request)
# Prevent XSS: Don't execute scripts from inline sources
response.headers["Content-Security-Policy"] = (
"default-src 'self'; "
"script-src 'self'; "
"style-src 'self' 'unsafe-inline'; "
"img-src 'self' data:; "
"font-src 'self'; "
"connect-src 'self'"
)
# Prevent clickjacking: Don't allow embedding in iframes
response.headers["X-Frame-Options"] = "DENY"
# Prevent MIME-type sniffing
response.headers["X-Content-Type-Options"] = "nosniff"
# Control referrer information
response.headers["Referrer-Policy"] = "strict-origin-when-cross-origin"
# Disable the legacy browser XSS auditor (deprecated; rely on CSP instead)
response.headers["X-XSS-Protection"] = "0"
# Control browser features
response.headers["Permissions-Policy"] = (
"camera=(), microphone=(), geolocation=(), "
"payment=(), usb=()"
)
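# Don't leak server information. Starlette's MutableHeaders has no pop(),
# so delete the header explicitly. Uvicorn adds its own Server header
# unless it is started with --no-server-header.
if "server" in response.headers:
del response.headers["server"]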
return response
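The Content-Security-Policy value above is a long hand-concatenated string, which is easy to break with a missing semicolon. As a sketch, the same value can be assembled from a mapping; `build_csp` and `CSP_DIRECTIVES` are illustrative names introduced here.

```python
from typing import Dict, List


def build_csp(directives: Dict[str, List[str]]) -> str:
    """Join CSP directives into a single header value."""
    return "; ".join(
        f"{name} {' '.join(sources)}" for name, sources in directives.items()
    )


# Same policy as in the middleware above, expressed as data.
CSP_DIRECTIVES = {
    "default-src": ["'self'"],
    "script-src": ["'self'"],
    "style-src": ["'self'", "'unsafe-inline'"],
    "img-src": ["'self'", "data:"],
    "font-src": ["'self'"],
    "connect-src": ["'self'"],
}

csp_header_value = build_csp(CSP_DIRECTIVES)
```

Keeping the directives as data also makes it simple to vary the policy per environment, for example by adding a development-only source.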
# security/sanitization.py
import re
import html
from typing import Any
from pydantic import field_validator
def sanitize_string(value: str) -> str:
"""
Sanitize a string input to prevent injection attacks.
This function:
1. Strips leading/trailing whitespace
2. Removes null bytes (prevents null byte injection)
3. Escapes HTML entities (prevents XSS)
Length limits are not applied here; enforce them on the schema
fields (for example with Pydantic's max_length constraint).
"""
if not isinstance(value, str):
return value
# Strip whitespace
value = value.strip()
# Remove null bytes
value = value.replace("\x00", "")
# Escape HTML entities
value = html.escape(value, quote=True)
return value
def sanitize_search_query(query: str) -> str:
"""
Sanitize search queries to prevent SQL injection and XSS.
For SQL queries, always use parameterized queries (SQLAlchemy
does this automatically). This function handles the display layer.
"""
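# Remove SQL comment markers, statement terminators, and quotes
# (single hyphens and slashes inside ordinary words are preserved)
query = re.sub(r"--|/\*|\*/|[;'\"]", "", query)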
# Escape HTML
query = html.escape(query)
# Limit length
return query[:500]
# Pydantic model with built-in sanitization
from pydantic import BaseModel
class SanitizedInput(BaseModel):
"""Base model that automatically sanitizes string fields."""
@field_validator("*", mode="before")
@classmethod
def sanitize_strings(cls, v: Any) -> Any:
if isinstance(v, str):
return sanitize_string(v)
return v
# Usage: inherit from SanitizedInput instead of BaseModel
class CommentCreate(SanitizedInput):
content: str
post_id: int
# content will be automatically sanitized
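To see what escaping at input time does to a stored value, here is a short standard-library illustration. It also shows one trade-off to keep in mind: if the stored value is escaped again at render time (as auto-escaping template engines do), the entities are encoded twice. The variable names are only for this example.

```python
import html

raw = "<script>alert('x')</script>"

# What sanitize_string-style escaping stores.
escaped_once = html.escape(raw, quote=True)

# What a second escaping pass at render time would produce.
escaped_twice = html.escape(escaped_once, quote=True)

print(escaped_once)   # &lt;script&gt;alert(&#x27;x&#x27;)&lt;/script&gt;
print(escaped_twice)  # begins with &amp;lt;script&amp;gt;
```

If the rendering layer already escapes output, it may be simpler to store the raw text and escape only once, at display time.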
pip install slowapi
# middleware/rate_limiting.py
from slowapi import Limiter, _rate_limit_exceeded_handler
from slowapi.util import get_remote_address
from slowapi.errors import RateLimitExceeded
from fastapi import FastAPI, Request
def get_rate_limit_key(request: Request) -> str:
"""
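Determine the rate limit key for a request.
Uses the API key if present, otherwise falls back to IP address.
This ensures API key users get their own rate limit bucket.
Caution: the header is not validated here, so a client can send
arbitrary X-API-Key values to obtain fresh buckets. Key on the API
key only after it has been verified, especially for login routes.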
"""
api_key = request.headers.get("X-API-Key")
if api_key:
return f"apikey:{api_key}"
return get_remote_address(request)
# Create limiter instance
limiter = Limiter(key_func=get_rate_limit_key)
def configure_rate_limiting(app: FastAPI):
"""Configure rate limiting for the application."""
app.state.limiter = limiter
app.add_exception_handler(RateLimitExceeded, _rate_limit_exceeded_handler)
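# Usage in endpoints (reuses the limiter instance created above;
# slowapi requires each decorated endpoint to accept a request argument)
from fastapi import APIRouter, Depends
from fastapi.security import OAuth2PasswordRequestForm
router = APIRouter()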
@router.post("/auth/token")
@limiter.limit("5/minute") # Strict limit on login attempts
async def login(request: Request, form_data: OAuth2PasswordRequestForm = Depends()):
"""Login with rate limiting: 5 attempts per minute."""
...
@router.post("/users/register")
@limiter.limit("3/hour") # Very strict for registration
async def register(request: Request, user_data: UserCreate):
"""Registration with rate limiting: 3 per hour per IP."""
...
@router.get("/api/data")
@limiter.limit("100/minute") # Higher limit for data endpoints
async def get_data(request: Request):
"""Data endpoint: 100 requests per minute."""
...
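To make a limit string such as "5/minute" concrete, the sketch below implements a simple sliding-window counter with the standard library. It illustrates the idea only and is not how slowapi works internally; `SlidingWindowLimiter` and its parameters are assumptions for this example.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    """Allow at most max_requests per key within window_seconds."""

    def __init__(
        self,
        max_requests: int,
        window_seconds: float,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.max_requests = max_requests
        self.window_seconds = window_seconds
        self._clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        now = self._clock()
        hits = self._hits[key]
        # Drop timestamps that have left the window.
        while hits and now - hits[0] >= self.window_seconds:
            hits.popleft()
        if len(hits) >= self.max_requests:
            return False
        hits.append(now)
        return True
```

Each key (an IP address or a verified API key) gets its own queue of timestamps, so one noisy client does not consume another client's budget. State lives in process memory, so this form does not share limits across multiple workers.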
# security/audit.py
import logging
from datetime import datetime, timezone
from enum import Enum
from typing import Optional
from fastapi import Request
class AuditEvent(str, Enum):
"""Types of security events to audit."""
LOGIN_SUCCESS = "login_success"
LOGIN_FAILURE = "login_failure"
LOGOUT = "logout"
TOKEN_REFRESH = "token_refresh"
TOKEN_REVOKED = "token_revoked"
TOKEN_REUSE_DETECTED = "token_reuse_detected"
PASSWORD_CHANGED = "password_changed"
ROLE_CHANGED = "role_changed"
ACCOUNT_LOCKED = "account_locked"
ACCOUNT_DEACTIVATED = "account_deactivated"
API_KEY_GENERATED = "api_key_generated"
API_KEY_REVOKED = "api_key_revoked"
UNAUTHORIZED_ACCESS = "unauthorized_access"
RATE_LIMIT_EXCEEDED = "rate_limit_exceeded"
# Configure audit logger
audit_logger = logging.getLogger("security.audit")
audit_logger.setLevel(logging.INFO)
# File handler for persistent audit log
handler = logging.FileHandler("security_audit.log")
handler.setFormatter(
logging.Formatter(
"%(asctime)s | %(levelname)s | %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
)
)
audit_logger.addHandler(handler)
def log_security_event(
event: AuditEvent,
request: Optional[Request] = None,
user_id: Optional[int] = None,
username: Optional[str] = None,
details: Optional[str] = None,
):
"""
Log a security event for auditing.
In production, consider sending these to a SIEM system
(Splunk, ELK, etc.) or a dedicated audit database.
"""
# request.client can be None (for example behind some test transports)
client_ip = request.client.host if request and request.client else "unknown"
user_agent = (
request.headers.get("user-agent", "unknown")
if request else "unknown"
)
log_entry = (
f"event={event.value} | "
f"ip={client_ip} | "
f"user_id={user_id} | "
f"username={username} | "
f"user_agent={user_agent} | "
f"details={details}"
)
# Use WARNING level for security-critical events
if event in (
AuditEvent.TOKEN_REUSE_DETECTED,
AuditEvent.ACCOUNT_LOCKED,
AuditEvent.UNAUTHORIZED_ACCESS,
AuditEvent.LOGIN_FAILURE,
):
audit_logger.warning(log_entry)
else:
audit_logger.info(log_entry)
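# Usage in login endpoint (abridged; `authenticate` stands in for the credential check)
async def login(request: Request, username: str, password: str):
user = await authenticate(username, password)
if not user:
log_security_event(
AuditEvent.LOGIN_FAILURE,
request=request,
username=username,
details="Invalid credentials",
)
raise HTTPException(
status_code=401,
detail="Incorrect username or password",
)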
log_security_event(
AuditEvent.LOGIN_SUCCESS,
request=request,
user_id=user.id,
username=user.username,
)
...
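Because `username` and `user_agent` come from the client, they can contain line breaks or the `|` separator and forge extra fields in a line-oriented log. A minimal sketch of a cleaning step is shown below; `sanitize_log_value` is a hypothetical helper that could be applied to each untrusted field before it is formatted into `log_entry`.

```python
def sanitize_log_value(value: object, max_length: int = 200) -> str:
    """Make a value safe to embed in a single-line, pipe-separated log entry."""
    text = str(value)
    # Replace control characters (including CR and LF) with spaces so that
    # one event always stays on one line.
    text = "".join(ch if ch.isprintable() else " " for ch in text)
    # Neutralise the field separator used by the log format.
    text = text.replace("|", "/")
    # Cap the length so a huge header cannot bloat the log.
    return text[:max_length]
```

For example, `sanitize_log_value("bob\nevent=login_success | ip=1.2.3.4")` yields a single line with no separator characters left in it.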
| Category | Requirement | Priority |
|---|---|---|
| Transport | HTTPS enforced with HSTS | Required |
| Transport | TLS 1.2+ only | Required |
| Passwords | bcrypt or argon2 hashing | Required |
| Passwords | Minimum 8 characters with complexity | Required |
| Tokens | Short-lived access tokens (15-30 min) | Required |
| Tokens | Refresh token rotation | Recommended |
| Tokens | Token blacklisting capability | Recommended |
| Headers | CORS properly configured | Required |
| Headers | Security headers set (CSP, X-Frame, etc.) | Required |
| Rate Limiting | Login endpoint rate limited | Required |
| Rate Limiting | Registration endpoint rate limited | Required |
| Input | All input validated and sanitized | Required |
| Input | Parameterized SQL queries | Required |
| Logging | Security events audited | Required |
| Logging | No sensitive data in logs | Required |
| Account | Account lockout after failed attempts | Recommended |
| Account | Email verification for new accounts | Recommended |
Now let us bring everything together into a complete, production-ready authentication system. This section combines all the concepts we have covered into a cohesive application structure that you can use as a template for your own projects.
fastapi-auth/
├── main.py # Application entry point
├── config.py # Configuration settings
├── database.py # Database connection and session
├── requirements.txt # Python dependencies
├── .env # Environment variables (not in git)
├── models/
│ ├── __init__.py
│ └── user.py # User and RefreshToken models
├── schemas/
│ ├── __init__.py
│ ├── user.py # User Pydantic schemas
│ └── auth.py # Auth request/response schemas
├── routers/
│ ├── __init__.py
│ ├── auth.py # Login, logout, refresh endpoints
│ ├── users.py # User registration and profile
│ └── admin.py # Admin-only endpoints
├── security/
│ ├── __init__.py
│ ├── password.py # Password hashing
│ ├── jwt_handler.py # JWT creation and validation
│ ├── dependencies.py # Auth dependencies (get_current_user)
│ ├── rbac.py # Role-based access control
│ ├── api_key.py # API key authentication
│ ├── scopes.py # OAuth2 scopes
│ └── audit.py # Security audit logging
├── services/
│ ├── __init__.py
│ └── user_service.py # User business logic
└── middleware/
├── __init__.py
├── cors.py # CORS configuration
├── security_headers.py # Security headers
└── rate_limiting.py # Rate limiting
# requirements.txt
fastapi==0.115.0
uvicorn[standard]==0.30.0
sqlalchemy[asyncio]==2.0.35
aiosqlite==0.20.0
python-jose[cryptography]==3.3.0
passlib[bcrypt]==1.7.4
bcrypt==4.2.0
python-multipart==0.0.9
pydantic[email]==2.9.0
pydantic-settings==2.5.0
slowapi==0.1.9
python-dotenv==1.0.1
# .env
SECRET_KEY=your-generated-secret-key-from-openssl-rand-hex-32
ALGORITHM=HS256
ACCESS_TOKEN_EXPIRE_MINUTES=30
REFRESH_TOKEN_EXPIRE_DAYS=7
DATABASE_URL=sqlite+aiosqlite:///./auth_app.db
ENVIRONMENT=development
BCRYPT_ROUNDS=12
# config.py
from pydantic_settings import BaseSettings
class Settings(BaseSettings):
"""Application settings loaded from environment variables."""
# Application
APP_NAME: str = "FastAPI Auth System"
ENVIRONMENT: str = "development"
DEBUG: bool = False
# Database
DATABASE_URL: str = "sqlite+aiosqlite:///./auth_app.db"
# JWT
SECRET_KEY: str
ALGORITHM: str = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES: int = 30
REFRESH_TOKEN_EXPIRE_DAYS: int = 7
# Security
BCRYPT_ROUNDS: int = 12
MAX_LOGIN_ATTEMPTS: int = 5
LOCKOUT_DURATION_MINUTES: int = 30
# CORS
ALLOWED_ORIGINS: list[str] = ["http://localhost:3000"]
# pydantic-settings v2 style (the inner Config class is deprecated)
model_config = {"env_file": ".env", "case_sensitive": True}
settings = Settings()
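The settings above define `MAX_LOGIN_ATTEMPTS` and `LOCKOUT_DURATION_MINUTES`. One possible way to use them is sketched below with an in-memory tracker. The class `LoginAttemptTracker` is an assumption made for illustration; a real deployment would more likely keep these counters in the database or Redis so they survive restarts and are shared between workers.

```python
from datetime import datetime, timedelta, timezone
from typing import Dict


class LoginAttemptTracker:
    """Count failed logins per username and lock the account temporarily."""

    def __init__(self, max_attempts: int = 5, lockout_minutes: int = 30) -> None:
        self.max_attempts = max_attempts
        self.lockout = timedelta(minutes=lockout_minutes)
        self._failures: Dict[str, int] = {}
        self._locked_until: Dict[str, datetime] = {}

    def is_locked(self, username: str, now: datetime) -> bool:
        until = self._locked_until.get(username)
        if until is None:
            return False
        if now >= until:
            # Lockout elapsed: clear state so the user starts fresh.
            del self._locked_until[username]
            self._failures.pop(username, None)
            return False
        return True

    def record_failure(self, username: str, now: datetime) -> None:
        count = self._failures.get(username, 0) + 1
        self._failures[username] = count
        if count >= self.max_attempts:
            self._locked_until[username] = now + self.lockout

    def record_success(self, username: str) -> None:
        # A successful login resets the counter and any lock.
        self._failures.pop(username, None)
        self._locked_until.pop(username, None)
```

In a login endpoint, `is_locked` would be checked before verifying the password, `record_failure` called on bad credentials, and `record_success` called after a successful login.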
# database.py
from sqlalchemy.ext.asyncio import (
AsyncSession,
create_async_engine,
async_sessionmaker,
)
from sqlalchemy.orm import DeclarativeBase
from config import settings
engine = create_async_engine(
settings.DATABASE_URL,
echo=settings.DEBUG,
pool_pre_ping=True, # Verify connections are alive
pool_size=5, # Connection pool size
max_overflow=10, # Extra connections allowed
)
async_session = async_sessionmaker(
engine,
class_=AsyncSession,
expire_on_commit=False,
)
class Base(DeclarativeBase):
pass
async def get_db():
"""Database session dependency with automatic cleanup."""
async with async_session() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
async def init_db():
"""Create all database tables."""
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
# schemas/auth.py
from pydantic import BaseModel
class TokenResponse(BaseModel):
"""Response schema for token endpoints."""
access_token: str
refresh_token: str
token_type: str = "bearer"
expires_in: int
scope: str = ""
class RefreshRequest(BaseModel):
"""Request schema for token refresh."""
refresh_token: str
class PasswordChangeRequest(BaseModel):
"""Request schema for password changes."""
current_password: str
new_password: str
class MessageResponse(BaseModel):
"""Generic message response."""
detail: str
# routers/auth.py
from datetime import datetime, timedelta, timezone
from fastapi import APIRouter, Depends, HTTPException, Request, status
from fastapi.security import OAuth2PasswordRequestForm
from jose import JWTError
from sqlalchemy import select, update
from sqlalchemy.ext.asyncio import AsyncSession
from config import settings
from database import get_db
from models.user import RefreshToken, User
from schemas.auth import (
MessageResponse,
PasswordChangeRequest,
RefreshRequest,
TokenResponse,
)
from security.audit import AuditEvent, log_security_event
from security.dependencies import get_current_active_user
from security.jwt_handler import (
create_access_token,
create_refresh_token,
decode_token,
)
from security.password import hash_password, verify_password
from services.user_service import UserService
router = APIRouter(prefix="/api/v1/auth", tags=["Authentication"])
@router.post("/token", response_model=TokenResponse)
async def login(
request: Request,
form_data: OAuth2PasswordRequestForm = Depends(),
db: AsyncSession = Depends(get_db),
):
"""
Authenticate user and return JWT access + refresh tokens.
Accepts OAuth2 password flow (form data with username and password).
"""
user_service = UserService(db)
user = await user_service.authenticate(
form_data.username, form_data.password
)
if not user:
log_security_event(
AuditEvent.LOGIN_FAILURE,
request=request,
username=form_data.username,
details="Invalid credentials",
)
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
headers={"WWW-Authenticate": "Bearer"},
)
# Determine scopes based on role
from security.scopes import get_allowed_scopes_for_role
granted_scopes = list(get_allowed_scopes_for_role(user.role))
# Create tokens
token_data = {
"sub": user.username,
"role": user.role.value,
"scopes": granted_scopes,
}
access_token = create_access_token(data=token_data)
refresh_token = create_refresh_token(data={"sub": user.username})
# Store refresh token
refresh_data = decode_token(refresh_token)
db_token = RefreshToken(
token_jti=refresh_data.jti,
user_id=user.id,
expires_at=datetime.now(timezone.utc) + timedelta(
days=settings.REFRESH_TOKEN_EXPIRE_DAYS
),
)
db.add(db_token)
log_security_event(
AuditEvent.LOGIN_SUCCESS,
request=request,
user_id=user.id,
username=user.username,
)
return TokenResponse(
access_token=access_token,
refresh_token=refresh_token,
expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
scope=" ".join(granted_scopes),
)
@router.post("/refresh", response_model=TokenResponse)
async def refresh_token(
request: Request,
body: RefreshRequest,
db: AsyncSession = Depends(get_db),
):
"""Refresh access token using refresh token rotation."""
try:
token_data = decode_token(body.refresh_token)
except JWTError:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid refresh token",
)
if token_data.token_type != "refresh":
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Invalid token type",
)
# Find stored token
result = await db.execute(
select(RefreshToken).where(
RefreshToken.token_jti == token_data.jti
)
)
stored_token = result.scalar_one_or_none()
if not stored_token:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token not found",
)
# Detect reuse
if stored_token.is_revoked:
log_security_event(
AuditEvent.TOKEN_REUSE_DETECTED,
request=request,
user_id=stored_token.user_id,
details=f"Reused JTI: {token_data.jti}",
)
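await db.execute(
update(RefreshToken)
.where(RefreshToken.user_id == stored_token.user_id)
.values(is_revoked=True)
)
# Commit now: get_db rolls the session back when the HTTPException
# below propagates, which would otherwise undo the revocation.
await db.commit()
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Token reuse detected. All sessions revoked.",
)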
# Revoke old token
stored_token.is_revoked = True
# Get user
user_service = UserService(db)
user = await user_service.get_by_id(stored_token.user_id)
if not user or not user.is_active:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="User not found or inactive",
)
# Issue new tokens
from security.scopes import get_allowed_scopes_for_role
granted_scopes = list(get_allowed_scopes_for_role(user.role))
new_access = create_access_token(data={
"sub": user.username,
"role": user.role.value,
"scopes": granted_scopes,
})
new_refresh = create_refresh_token(data={"sub": user.username})
# Store new refresh token
new_data = decode_token(new_refresh)
db.add(RefreshToken(
token_jti=new_data.jti,
user_id=user.id,
expires_at=datetime.now(timezone.utc) + timedelta(
days=settings.REFRESH_TOKEN_EXPIRE_DAYS
),
))
log_security_event(
AuditEvent.TOKEN_REFRESH,
request=request,
user_id=user.id,
username=user.username,
)
return TokenResponse(
access_token=new_access,
refresh_token=new_refresh,
expires_in=settings.ACCESS_TOKEN_EXPIRE_MINUTES * 60,
scope=" ".join(granted_scopes),
)
@router.post("/logout", response_model=MessageResponse)
async def logout(
request: Request,
body: RefreshRequest,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db),
):
"""Logout by revoking the refresh token."""
try:
token_data = decode_token(body.refresh_token)
except JWTError:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Invalid token",
)
# Only revoke a token that belongs to the authenticated user
await db.execute(
update(RefreshToken)
.where(
RefreshToken.token_jti == token_data.jti,
RefreshToken.user_id == current_user.id,
)
.values(is_revoked=True)
)
log_security_event(
AuditEvent.LOGOUT,
request=request,
user_id=current_user.id,
username=current_user.username,
)
return MessageResponse(detail="Successfully logged out")
@router.post("/change-password", response_model=MessageResponse)
async def change_password(
request: Request,
body: PasswordChangeRequest,
current_user: User = Depends(get_current_active_user),
db: AsyncSession = Depends(get_db),
):
"""Change the authenticated user's password."""
if not verify_password(body.current_password, current_user.hashed_password):
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="Current password is incorrect",
)
current_user.hashed_password = hash_password(body.new_password)
# Revoke all refresh tokens (force re-login on all devices)
await db.execute(
update(RefreshToken)
.where(
RefreshToken.user_id == current_user.id,
RefreshToken.is_revoked.is_(False),
)
.values(is_revoked=True)
)
log_security_event(
AuditEvent.PASSWORD_CHANGED,
request=request,
user_id=current_user.id,
username=current_user.username,
)
return MessageResponse(
detail="Password changed. Please log in again on all devices."
)
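The rotation and reuse-detection logic in the refresh endpoint can be easier to follow when separated from the database and HTTP layers. The following in-memory sketch mirrors the same steps: look up the JTI, treat an already-revoked JTI as theft and revoke everything for that user, otherwise revoke the old token and issue a new one. `RefreshTokenStore`, `StoredToken`, and `TokenReuseError` are names invented for this illustration.

```python
import secrets
from dataclasses import dataclass
from typing import Dict


class TokenReuseError(Exception):
    """Raised when an already-rotated refresh token is presented again."""


@dataclass
class StoredToken:
    user_id: int
    is_revoked: bool = False


class RefreshTokenStore:
    def __init__(self) -> None:
        self._tokens: Dict[str, StoredToken] = {}

    def issue(self, user_id: int) -> str:
        """Create and remember a new token identifier for the user."""
        jti = secrets.token_urlsafe(16)
        self._tokens[jti] = StoredToken(user_id=user_id)
        return jti

    def rotate(self, jti: str) -> str:
        """Exchange a valid token for a new one; detect reuse."""
        stored = self._tokens.get(jti)
        if stored is None:
            raise KeyError("unknown refresh token")
        if stored.is_revoked:
            # Reuse: revoke every token for this user, then refuse.
            for token in self._tokens.values():
                if token.user_id == stored.user_id:
                    token.is_revoked = True
            raise TokenReuseError("refresh token reuse detected")
        stored.is_revoked = True
        return self.issue(stored.user_id)

    def is_active(self, jti: str) -> bool:
        stored = self._tokens.get(jti)
        return stored is not None and not stored.is_revoked
```

Presenting the first token a second time revokes the token that replaced it as well, which is what forces both the legitimate user and the attacker to log in again.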
# main.py
from contextlib import asynccontextmanager
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from config import settings
from database import init_db
from middleware.security_headers import SecurityHeadersMiddleware
from routers import admin, auth, users
@asynccontextmanager
async def lifespan(app: FastAPI):
"""Application startup and shutdown events."""
# Startup
await init_db()
print(f"Database initialized. Environment: {settings.ENVIRONMENT}")
yield
# Shutdown
print("Application shutting down.")
app = FastAPI(
title=settings.APP_NAME,
description=(
"Complete authentication and authorization system with JWT, "
"refresh token rotation, RBAC, OAuth2 scopes, and API keys."
),
version="1.0.0",
lifespan=lifespan,
docs_url="/docs" if settings.ENVIRONMENT != "production" else None,
redoc_url="/redoc" if settings.ENVIRONMENT != "production" else None,
)
# Middleware (order matters — last added runs first)
app.add_middleware(SecurityHeadersMiddleware)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.ALLOWED_ORIGINS,
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE", "PATCH"],
allow_headers=["Authorization", "Content-Type", "X-API-Key"],
)
# Routers
app.include_router(auth.router)
app.include_router(users.router)
app.include_router(admin.router)
@app.get("/health")
async def health_check():
"""Health check endpoint for load balancers and monitoring."""
return {"status": "healthy", "version": "1.0.0"}
# Install dependencies
pip install -r requirements.txt
# Generate a secret key
export SECRET_KEY=$(openssl rand -hex 32)
# Run the development server
uvicorn main:app --reload --host 0.0.0.0 --port 8000
# Open Swagger UI
# http://localhost:8000/docs
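If `openssl` is not available (for example on a bare Windows machine), an equivalent key can be produced with Python's standard library. This is a small sketch; `generate_secret_key` is a hypothetical helper name.

```python
import secrets


def generate_secret_key(num_bytes: int = 32) -> str:
    """Return num_bytes of randomness as a hex string (two characters per byte)."""
    return secrets.token_hex(num_bytes)


if __name__ == "__main__":
    # Paste the printed value into SECRET_KEY in your .env file.
    print(generate_secret_key())
```

With the default of 32 bytes the result is a 64-character hex string, the same shape as the output of `openssl rand -hex 32`.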
# test_auth_system.py
"""
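Integration tests for the complete auth system.
Requires pytest, httpx, and pytest-asyncio with asyncio_mode = "auto"
so that async tests and async fixtures run without explicit markers.
The tests assume the tables already exist and that each test starts
from a clean database; otherwise the registration fixture returns 409.
Run with: pytest test_auth_system.py -v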
"""
import pytest
from httpx import AsyncClient, ASGITransport
from main import app
@pytest.fixture
async def client():
"""Create an async test client."""
transport = ASGITransport(app=app)
async with AsyncClient(transport=transport, base_url="http://test") as ac:
yield ac
@pytest.fixture
async def registered_user(client: AsyncClient) -> dict:
"""Register a test user and return credentials."""
user_data = {
"username": "testuser",
"email": "test@example.com",
"password": "TestPass123!",
"full_name": "Test User",
}
response = await client.post("/api/v1/users/register", json=user_data)
assert response.status_code == 201
return user_data
@pytest.fixture
async def auth_tokens(client: AsyncClient, registered_user: dict) -> dict:
"""Login and return access + refresh tokens."""
response = await client.post(
"/api/v1/auth/token",
data={
"username": registered_user["username"],
"password": registered_user["password"],
},
)
assert response.status_code == 200
return response.json()
class TestRegistration:
"""Test user registration."""
async def test_register_success(self, client: AsyncClient):
response = await client.post(
"/api/v1/users/register",
json={
"username": "newuser",
"email": "new@example.com",
"password": "SecurePass123!",
},
)
assert response.status_code == 201
data = response.json()
assert data["username"] == "newuser"
assert "hashed_password" not in data
async def test_register_duplicate_username(
self, client: AsyncClient, registered_user
):
response = await client.post(
"/api/v1/users/register",
json={
"username": registered_user["username"],
"email": "different@example.com",
"password": "SecurePass123!",
},
)
assert response.status_code == 409
async def test_register_weak_password(self, client: AsyncClient):
response = await client.post(
"/api/v1/users/register",
json={
"username": "weakuser",
"email": "weak@example.com",
"password": "weak",
},
)
assert response.status_code == 422
class TestLogin:
"""Test login and token generation."""
async def test_login_success(
self, client: AsyncClient, registered_user
):
response = await client.post(
"/api/v1/auth/token",
data={
"username": registered_user["username"],
"password": registered_user["password"],
},
)
assert response.status_code == 200
data = response.json()
assert "access_token" in data
assert "refresh_token" in data
assert data["token_type"] == "bearer"
async def test_login_wrong_password(
self, client: AsyncClient, registered_user
):
response = await client.post(
"/api/v1/auth/token",
data={
"username": registered_user["username"],
"password": "WrongPass123!",
},
)
assert response.status_code == 401
class TestProtectedEndpoints:
"""Test protected endpoint access."""
async def test_get_profile_authenticated(
self, client: AsyncClient, auth_tokens
):
response = await client.get(
"/api/v1/users/me",
headers={
"Authorization": f"Bearer {auth_tokens['access_token']}"
},
)
assert response.status_code == 200
async def test_get_profile_no_token(self, client: AsyncClient):
response = await client.get("/api/v1/users/me")
assert response.status_code == 401
async def test_get_profile_invalid_token(self, client: AsyncClient):
response = await client.get(
"/api/v1/users/me",
headers={"Authorization": "Bearer invalid-token"},
)
assert response.status_code == 401
class TestTokenRefresh:
"""Test refresh token rotation."""
async def test_refresh_success(
self, client: AsyncClient, auth_tokens
):
response = await client.post(
"/api/v1/auth/refresh",
json={"refresh_token": auth_tokens["refresh_token"]},
)
assert response.status_code == 200
data = response.json()
assert data["access_token"] != auth_tokens["access_token"]
assert data["refresh_token"] != auth_tokens["refresh_token"]
async def test_refresh_reuse_detection(
self, client: AsyncClient, auth_tokens
):
# First refresh: should succeed
response1 = await client.post(
"/api/v1/auth/refresh",
json={"refresh_token": auth_tokens["refresh_token"]},
)
assert response1.status_code == 200
# Second refresh with same token: should fail (reuse detected)
response2 = await client.post(
"/api/v1/auth/refresh",
json={"refresh_token": auth_tokens["refresh_token"]},
)
assert response2.status_code == 401
assert "reuse" in response2.json()["detail"].lower()
| Method | Endpoint | Auth | Description |
|---|---|---|---|
| POST | /api/v1/users/register | None | Register a new user |
| POST | /api/v1/auth/token | None | Login (get tokens) |
| POST | /api/v1/auth/refresh | None | Refresh access token |
| POST | /api/v1/auth/logout | Bearer | Revoke refresh token |
| POST | /api/v1/auth/change-password | Bearer | Change password |
| GET | /api/v1/users/me | Bearer | Get current user profile |
| PUT | /api/v1/users/me | Bearer | Update profile |
| DELETE | /api/v1/users/me | Bearer | Deactivate account |
| POST | /api/v1/api-keys/generate | Bearer | Generate API key |
| DELETE | /api/v1/api-keys/revoke | Bearer | Revoke API key |
| GET | /api/v1/admin/users | Admin | List all users |
| PUT | /api/v1/admin/users/{id}/role | Admin | Change user role |
| GET | /api/v1/admin/dashboard | Admin | Admin statistics |
| GET | /health | None | Health check |
┌──────────┐  1. POST /register            ┌──────────┐
│          │  ──────────────────────────>  │          │
│  Client  │  <──────────────────────────  │  Server  │
│          │  User created (201)           │          │
│          │                               │          │
│          │  2. POST /token               │          │
│          │  (username + password)        │          │
│          │  ──────────────────────────>  │          │
│          │  <──────────────────────────  │          │
│          │  access_token +               │          │
│          │  refresh_token                │          │
│          │                               │          │
│          │  3. GET /users/me             │          │
│          │  Authorization: Bearer        │          │
│          │  ──────────────────────────>  │          │
│          │  <──────────────────────────  │          │
│          │  User profile (200)           │          │
│          │                               │          │
│          │  4. POST /refresh             │          │
│          │  (when token expires)         │          │
│          │  ──────────────────────────>  │          │
│          │  <──────────────────────────  │          │
│          │  new access_token +           │          │
│          │  new refresh_token            │          │
│          │                               │          │
│          │  5. POST /logout              │          │
│          │  ──────────────────────────>  │          │
│          │  <──────────────────────────  │          │
│          │  Tokens revoked (200)         │          │
└──────────┘                               └──────────┘
This tutorial covered a comprehensive set of authentication and authorization patterns in FastAPI. Here is a summary of the key concepts and techniques you learned:
| Topic | Key Concept | Implementation |
|---|---|---|
| Authentication vs Authorization | Authentication verifies identity; authorization controls access | Return 401 for auth failures, 403 for permission failures |
| Password Hashing | Never store plaintext passwords; use bcrypt with cost factor 12+ | passlib.context.CryptContext with bcrypt scheme |
| OAuth2 Password Flow | Standard flow for username/password login returning tokens | OAuth2PasswordBearer + OAuth2PasswordRequestForm |
| JWT Tokens | Stateless, signed tokens with claims (sub, exp, iat, jti) | python-jose for creating and verifying tokens |
| User Registration | Validate input, check duplicates, hash password, store user | Pydantic validators + SQLAlchemy models |
| Login System | Authenticate credentials, issue access + refresh tokens | Token endpoint returning TokenResponse |
| Protected Endpoints | Dependency injection validates tokens automatically | Depends(get_current_active_user) |
| RBAC | Roles map to permissions; check before granting access | require_role() and require_permission() dependencies |
| API Keys | Pre-shared keys for programmatic access | APIKeyHeader + database lookup |
| OAuth2 Scopes | Token-level permissions for fine-grained access control | Security(dep, scopes=[...]) + SecurityScopes |
| Refresh Token Rotation | New refresh token on each use; detect reuse as theft | Database-tracked JTIs with revocation flags |
| Security Best Practices | Defense in depth: HTTPS, CORS, headers, rate limiting, audit | Middleware stack + security header configuration |
# Import and use these in your endpoints:
from security.dependencies import (
get_current_user, # Basic: validates token, returns User
get_current_active_user, # + checks user.is_active
get_current_user_optional, # Returns User or None (no 401)
)
from security.rbac import (
require_role, # require_role(UserRole.ADMIN)
require_minimum_role, # require_minimum_role(UserRole.MODERATOR)
require_permission, # require_permission(Permission.DELETE_ANY_POST)
)
from security.scopes import (
get_current_user_with_scopes, # Security(dep, scopes=["posts:read"])
)
from security.api_key import (
get_user_from_api_key, # X-API-Key header auth
)
from security.combined_auth import (
get_current_user_flexible, # JWT OR API key
)