FastAPI has rapidly become one of the most popular Python web frameworks, and demand for FastAPI developers continues to grow. Whether you are preparing for your first Python backend role or aiming for a senior architect position, this guide covers the questions you are most likely to encounter in a real interview.
The questions are organized into three tiers:
| Level | Focus Areas | What Interviewers Look For |
|---|---|---|
| Junior | Core concepts, basic routing, Pydantic basics, running the app | Solid fundamentals, ability to build simple endpoints |
| Mid-Level | Dependency injection, auth, async patterns, testing, CRUD | Production-quality code, understanding of the request lifecycle |
| Senior | Architecture, ASGI, WebSockets, deployment, CI/CD, security | System design thinking, performance tuning, operational maturity |
FastAPI is a modern, high-performance Python web framework for building APIs. It is built on top of Starlette (for the web layer) and Pydantic (for data validation). Key reasons to choose FastAPI include:
from fastapi import FastAPI
app = FastAPI()
@app.get("/")
def read_root():
return {"message": "Hello, World!"}
Best practice: Choose FastAPI when you need an async-capable REST or GraphQL API with automatic request validation. If you only need to serve HTML templates with minimal API work, a lighter framework may suffice.
Path parameters are dynamic segments in the URL path. You declare them inside curly braces in the route decorator and as function arguments with type annotations. FastAPI automatically validates and converts the value to the declared type.
from fastapi import FastAPI
app = FastAPI()
@app.get("/users/{user_id}")
def get_user(user_id: int):
return {"user_id": user_id}
# GET /users/42 -> {"user_id": 42}
# GET /users/abc -> 422 Unprocessable Entity (validation error)
You can also use Path() for additional constraints:
from fastapi import FastAPI, Path
app = FastAPI()
@app.get("/items/{item_id}")
def get_item(item_id: int = Path(..., title="Item ID", ge=1, le=10000)):
return {"item_id": item_id}
Common pitfall: If you have both /users/me and /users/{user_id}, put the static route first. FastAPI matches routes in declaration order, so /users/{user_id} would capture "me" as a path parameter if declared first.
Query parameters are key-value pairs appended to the URL after a ?. Any function parameter that is not part of the path is automatically treated as a query parameter.
from fastapi import FastAPI
from typing import Optional
app = FastAPI()
@app.get("/items")
def list_items(skip: int = 0, limit: int = 10, q: Optional[str] = None):
result = {"skip": skip, "limit": limit}
if q:
result["query"] = q
return result
# GET /items?skip=5&limit=20&q=phone
# -> {"skip": 5, "limit": 20, "query": "phone"}
Use Query() for extra validation:
from fastapi import FastAPI, Query
app = FastAPI()
@app.get("/search")
def search(q: str = Query(..., min_length=2, max_length=100)):
return {"query": q}
Best practice: Always set sensible defaults for pagination parameters (skip, limit) and cap the maximum limit to prevent clients from requesting excessively large result sets.
Type hints are central to FastAPI. They serve multiple purposes simultaneously:
| Purpose | How Type Hints Help |
|---|---|
| Request validation | FastAPI validates incoming data against declared types automatically |
| Serialization | Response data is serialized based on the return type or response_model |
| Documentation | Swagger UI reflects parameter types, descriptions, and constraints |
| Editor support | IDEs provide auto-complete, type checking, and refactoring tools |
from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional, List
app = FastAPI()
class Item(BaseModel):
name: str
price: float
tags: List[str] = []
description: Optional[str] = None
@app.post("/items", response_model=Item)
def create_item(item: Item):
# item is already validated and typed
return item
Key insight: Unlike Flask, where you manually call request.get_json() and validate fields yourself, FastAPI uses type hints to handle all of this declaratively. This eliminates an entire class of bugs.
Pydantic is a data validation and settings management library that uses Python type annotations. FastAPI relies on Pydantic for:
BaseSettingsfrom pydantic import BaseModel, Field, field_validator
from typing import Optional
from datetime import datetime
class UserCreate(BaseModel):
username: str = Field(..., min_length=3, max_length=50)
email: str
age: int = Field(..., ge=13, le=120)
bio: Optional[str] = None
@field_validator("email")
@classmethod
def validate_email(cls, v):
if "@" not in v:
raise ValueError("Invalid email address")
return v.lower()
class UserResponse(BaseModel):
id: int
username: str
email: str
created_at: datetime
model_config = {"from_attributes": True}
The model_config = {"from_attributes": True} setting (formerly class Config: orm_mode = True in Pydantic v1) allows Pydantic to read data from ORM objects like SQLAlchemy models.
Common pitfall: Forgetting to enable from_attributes when returning ORM objects will cause serialization errors.
FastAPI applications are ASGI apps, so you need an ASGI server. The most common choice is Uvicorn.
# Install pip install fastapi uvicorn # Run in development with auto-reload uvicorn main:app --reload --host 0.0.0.0 --port 8000 # Run in production with multiple workers uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4
You can also run it programmatically:
import uvicorn
if __name__ == "__main__":
uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)
For production deployments, Gunicorn with Uvicorn workers is the recommended pattern:
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000
Best practice: Use --reload only in development. In production, use Gunicorn as the process manager with Uvicorn workers for robustness.
These decorators bind a function to an HTTP method. Each method has a specific semantic meaning:
| Decorator | HTTP Method | Purpose | Request Body | Idempotent |
|---|---|---|---|---|
@app.get |
GET | Retrieve data | No | Yes |
@app.post |
POST | Create a resource | Yes | No |
@app.put |
PUT | Replace a resource | Yes | Yes |
@app.patch |
PATCH | Partially update | Yes | No |
@app.delete |
DELETE | Delete a resource | Optional | Yes |
from fastapi import FastAPI
from pydantic import BaseModel
app = FastAPI()
class Item(BaseModel):
name: str
price: float
items_db = {}
@app.get("/items/{item_id}")
def get_item(item_id: int):
return items_db.get(item_id, {"error": "Not found"})
@app.post("/items", status_code=201)
def create_item(item: Item):
item_id = len(items_db) + 1
items_db[item_id] = item.model_dump()
return {"id": item_id, **item.model_dump()}
@app.put("/items/{item_id}")
def replace_item(item_id: int, item: Item):
items_db[item_id] = item.model_dump()
return {"id": item_id, **item.model_dump()}
@app.delete("/items/{item_id}", status_code=204)
def delete_item(item_id: int):
items_db.pop(item_id, None)
FastAPI automatically generates interactive API documentation from your route definitions, type hints, and Pydantic models. Two UIs are available out of the box:
| URL | UI | Description |
|---|---|---|
/docs |
Swagger UI | Interactive documentation with a “Try it out” feature |
/redoc |
ReDoc | Clean, read-only documentation |
/openapi.json |
Raw JSON | The OpenAPI schema as JSON |
from fastapi import FastAPI
# Customize docs metadata
app = FastAPI(
title="My API",
description="A comprehensive API for managing items",
version="1.0.0",
docs_url="/docs", # default
redoc_url="/redoc", # default
openapi_url="/openapi.json" # default
)
# Disable docs in production
app_prod = FastAPI(docs_url=None, redoc_url=None)
Best practice: Disable interactive docs in production for security. You can conditionally enable them based on an environment variable.
Uvicorn is a lightning-fast ASGI server implementation. It serves as the bridge between the network and your FastAPI application.
# Standard install pip install uvicorn[standard] # Run with SSL uvicorn main:app --ssl-keyfile=key.pem --ssl-certfile=cert.pem # Run with specific log level uvicorn main:app --log-level warning
Key insight: Uvicorn itself is single-process. For production, pair it with Gunicorn (gunicorn -k uvicorn.workers.UvicornWorker) to get multi-process concurrency.
FastAPI provides several mechanisms for error handling:
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError
app = FastAPI()
items_db = {1: {"name": "Laptop"}}
# 1. HTTPException for known errors
@app.get("/items/{item_id}")
def get_item(item_id: int):
if item_id not in items_db:
raise HTTPException(
status_code=404,
detail="Item not found",
headers={"X-Error": "Item lookup failed"}
)
return items_db[item_id]
# 2. Custom exception class
class ItemNotFoundError(Exception):
def __init__(self, item_id: int):
self.item_id = item_id
@app.exception_handler(ItemNotFoundError)
async def item_not_found_handler(request: Request, exc: ItemNotFoundError):
return JSONResponse(
status_code=404,
content={"detail": f"Item {exc.item_id} does not exist"}
)
# 3. Override default validation error handler
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
return JSONResponse(
status_code=422,
content={
"detail": "Validation failed",
"errors": exc.errors()
}
)
Best practice: Use HTTPException for simple cases. Create custom exception classes and handlers for domain-specific errors to keep your route functions clean.
| Feature | FastAPI | Flask |
|---|---|---|
| Type | ASGI (async-native) | WSGI (sync by default) |
| Validation | Built-in via Pydantic | Manual or via extensions (Marshmallow) |
| Documentation | Auto-generated Swagger & ReDoc | Manual or via Flask-RESTX |
| Performance | Very high (on par with Go/Node) | Moderate |
| Dependency injection | Built-in Depends() system |
Not built-in |
| Async support | Native async/await | Limited (added in Flask 2.0) |
| Ecosystem maturity | Growing rapidly | Very mature, huge plugin ecosystem |
| Learning curve | Moderate (need to understand type hints) | Low (simple and minimal) |
When to choose FastAPI: New API-first projects that need high performance, automatic validation, and auto-generated docs.
When to choose Flask: Projects that need extensive HTML template rendering, or when your team has deep Flask experience and a large existing Flask codebase.
from fastapi import FastAPI, Response, HTTPException
from fastapi.responses import JSONResponse
app = FastAPI()
# Method 1: Set default status code in decorator
@app.post("/items", status_code=201)
def create_item(item: dict):
return {"id": 1, **item}
# Method 2: Use Response parameter for dynamic codes
@app.get("/items/{item_id}")
def get_item(item_id: int, response: Response):
if item_id == 0:
response.status_code = 204
return None
return {"item_id": item_id}
# Method 3: Return a Response object directly
@app.get("/health")
def health_check():
healthy = True
if healthy:
return JSONResponse(content={"status": "ok"}, status_code=200)
return JSONResponse(content={"status": "degraded"}, status_code=503)
# Method 4: HTTPException for error codes
@app.get("/secure")
def secure_endpoint():
raise HTTPException(status_code=403, detail="Forbidden")
Best practice: Use the status_code parameter in the decorator for the “happy path” response. Use HTTPException for error paths. This keeps your OpenAPI docs accurate.
FastAPI has a powerful built-in dependency injection system using Depends(). Dependencies are functions (or classes) that are called before your route handler, and their return values are injected as parameters.
from fastapi import FastAPI, Depends, Query
from typing import Optional
app = FastAPI()
# Simple function dependency
def common_parameters(
skip: int = Query(0, ge=0),
limit: int = Query(10, ge=1, le=100),
q: Optional[str] = None
):
return {"skip": skip, "limit": limit, "q": q}
@app.get("/items")
def list_items(params: dict = Depends(common_parameters)):
return {"params": params}
@app.get("/users")
def list_users(params: dict = Depends(common_parameters)):
return {"params": params}
# Class-based dependency
class Pagination:
def __init__(self, skip: int = 0, limit: int = 10):
self.skip = skip
self.limit = limit
@app.get("/products")
def list_products(pagination: Pagination = Depends()):
return {"skip": pagination.skip, "limit": pagination.limit}
# Nested dependencies
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
def get_current_user(db=Depends(get_db)):
# Uses db dependency
user = db.query(User).first()
return user
@app.get("/profile")
def get_profile(user=Depends(get_current_user)):
return {"username": user.username}
Key insight: Dependencies that use yield act like context managers. Code after yield runs after the response is sent, making them perfect for cleanup tasks like closing database connections.
When a request arrives at a FastAPI endpoint, it goes through a well-defined validation pipeline:
response_model if specified.yield dependencies.from fastapi import FastAPI, Depends, Header, Path, Query, HTTPException
from pydantic import BaseModel
app = FastAPI()
class ItemCreate(BaseModel):
name: str
price: float
class ItemResponse(BaseModel):
id: int
name: str
price: float
def verify_token(x_token: str = Header(...)):
if x_token != "secret-token":
raise HTTPException(status_code=403, detail="Invalid token")
return x_token
@app.post(
"/categories/{category_id}/items",
response_model=ItemResponse,
status_code=201
)
def create_item(
category_id: int = Path(..., ge=1), # Step 1: path
q: str = Query(None), # Step 2: query
token: str = Depends(verify_token), # Step 7: dependency
item: ItemCreate = ..., # Steps 5-6: body + validation
):
# Step 8: handler executes with all validated data
return ItemResponse(id=1, name=item.name, price=item.price)
# Step 9: response serialized via response_model
If validation fails at any step, FastAPI returns a 422 Unprocessable Entity response with detailed error information.
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from datetime import datetime, timedelta
from typing import Optional
# Configuration
SECRET_KEY = "your-secret-key-keep-it-secret"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
app = FastAPI()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Models
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
class User(BaseModel):
username: str
email: str
disabled: bool = False
# Helper functions
def verify_password(plain_password: str, hashed_password: str) -> bool:
return pwd_context.verify(plain_password, hashed_password)
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
to_encode.update({"exp": expire})
return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
async def get_current_user(token: str = Depends(oauth2_scheme)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Could not validate credentials",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
except JWTError:
raise credentials_exception
user = get_user_from_db(username)
if user is None:
raise credentials_exception
return user
# Endpoints
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
user = authenticate_user(form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="Incorrect username or password",
)
access_token = create_access_token(
data={"sub": user.username},
expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
)
return {"access_token": access_token, "token_type": "bearer"}
@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
return current_user
OAuth2PasswordBearer is a FastAPI security utility class that implements the OAuth2 Password flow. It does two things:
Authorization: Bearer <token> header from incoming requests.from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer
app = FastAPI()
# tokenUrl is the endpoint where clients POST credentials to get a token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# Using it as a dependency simply extracts the token string
@app.get("/protected")
async def protected_route(token: str = Depends(oauth2_scheme)):
# token is the raw Bearer token string
# You still need to decode/validate it yourself
return {"token": token}
OAuth2PasswordBearer does not validate the token. It only extracts it. You must combine it with your own validation logic (e.g., JWT decoding) in a dependency.
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from fastapi import FastAPI, Depends, HTTPException
DATABASE_URL = "postgresql://user:password@localhost:5432/mydb"
engine = create_engine(DATABASE_URL, pool_size=10, max_overflow=20)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# Dependency that provides a database session
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
app = FastAPI()
@app.get("/users/{user_id}")
def get_user(user_id: int, db: Session = Depends(get_db)):
user = db.query(User).filter(User.id == user_id).first()
if not user:
raise HTTPException(status_code=404, detail="User not found")
return user
@app.post("/users", status_code=201)
def create_user(user_data: UserCreate, db: Session = Depends(get_db)):
db_user = User(**user_data.model_dump())
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
Key insight: The yield pattern ensures the session is always closed, even if an exception occurs during request processing. This prevents connection leaks.
FastAPI natively supports Python’s async/await syntax because it runs on ASGI (Asynchronous Server Gateway Interface).
import httpx
from fastapi import FastAPI, Depends
app = FastAPI()
# Async endpoint - runs on the event loop
@app.get("/async-data")
async def get_async_data():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com/data")
return response.json()
# Sync endpoint - runs in a thread pool
@app.get("/sync-data")
def get_sync_data():
# FastAPI automatically runs this in a thread pool
# so it does not block the event loop
import time
time.sleep(1) # Simulates blocking I/O
return {"data": "result"}
# Async dependency
async def get_async_client():
async with httpx.AsyncClient() as client:
yield client
@app.get("/external")
async def call_external(client: httpx.AsyncClient = Depends(get_async_client)):
response = await client.get("https://api.example.com/resource")
return response.json()
Important rule: If your function uses await, declare it with async def. If it performs blocking I/O (database calls via synchronous drivers, file I/O), use regular def and let FastAPI handle the threading.
| Aspect | async def endpoint | def endpoint (sync) |
|---|---|---|
| Execution | Runs directly on the async event loop | Runs in a separate thread from a thread pool |
| Blocking I/O | Must use async libraries (httpx, aiofiles, asyncpg) | Can safely use blocking libraries (requests, open()) |
| Concurrency | Thousands of concurrent tasks via event loop | Limited by thread pool size (default: 40 threads) |
| CPU-bound work | Blocks the event loop – avoid | Blocks one thread – slightly better |
import httpx
# WRONG: blocking call in async function blocks the event loop
@app.get("/bad")
async def bad_endpoint():
import requests # blocking library!
response = requests.get("https://api.example.com") # blocks event loop
return response.json()
# CORRECT: use async library in async function
@app.get("/good-async")
async def good_async_endpoint():
async with httpx.AsyncClient() as client:
response = await client.get("https://api.example.com")
return response.json()
# CORRECT: use sync function for blocking calls
@app.get("/good-sync")
def good_sync_endpoint():
import requests
response = requests.get("https://api.example.com")
return response.json()
async def and then using blocking libraries like requests or synchronous database drivers. This blocks the entire event loop and kills performance.from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel
from typing import List, Optional
app = FastAPI()
# --- Pydantic schemas ---
class ProductCreate(BaseModel):
name: str
description: Optional[str] = None
price: float
category: str
class ProductUpdate(BaseModel):
name: Optional[str] = None
description: Optional[str] = None
price: Optional[float] = None
category: Optional[str] = None
class ProductResponse(BaseModel):
id: int
name: str
description: Optional[str]
price: float
category: str
model_config = {"from_attributes": True}
# --- CRUD functions (service layer) ---
def create_product(db: Session, product: ProductCreate):
db_product = Product(**product.model_dump())
db.add(db_product)
db.commit()
db.refresh(db_product)
return db_product
def get_products(db: Session, skip: int = 0, limit: int = 100):
return db.query(Product).offset(skip).limit(limit).all()
def get_product(db: Session, product_id: int):
return db.query(Product).filter(Product.id == product_id).first()
def update_product(db: Session, product_id: int, updates: ProductUpdate):
db_product = db.query(Product).filter(Product.id == product_id).first()
if not db_product:
return None
update_data = updates.model_dump(exclude_unset=True)
for field, value in update_data.items():
setattr(db_product, field, value)
db.commit()
db.refresh(db_product)
return db_product
def delete_product(db: Session, product_id: int):
db_product = db.query(Product).filter(Product.id == product_id).first()
if not db_product:
return False
db.delete(db_product)
db.commit()
return True
# --- Route handlers ---
@app.post("/products", response_model=ProductResponse, status_code=201)
def create(product: ProductCreate, db: Session = Depends(get_db)):
return create_product(db, product)
@app.get("/products", response_model=List[ProductResponse])
def read_all(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
return get_products(db, skip, limit)
@app.get("/products/{product_id}", response_model=ProductResponse)
def read_one(product_id: int, db: Session = Depends(get_db)):
product = get_product(db, product_id)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product
@app.patch("/products/{product_id}", response_model=ProductResponse)
def update(product_id: int, updates: ProductUpdate, db: Session = Depends(get_db)):
product = update_product(db, product_id, updates)
if not product:
raise HTTPException(status_code=404, detail="Product not found")
return product
@app.delete("/products/{product_id}", status_code=204)
def delete(product_id: int, db: Session = Depends(get_db)):
if not delete_product(db, product_id):
raise HTTPException(status_code=404, detail="Product not found")
Best practice: Separate CRUD logic into a service layer (separate module) rather than putting database queries directly in route handlers. This makes the code testable and reusable.
# app/routers/users.py
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from typing import List
router = APIRouter(
prefix="/users",
tags=["users"],
responses={404: {"description": "Not found"}},
)
@router.get("/", response_model=List[UserResponse])
def list_users(db: Session = Depends(get_db)):
return db.query(User).all()
@router.get("/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
return db.query(User).filter(User.id == user_id).first()
@router.post("/", response_model=UserResponse, status_code=201)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
db_user = User(**user.model_dump())
db.add(db_user)
db.commit()
db.refresh(db_user)
return db_user
# app/routers/products.py
from fastapi import APIRouter
router = APIRouter(prefix="/products", tags=["products"])
@router.get("/")
def list_products():
return []
# app/main.py from fastapi import FastAPI from app.routers import users, products app = FastAPI(title="My API") app.include_router(users.router) app.include_router(products.router) # You can also add a prefix when including # app.include_router(users.router, prefix="/api/v1")
A well-organized project structure looks like this:
app/
__init__.py
main.py # FastAPI app instance and router includes
config.py # Settings and configuration
database.py # Database engine and session
models/ # SQLAlchemy models
__init__.py
user.py
product.py
schemas/ # Pydantic schemas
__init__.py
user.py
product.py
routers/ # Route handlers
__init__.py
users.py
products.py
services/ # Business logic
__init__.py
user_service.py
product_service.py
dependencies/ # Shared dependencies
__init__.py
auth.py
database.py
Background tasks let you run code after the response has been sent to the client. They are useful for operations that the client does not need to wait for.
from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel
app = FastAPI()
def send_email(email: str, subject: str, body: str):
# Simulate sending email (runs after response is sent)
import time
time.sleep(3)
print(f"Email sent to {email}: {subject}")
def write_audit_log(user_id: int, action: str):
# Write to audit log after response
with open("audit.log", "a") as f:
f.write(f"{user_id}: {action}\n")
class UserCreate(BaseModel):
username: str
email: str
@app.post("/users", status_code=201)
def create_user(user: UserCreate, background_tasks: BackgroundTasks):
# Create user in database (synchronous, client waits)
new_user = {"id": 1, "username": user.username, "email": user.email}
# These run AFTER the response is sent
background_tasks.add_task(send_email, user.email, "Welcome!", "Thanks for joining")
background_tasks.add_task(write_audit_log, 1, "user_created")
return new_user
When to use background tasks vs. a task queue (Celery/Redis):
| Criteria | BackgroundTasks | Celery / Task Queue |
|---|---|---|
| Duration | Short (seconds) | Long (minutes/hours) |
| Reliability | Lost if server crashes | Persisted in broker, retryable |
| Infrastructure | None extra | Needs Redis/RabbitMQ |
| Use case | Emails, logging, cache invalidation | Video processing, reports, ETL |
from fastapi.testclient import TestClient
from fastapi import FastAPI, Depends
import pytest
app = FastAPI()
@app.get("/")
def read_root():
return {"message": "Hello"}
@app.get("/items/{item_id}")
def read_item(item_id: int):
return {"item_id": item_id}
# --- Basic tests ---
client = TestClient(app)
def test_read_root():
response = client.get("/")
assert response.status_code == 200
assert response.json() == {"message": "Hello"}
def test_read_item():
response = client.get("/items/42")
assert response.status_code == 200
assert response.json() == {"item_id": 42}
def test_invalid_item_id():
response = client.get("/items/not-a-number")
assert response.status_code == 422
# --- Testing with dependency overrides ---
def get_db():
return real_database_session()
@app.get("/users")
def get_users(db=Depends(get_db)):
return []
def override_get_db():
return test_database_session()
app.dependency_overrides[get_db] = override_get_db
# --- Async testing with httpx ---
import httpx
@pytest.mark.anyio
async def test_async_root():
async with httpx.AsyncClient(
transport=httpx.ASGITransport(app=app),
base_url="http://test"
) as ac:
response = await ac.get("/")
assert response.status_code == 200
# --- Testing with pytest fixtures ---
@pytest.fixture
def test_client():
with TestClient(app) as c:
yield c
def test_with_fixture(test_client):
response = test_client.get("/")
assert response.status_code == 200
Best practice: Use dependency_overrides to replace real databases, external APIs, and authentication with test doubles. This makes your tests fast and deterministic.
from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from typing import List
import os
app = FastAPI()
UPLOAD_DIR = "uploads"
ALLOWED_TYPES = {"image/jpeg", "image/png", "image/gif", "application/pdf"}
MAX_SIZE = 10 * 1024 * 1024 # 10 MB
# Single file upload
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
if file.content_type not in ALLOWED_TYPES:
raise HTTPException(status_code=400, detail="File type not allowed")
# Check file size
contents = await file.read()
if len(contents) > MAX_SIZE:
raise HTTPException(status_code=400, detail="File too large")
file_path = os.path.join(UPLOAD_DIR, file.filename)
with open(file_path, "wb") as f:
f.write(contents)
return {"filename": file.filename, "size": len(contents)}
# Multiple file upload
@app.post("/upload-multiple")
async def upload_multiple(files: List[UploadFile] = File(...)):
results = []
for file in files:
contents = await file.read()
file_path = os.path.join(UPLOAD_DIR, file.filename)
with open(file_path, "wb") as f:
f.write(contents)
results.append({"filename": file.filename, "size": len(contents)})
return results
# File upload with additional form data
@app.post("/upload-with-metadata")
async def upload_with_metadata(
file: UploadFile = File(...),
description: str = Form(...),
category: str = Form("general")
):
contents = await file.read()
return {
"filename": file.filename,
"description": description,
"category": category,
"size": len(contents)
}
Common pitfall: Calling await file.read() loads the entire file into memory. For large files, use chunked reading:
@app.post("/upload-large")
async def upload_large_file(file: UploadFile = File(...)):
file_path = os.path.join(UPLOAD_DIR, file.filename)
with open(file_path, "wb") as f:
while chunk := await file.read(1024 * 1024): # 1MB chunks
f.write(chunk)
return {"filename": file.filename}
A scalable FastAPI architecture addresses code organization, deployment topology, and operational concerns. Here is a proven pattern:
# Project structure for a scalable FastAPI application
project/
app/
__init__.py
main.py # App factory, middleware, router includes
config.py # Pydantic BaseSettings for env-based config
database.py # Engine, session factory, base model
middleware/
__init__.py
logging.py # Request/response logging
cors.py # CORS configuration
rate_limit.py # Rate limiting middleware
api/
__init__.py
v1/
__init__.py
router.py # Aggregates all v1 routers
endpoints/
users.py
products.py
orders.py
v2/
__init__.py
router.py
models/ # SQLAlchemy ORM models
schemas/ # Pydantic request/response schemas
services/ # Business logic layer
repositories/ # Data access layer
dependencies/ # Shared Depends() functions
events/ # Startup/shutdown event handlers
utils/ # Shared utilities
alembic/ # Database migrations
tests/
conftest.py
test_users.py
test_products.py
docker-compose.yml
Dockerfile
pyproject.toml
# app/config.py - Environment-based configuration
from pydantic_settings import BaseSettings
from functools import lru_cache
class Settings(BaseSettings):
app_name: str = "My API"
debug: bool = False
database_url: str
redis_url: str = "redis://localhost:6379"
secret_key: str
allowed_origins: list[str] = ["http://localhost:3000"]
model_config = {"env_file": ".env"}
@lru_cache()
def get_settings():
return Settings()
# app/main.py - App factory pattern
from fastapi import FastAPI
from contextlib import asynccontextmanager
from app.config import get_settings
from app.api.v1.router import router as v1_router
from app.database import engine, Base
@asynccontextmanager
async def lifespan(app: FastAPI):
# Startup
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
yield
# Shutdown
await engine.dispose()
def create_app() -> FastAPI:
settings = get_settings()
app = FastAPI(
title=settings.app_name,
lifespan=lifespan,
docs_url="/docs" if settings.debug else None,
)
app.include_router(v1_router, prefix="/api/v1")
return app
app = create_app()
Key architectural principles:
/api/v1, /api/v2) to evolve your API without breaking clients.BaseSettings for type-safe, environment-driven configuration.ASGI (Asynchronous Server Gateway Interface) is the spiritual successor to WSGI. It defines a standard interface between async-capable Python web servers and applications.
| Feature | WSGI | ASGI |
|---|---|---|
| Concurrency model | Synchronous, one request per thread | Asynchronous, event-loop based |
| Protocol support | HTTP only | HTTP, WebSocket, HTTP/2 |
| Connection lifecycle | Request-response only | Long-lived connections supported |
| Frameworks | Flask, Django | FastAPI, Starlette, Django (3.0+) |
| Servers | Gunicorn, uWSGI | Uvicorn, Daphne, Hypercorn |
At its core, an ASGI application is a callable with this signature:
# Raw ASGI application example
async def app(scope, receive, send):
# scope - dict with connection info (type, path, headers, etc.)
# receive - async callable to receive messages from client
# send - async callable to send messages to client
if scope["type"] == "http":
# Read request body
body = b""
while True:
message = await receive()
body += message.get("body", b"")
if not message.get("more_body", False):
break
# Send response
await send({
"type": "http.response.start",
"status": 200,
"headers": [(b"content-type", b"application/json")],
})
await send({
"type": "http.response.body",
"body": b'{"message": "Hello from raw ASGI"}',
})
FastAPI wraps this low-level protocol behind its elegant decorator-based API. When you write @app.get("/items"), FastAPI (via Starlette) handles all the ASGI message passing for you.
The request flow: Client → Uvicorn (ASGI server) → Starlette (ASGI toolkit) → FastAPI (routing + validation) → Your handler
from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List
app = FastAPI()
# Connection manager for multiple clients
class ConnectionManager:
def __init__(self):
self.active_connections: List[WebSocket] = []
async def connect(self, websocket: WebSocket):
await websocket.accept()
self.active_connections.append(websocket)
def disconnect(self, websocket: WebSocket):
self.active_connections.remove(websocket)
async def send_personal(self, message: str, websocket: WebSocket):
await websocket.send_text(message)
async def broadcast(self, message: str):
for connection in self.active_connections:
await connection.send_text(message)
manager = ConnectionManager()
@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
await manager.connect(websocket)
try:
while True:
data = await websocket.receive_text()
# Echo back to sender
await manager.send_personal(f"You said: {data}", websocket)
# Broadcast to all
await manager.broadcast(f"Client #{client_id}: {data}")
except WebSocketDisconnect:
manager.disconnect(websocket)
await manager.broadcast(f"Client #{client_id} left the chat")
# WebSocket with JSON messages
@app.websocket("/ws/json")
async def json_websocket(websocket: WebSocket):
await websocket.accept()
try:
while True:
data = await websocket.receive_json()
action = data.get("action")
if action == "subscribe":
await websocket.send_json({"status": "subscribed", "channel": data["channel"]})
elif action == "message":
await websocket.send_json({"echo": data["content"]})
except WebSocketDisconnect:
pass
Best practice: For production WebSocket applications, use Redis Pub/Sub or a message broker to coordinate messages across multiple server instances, since in-memory connection managers only work within a single process.
High-concurrency optimization in FastAPI involves several layers:
# 1. Use async everywhere possible
import asyncpg
from fastapi import FastAPI
app = FastAPI()
# Use async database driver
pool = None
async def startup():
global pool
pool = await asyncpg.create_pool(
"postgresql://user:pass@localhost/db",
min_size=10,
max_size=50
)
@app.get("/users/{user_id}")
async def get_user(user_id: int):
async with pool.acquire() as conn:
row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
return dict(row)
# 2. Use connection pooling
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
pool_size=20,
max_overflow=10,
pool_timeout=30,
pool_recycle=1800, # Recycle connections after 30 minutes
)
# 3. Add response caching
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
@app.get("/expensive-query")
@cache(expire=60)
async def expensive_query():
# Result is cached in Redis for 60 seconds
return await run_expensive_computation()
# 4. Use streaming responses for large payloads
from fastapi.responses import StreamingResponse
import json
@app.get("/large-dataset")
async def stream_data():
async def generate():
for chunk in fetch_large_dataset_in_chunks():
yield json.dumps(chunk) + "\n"
return StreamingResponse(generate(), media_type="application/x-ndjson")
# 5. Scale with multiple workers
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker \
--bind 0.0.0.0:8000 \
--timeout 120 \
--keep-alive 5
Optimization checklist:
py-spy to find bottlenecks# SQLAlchemy async connection pooling
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession
from sqlalchemy.pool import QueuePool
engine = create_async_engine(
"postgresql+asyncpg://user:pass@localhost/db",
poolclass=QueuePool,
pool_size=20, # Steady-state connections
max_overflow=10, # Extra connections under load (total max: 30)
pool_timeout=30, # Seconds to wait for a connection
pool_recycle=1800, # Recycle connections every 30 minutes
pool_pre_ping=True, # Test connections before using them
echo=False, # Set True to log all SQL
)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
async def get_db() -> AsyncSession:
async with AsyncSessionLocal() as session:
try:
yield session
await session.commit()
except Exception:
await session.rollback()
raise
Sizing guidelines:
| Parameter | Guideline |
|---|---|
pool_size |
Number of Uvicorn workers x expected concurrent DB queries per worker. Start with 5-10 per worker. |
max_overflow |
50-100% of pool_size for burst handling |
pool_timeout |
Lower values (10-30s) fail fast; higher values queue more requests |
pool_recycle |
Set below your database’s wait_timeout to avoid stale connections |
max_connections setting. The total connections across all workers must not exceed the database limit.# Method 1: Custom middleware with Redis
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import JSONResponse
import aioredis
app = FastAPI()
redis = None
async def startup():
global redis
redis = aioredis.from_url("redis://localhost")
@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
client_ip = request.client.host
key = f"rate_limit:{client_ip}"
window = 60 # seconds
max_requests = 100
current = await redis.get(key)
if current and int(current) >= max_requests:
return JSONResponse(
status_code=429,
content={"detail": "Too many requests"},
headers={"Retry-After": str(window)}
)
pipe = redis.pipeline()
pipe.incr(key)
pipe.expire(key, window)
await pipe.execute()
response = await call_next(request)
return response
# Method 2: Dependency-based rate limiting (per-route)
class RateLimiter:
def __init__(self, max_requests: int, window_seconds: int):
self.max_requests = max_requests
self.window = window_seconds
async def __call__(self, request: Request):
client_ip = request.client.host
key = f"rate:{client_ip}:{request.url.path}"
current = await redis.get(key)
if current and int(current) >= self.max_requests:
raise HTTPException(
status_code=429,
detail=f"Rate limit exceeded. Try again in {self.window} seconds."
)
pipe = redis.pipeline()
pipe.incr(key)
pipe.expire(key, self.window)
await pipe.execute()
# Apply different limits to different routes
@app.get("/search", dependencies=[Depends(RateLimiter(max_requests=30, window_seconds=60))])
async def search(q: str):
return {"results": []}
@app.post("/upload", dependencies=[Depends(RateLimiter(max_requests=5, window_seconds=60))])
async def upload():
return {"status": "ok"}
Best practice: Use dependency-based rate limiting so you can apply different limits to different endpoints. Expensive operations (search, uploads) should have stricter limits than simple reads.
# Dockerfile (multi-stage build) FROM python:3.11-slim AS builder WORKDIR /app COPY requirements.txt . RUN pip install --no-cache-dir --prefix=/install -r requirements.txt FROM python:3.11-slim WORKDIR /app COPY --from=builder /install /usr/local COPY ./app ./app EXPOSE 8000 CMD ["gunicorn", "app.main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
# docker-compose.yml
version: "3.8"
services:
api:
build: .
environment:
- DATABASE_URL=postgresql://user:pass@db:5432/mydb
- REDIS_URL=redis://redis:6379
depends_on:
- db
- redis
networks:
- backend
nginx:
image: nginx:alpine
ports:
- "80:80"
- "443:443"
volumes:
- ./nginx.conf:/etc/nginx/nginx.conf
- ./certs:/etc/ssl/certs
depends_on:
- api
networks:
- backend
db:
image: postgres:15
environment:
POSTGRES_USER: user
POSTGRES_PASSWORD: pass
POSTGRES_DB: mydb
volumes:
- pgdata:/var/lib/postgresql/data
networks:
- backend
redis:
image: redis:7-alpine
networks:
- backend
volumes:
pgdata:
networks:
backend:
# nginx.conf
events {
worker_connections 1024;
}
http {
upstream fastapi {
server api:8000;
}
server {
listen 80;
server_name api.example.com;
location / {
proxy_pass http://fastapi;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
location /ws {
proxy_pass http://fastapi;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
}
}
}
Key considerations:
2 * CPU_CORES + 1.Upgrade header.# Event-driven architecture using an in-process event bus
from fastapi import FastAPI
from typing import Callable, Dict, List
import asyncio
# Simple event bus
class EventBus:
def __init__(self):
self._handlers: Dict[str, List[Callable]] = {}
def subscribe(self, event_type: str, handler: Callable):
if event_type not in self._handlers:
self._handlers[event_type] = []
self._handlers[event_type].append(handler)
async def publish(self, event_type: str, data: dict):
handlers = self._handlers.get(event_type, [])
await asyncio.gather(*[handler(data) for handler in handlers])
event_bus = EventBus()
# Register handlers
async def send_welcome_email(data: dict):
print(f"Sending welcome email to {data['email']}")
async def create_default_settings(data: dict):
print(f"Creating default settings for user {data['user_id']}")
async def notify_admin(data: dict):
print(f"New user registered: {data['username']}")
event_bus.subscribe("user.created", send_welcome_email)
event_bus.subscribe("user.created", create_default_settings)
event_bus.subscribe("user.created", notify_admin)
app = FastAPI()
@app.post("/users")
async def create_user(user: dict):
new_user = {"id": 1, **user}
# Publish event - all handlers run concurrently
await event_bus.publish("user.created", {
"user_id": new_user["id"],
"username": new_user.get("username"),
"email": new_user.get("email")
})
return new_user
# Production: Event-driven with Redis Streams
import aioredis
import json
class RedisEventPublisher:
def __init__(self, redis_url: str):
self.redis = None
self.redis_url = redis_url
async def connect(self):
self.redis = aioredis.from_url(self.redis_url)
async def publish(self, channel: str, event: dict):
await self.redis.xadd(
channel,
{"data": json.dumps(event)}
)
publisher = RedisEventPublisher("redis://localhost")
@app.post("/orders")
async def create_order(order: dict):
new_order = {"id": 1, "total": order.get("total", 0), **order}
await publisher.publish("orders", {
"event": "order.created",
"order_id": new_order["id"],
"total": new_order["total"]
})
return new_order
Best practice: Start with a simple in-process event bus for monoliths. Move to Redis Streams or Kafka when you need cross-service communication or guaranteed delivery.
# Install Alembic pip install alembic # Initialize Alembic in your project alembic init alembic
# alembic/env.py (key configuration)
from app.database import Base
from app.models import user, product, order # Import all models
from app.config import get_settings
settings = get_settings()
config.set_main_option("sqlalchemy.url", settings.database_url)
target_metadata = Base.metadata
# Create a migration alembic revision --autogenerate -m "add users table" # Apply migrations alembic upgrade head # Rollback one step alembic downgrade -1 # View migration history alembic history --verbose
# Example migration file
# add users table
# Revision ID: a1b2c3d4e5f6
from alembic import op
import sqlalchemy as sa
def upgrade():
op.create_table(
"users",
sa.Column("id", sa.Integer(), primary_key=True),
sa.Column("username", sa.String(50), unique=True, nullable=False),
sa.Column("email", sa.String(120), unique=True, nullable=False),
sa.Column("hashed_password", sa.String(255), nullable=False),
sa.Column("created_at", sa.DateTime(), server_default=sa.func.now()),
)
op.create_index("ix_users_email", "users", ["email"])
def downgrade():
op.drop_index("ix_users_email", table_name="users")
op.drop_table("users")
Production migration strategy:
import aioredis
import json
import hashlib
from fastapi import FastAPI, Request
from functools import wraps
app = FastAPI()
redis = None
async def startup():
global redis
redis = aioredis.from_url("redis://localhost", decode_responses=True)
# Strategy 1: Simple key-value caching
async def get_cached_or_fetch(key: str, fetch_func, ttl: int = 300):
cached = await redis.get(key)
if cached:
return json.loads(cached)
data = await fetch_func()
await redis.setex(key, ttl, json.dumps(data))
return data
@app.get("/products/{product_id}")
async def get_product(product_id: int):
async def fetch():
return await db_get_product(product_id)
return await get_cached_or_fetch(f"product:{product_id}", fetch, ttl=600)
# Strategy 2: Cache decorator
def cached(prefix: str, ttl: int = 300):
def decorator(func):
@wraps(func)
async def wrapper(*args, **kwargs):
# Build cache key from function args
key_data = f"{prefix}:{args}:{kwargs}"
cache_key = hashlib.md5(key_data.encode()).hexdigest()
cached_result = await redis.get(cache_key)
if cached_result:
return json.loads(cached_result)
result = await func(*args, **kwargs)
await redis.setex(cache_key, ttl, json.dumps(result))
return result
return wrapper
return decorator
# Strategy 3: Cache invalidation on write
@app.post("/products")
async def create_product(product: dict):
new_product = {"id": 1, **product}
# Invalidate list cache
await redis.delete("products:list")
# Cache the new product
await redis.setex(
f"product:{new_product['id']}",
600,
json.dumps(new_product)
)
return new_product
@app.put("/products/{product_id}")
async def update_product(product_id: int, updates: dict):
updated = {"id": product_id, **updates}
# Invalidate specific cache and list cache
await redis.delete(f"product:{product_id}")
await redis.delete("products:list")
return updated
# Strategy 4: HTTP cache headers
from fastapi.responses import JSONResponse
@app.get("/static-config")
async def get_config():
data = {"version": "1.0", "features": ["a", "b"]}
response = JSONResponse(content=data)
response.headers["Cache-Control"] = "public, max-age=3600"
response.headers["ETag"] = hashlib.md5(json.dumps(data).encode()).hexdigest()
return response
Caching strategies summary:
| Strategy | Use Case | TTL Guidance |
|---|---|---|
| Cache-aside (lazy load) | General-purpose; read-heavy data | 5-60 minutes |
| Write-through | Data that must be immediately consistent | Match read cache TTL |
| Cache invalidation | Data modified via your own API | Infinite (invalidate on change) |
| HTTP caching headers | Static or slowly changing responses | Based on data volatility |
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware
import uuid
import re
app = FastAPI(docs_url=None, redoc_url=None) # Disable docs in production
# 1. CORS - restrict allowed origins
app.add_middleware(
CORSMiddleware,
allow_origins=["https://yourfrontend.com"],
allow_credentials=True,
allow_methods=["GET", "POST", "PUT", "DELETE"],
allow_headers=["*"],
)
# 2. Trusted hosts - prevent host header attacks
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["api.example.com"])
# 3. HTTPS redirect
app.add_middleware(HTTPSRedirectMiddleware)
# 4. Security headers middleware
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
response = await call_next(request)
response.headers["X-Content-Type-Options"] = "nosniff"
response.headers["X-Frame-Options"] = "DENY"
response.headers["X-XSS-Protection"] = "1; mode=block"
response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
response.headers["Content-Security-Policy"] = "default-src 'self'"
return response
# 5. Request ID tracking
@app.middleware("http")
async def add_request_id(request: Request, call_next):
request_id = str(uuid.uuid4())
request.state.request_id = request_id
response = await call_next(request)
response.headers["X-Request-ID"] = request_id
return response
# 6. Input sanitization in Pydantic models
from pydantic import BaseModel, field_validator
class UserInput(BaseModel):
name: str
comment: str
@field_validator("comment")
@classmethod
def sanitize_comment(cls, v):
# Remove potential script tags
cleaned = re.sub(r"<script.*?>.*?</script>", "", v, flags=re.DOTALL | re.IGNORECASE)
return cleaned.strip()
Production security checklist:
/docs, /redoc)* in production)# .github/workflows/ci-cd.yml
name: CI/CD Pipeline
on:
push:
branches: [main]
pull_request:
branches: [main]
jobs:
test:
runs-on: ubuntu-latest
services:
postgres:
image: postgres:15
env:
POSTGRES_USER: test
POSTGRES_PASSWORD: test
POSTGRES_DB: testdb
ports:
- 5432:5432
redis:
image: redis:7
ports:
- 6379:6379
steps:
- uses: actions/checkout@v4
- uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-cov pytest-asyncio httpx
- name: Run linting
run: |
pip install ruff
ruff check app/
- name: Run type checking
run: |
pip install mypy
mypy app/ --ignore-missing-imports
- name: Run tests with coverage
env:
DATABASE_URL: postgresql://test:test@localhost:5432/testdb
REDIS_URL: redis://localhost:6379
run: |
pytest tests/ -v --cov=app --cov-report=xml
- name: Upload coverage
uses: codecov/codecov-action@v3
deploy:
needs: test
runs-on: ubuntu-latest
if: github.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@v4
- name: Build Docker image
run: docker build -t myapi:latest .
- name: Push to registry
run: |
docker tag myapi:latest registry.example.com/myapi:latest
docker push registry.example.com/myapi:latest
- name: Deploy to production
run: |
ssh deploy@production "cd /app && docker-compose pull && docker-compose up -d"
CI/CD best practices for FastAPI:
ruff for fast Python linting and formatting.mypy to catch type errors before runtime.alembic upgrade head as part of the deploy step./health endpoint and verify it after deployment.This guide covered 36 interview questions spanning junior, mid-level, and senior FastAPI topics. Here are the most important themes to remember:
| Level | Key Themes |
|---|---|
| Junior | Understand path/query parameters, Pydantic models, type hints, HTTP methods, error handling, and how to run a FastAPI app with Uvicorn. |
| Mid-Level | Master dependency injection, JWT authentication, async vs sync patterns, CRUD operations, APIRouter organization, testing with TestClient, and file uploads. |
| Senior | Design scalable architectures, understand ASGI, implement WebSockets, optimize for high concurrency, manage database migrations with Alembic, implement caching and rate limiting, deploy with Docker/Nginx, secure the application, and set up CI/CD pipelines. |
Resources for further study: