FastAPI – Interview Questions

Introduction

FastAPI has rapidly become one of the most popular Python web frameworks, and demand for FastAPI developers continues to grow. Whether you are preparing for your first Python backend role or aiming for a senior architect position, this guide covers the questions you are most likely to encounter in a real interview.

The questions are organized into three tiers:

| Level | Focus Areas | What Interviewers Look For |
|---|---|---|
| Junior | Core concepts, basic routing, Pydantic basics, running the app | Solid fundamentals, ability to build simple endpoints |
| Mid-Level | Dependency injection, auth, async patterns, testing, CRUD | Production-quality code, understanding of the request lifecycle |
| Senior | Architecture, ASGI, WebSockets, deployment, CI/CD, security | System design thinking, performance tuning, operational maturity |

Interview Tip: For every question, try to give a concise definition first, then follow up with a practical example or a real-world scenario. Interviewers value clarity and the ability to connect theory to practice.

Junior Level Questions

1. What is FastAPI and why use it?

FastAPI is a modern, high-performance Python web framework for building APIs. It is built on top of Starlette (for the web layer) and Pydantic (for data validation). Key reasons to choose FastAPI include:

  • Performance – Comparable to Node.js and Go thanks to async support and Starlette’s ASGI foundation.
  • Automatic documentation – Swagger UI and ReDoc are generated from your code with zero extra config.
  • Type safety – Python type hints drive validation, serialization, and editor auto-complete.
  • Developer productivity – Less boilerplate means faster development cycles.

from fastapi import FastAPI

app = FastAPI()

@app.get("/")
def read_root():
    return {"message": "Hello, World!"}

Best practice: Choose FastAPI when you need an async-capable REST API with automatic request validation. If you only need to serve HTML templates with minimal API work, a lighter framework may suffice.

2. How do path parameters work?

Path parameters are dynamic segments in the URL path. You declare them inside curly braces in the route decorator and as function arguments with type annotations. FastAPI automatically validates and converts the value to the declared type.

from fastapi import FastAPI

app = FastAPI()

@app.get("/users/{user_id}")
def get_user(user_id: int):
    return {"user_id": user_id}

# GET /users/42   -> {"user_id": 42}
# GET /users/abc  -> 422 Unprocessable Entity (validation error)

You can also use Path() for additional constraints:

from fastapi import FastAPI, Path

app = FastAPI()

@app.get("/items/{item_id}")
def get_item(item_id: int = Path(..., title="Item ID", ge=1, le=10000)):
    return {"item_id": item_id}

Common pitfall: If you have both /users/me and /users/{user_id}, put the static route first. FastAPI matches routes in declaration order, so /users/{user_id} would capture "me" as a path parameter if declared first.

3. What are query parameters?

Query parameters are key-value pairs appended to the URL after a ?. Any function parameter that is not part of the path is automatically treated as a query parameter.

from fastapi import FastAPI
from typing import Optional

app = FastAPI()

@app.get("/items")
def list_items(skip: int = 0, limit: int = 10, q: Optional[str] = None):
    result = {"skip": skip, "limit": limit}
    if q:
        result["query"] = q
    return result

# GET /items?skip=5&limit=20&q=phone
# -> {"skip": 5, "limit": 20, "query": "phone"}

Use Query() for extra validation:

from fastapi import FastAPI, Query

app = FastAPI()

@app.get("/search")
def search(q: str = Query(..., min_length=2, max_length=100)):
    return {"query": q}

Best practice: Always set sensible defaults for pagination parameters (skip, limit) and cap the maximum limit to prevent clients from requesting excessively large result sets.

4. How does FastAPI use Python type hints?

Type hints are central to FastAPI. They serve multiple purposes simultaneously:

| Purpose | How Type Hints Help |
|---|---|
| Request validation | FastAPI validates incoming data against declared types automatically |
| Serialization | Response data is serialized based on the return type or response_model |
| Documentation | Swagger UI reflects parameter types, descriptions, and constraints |
| Editor support | IDEs provide auto-complete, type checking, and refactoring tools |

from fastapi import FastAPI
from pydantic import BaseModel
from typing import Optional, List

app = FastAPI()

class Item(BaseModel):
    name: str
    price: float
    tags: List[str] = []
    description: Optional[str] = None

@app.post("/items", response_model=Item)
def create_item(item: Item):
    # item is already validated and typed
    return item

Key insight: Unlike Flask, where you manually call request.get_json() and validate fields yourself, FastAPI uses type hints to handle all of this declaratively. This eliminates an entire class of bugs.

5. What is Pydantic and how does FastAPI use it?

Pydantic is a data validation and settings management library that uses Python type annotations. FastAPI relies on Pydantic for:

  • Parsing and validating request bodies
  • Serializing response data
  • Defining configuration via BaseSettings (moved to the separate pydantic-settings package in Pydantic v2)
  • Generating JSON Schema (used for OpenAPI docs)

from pydantic import BaseModel, Field, field_validator
from typing import Optional
from datetime import datetime

class UserCreate(BaseModel):
    username: str = Field(..., min_length=3, max_length=50)
    email: str
    age: int = Field(..., ge=13, le=120)
    bio: Optional[str] = None

    @field_validator("email")
    @classmethod
    def validate_email(cls, v):
        if "@" not in v:
            raise ValueError("Invalid email address")
        return v.lower()

class UserResponse(BaseModel):
    id: int
    username: str
    email: str
    created_at: datetime

    model_config = {"from_attributes": True}

The model_config = {"from_attributes": True} setting (formerly class Config: orm_mode = True in Pydantic v1) allows Pydantic to read data from ORM objects like SQLAlchemy models.

Common pitfall: Forgetting to enable from_attributes when returning ORM objects will cause serialization errors.
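A small self-contained sketch of the setting (a plain class stands in for a SQLAlchemy model here):

```python
from datetime import datetime
from pydantic import BaseModel

class UserRow:  # stand-in for an ORM instance: data lives in attributes, not dict keys
    def __init__(self):
        self.id = 1
        self.username = "alice"
        self.created_at = datetime(2024, 1, 1)

class UserOut(BaseModel):
    id: int
    username: str
    created_at: datetime

    model_config = {"from_attributes": True}

# Reads the object's attributes; without from_attributes this raises ValidationError
user = UserOut.model_validate(UserRow())
print(user.username)
```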

6. How do you run a FastAPI application?

FastAPI applications are ASGI apps, so you need an ASGI server. The most common choice is Uvicorn.

# Install
pip install fastapi uvicorn

# Run in development with auto-reload
uvicorn main:app --reload --host 0.0.0.0 --port 8000

# Run in production with multiple workers
uvicorn main:app --host 0.0.0.0 --port 8000 --workers 4

You can also run it programmatically:

import uvicorn

if __name__ == "__main__":
    uvicorn.run("main:app", host="0.0.0.0", port=8000, reload=True)

For production deployments, Gunicorn with Uvicorn workers is the recommended pattern:

gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000

Best practice: Use --reload only in development. In production, use Gunicorn as the process manager with Uvicorn workers for robustness.

7. What is the difference between @app.get and @app.post?

These decorators bind a function to an HTTP method. Each method has a specific semantic meaning:

| Decorator | HTTP Method | Purpose | Request Body | Idempotent |
|---|---|---|---|---|
| @app.get | GET | Retrieve data | No | Yes |
| @app.post | POST | Create a resource | Yes | No |
| @app.put | PUT | Replace a resource | Yes | Yes |
| @app.patch | PATCH | Partially update | Yes | No |
| @app.delete | DELETE | Delete a resource | Optional | Yes |

from fastapi import FastAPI
from pydantic import BaseModel

app = FastAPI()

class Item(BaseModel):
    name: str
    price: float

items_db = {}

@app.get("/items/{item_id}")
def get_item(item_id: int):
    return items_db.get(item_id, {"error": "Not found"})

@app.post("/items", status_code=201)
def create_item(item: Item):
    item_id = len(items_db) + 1
    items_db[item_id] = item.model_dump()
    return {"id": item_id, **item.model_dump()}

@app.put("/items/{item_id}")
def replace_item(item_id: int, item: Item):
    items_db[item_id] = item.model_dump()
    return {"id": item_id, **item.model_dump()}

@app.delete("/items/{item_id}", status_code=204)
def delete_item(item_id: int):
    items_db.pop(item_id, None)

8. How do you access Swagger docs in FastAPI?

FastAPI automatically generates interactive API documentation from your route definitions, type hints, and Pydantic models. Two UIs are available out of the box:

| URL | UI | Description |
|---|---|---|
| /docs | Swagger UI | Interactive documentation with a “Try it out” feature |
| /redoc | ReDoc | Clean, read-only documentation |
| /openapi.json | Raw JSON | The OpenAPI schema as JSON |

from fastapi import FastAPI

# Customize docs metadata
app = FastAPI(
    title="My API",
    description="A comprehensive API for managing items",
    version="1.0.0",
    docs_url="/docs",         # default
    redoc_url="/redoc",       # default
    openapi_url="/openapi.json"  # default
)

# Disable docs in production (openapi_url=None also hides the raw schema)
app_prod = FastAPI(docs_url=None, redoc_url=None, openapi_url=None)

Best practice: Disable interactive docs in production for security. You can conditionally enable them based on an environment variable.

9. What is Uvicorn?

Uvicorn is a lightning-fast ASGI server implementation. It serves as the bridge between the network and your FastAPI application.

  • Built on uvloop (a fast, drop-in replacement for asyncio’s event loop) and httptools (a fast HTTP parser).
  • Supports HTTP/1.1 and WebSockets.
  • Provides hot-reload for development.

# Standard install (quotes protect the brackets from shells like zsh)
pip install "uvicorn[standard]"

# Run with SSL
uvicorn main:app --ssl-keyfile=key.pem --ssl-certfile=cert.pem

# Run with specific log level
uvicorn main:app --log-level warning

Key insight: Each Uvicorn process runs a single event loop. Uvicorn does offer a --workers flag for multi-process serving, but Gunicorn with Uvicorn workers (gunicorn -k uvicorn.workers.UvicornWorker) remains the common production pattern because Gunicorn adds mature process management such as worker restarts and graceful reloads.
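That pairing is often captured in a gunicorn.conf.py; a minimal sketch, assuming the app lives at main:app:

```python
# gunicorn.conf.py -- loaded with: gunicorn -c gunicorn.conf.py
import multiprocessing

wsgi_app = "main:app"                           # module:attribute of the FastAPI app (assumed)
worker_class = "uvicorn.workers.UvicornWorker"  # run ASGI workers inside Gunicorn
workers = multiprocessing.cpu_count() * 2 + 1   # common starting heuristic, tune per workload
bind = "0.0.0.0:8000"
```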

10. How do you handle errors in FastAPI?

FastAPI provides several mechanisms for error handling:

from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import RequestValidationError

app = FastAPI()

items_db = {1: {"name": "Laptop"}}

# 1. HTTPException for known errors
@app.get("/items/{item_id}")
def get_item(item_id: int):
    if item_id not in items_db:
        raise HTTPException(
            status_code=404,
            detail="Item not found",
            headers={"X-Error": "Item lookup failed"}
        )
    return items_db[item_id]

# 2. Custom exception class
class ItemNotFoundError(Exception):
    def __init__(self, item_id: int):
        self.item_id = item_id

@app.exception_handler(ItemNotFoundError)
async def item_not_found_handler(request: Request, exc: ItemNotFoundError):
    return JSONResponse(
        status_code=404,
        content={"detail": f"Item {exc.item_id} does not exist"}
    )

# 3. Override default validation error handler
@app.exception_handler(RequestValidationError)
async def validation_exception_handler(request: Request, exc: RequestValidationError):
    return JSONResponse(
        status_code=422,
        content={
            "detail": "Validation failed",
            "errors": exc.errors()
        }
    )

Best practice: Use HTTPException for simple cases. Create custom exception classes and handlers for domain-specific errors to keep your route functions clean.

11. What is the difference between FastAPI and Flask?

| Feature | FastAPI | Flask |
|---|---|---|
| Type | ASGI (async-native) | WSGI (sync by default) |
| Validation | Built-in via Pydantic | Manual or via extensions (Marshmallow) |
| Documentation | Auto-generated Swagger & ReDoc | Manual or via Flask-RESTX |
| Performance | Very high (on par with Go/Node) | Moderate |
| Dependency injection | Built-in Depends() system | Not built-in |
| Async support | Native async/await | Limited (added in Flask 2.0) |
| Ecosystem maturity | Growing rapidly | Very mature, huge plugin ecosystem |
| Learning curve | Moderate (need to understand type hints) | Low (simple and minimal) |

When to choose FastAPI: New API-first projects that need high performance, automatic validation, and auto-generated docs.

When to choose Flask: Projects that need extensive HTML template rendering, or when your team has deep Flask experience and a large existing Flask codebase.

12. How do you return different status codes?

from fastapi import FastAPI, Response, HTTPException
from fastapi.responses import JSONResponse

app = FastAPI()

# Method 1: Set default status code in decorator
@app.post("/items", status_code=201)
def create_item(item: dict):
    return {"id": 1, **item}

# Method 2: Use Response parameter for dynamic codes
@app.get("/items/{item_id}")
def get_item(item_id: int, response: Response):
    if item_id == 0:
        response.status_code = 404
        return {"error": "Item not found"}
    return {"item_id": item_id}

# Method 3: Return a Response object directly
@app.get("/health")
def health_check():
    healthy = True
    if healthy:
        return JSONResponse(content={"status": "ok"}, status_code=200)
    return JSONResponse(content={"status": "degraded"}, status_code=503)

# Method 4: HTTPException for error codes
@app.get("/secure")
def secure_endpoint():
    raise HTTPException(status_code=403, detail="Forbidden")

Best practice: Use the status_code parameter in the decorator for the “happy path” response. Use HTTPException for error paths. This keeps your OpenAPI docs accurate.

Mid-Level Questions

13. How does dependency injection work in FastAPI?

FastAPI has a powerful built-in dependency injection system using Depends(). Dependencies are functions (or classes) that are called before your route handler, and their return values are injected as parameters.

from fastapi import FastAPI, Depends, Query
from typing import Optional

app = FastAPI()

# Simple function dependency
def common_parameters(
    skip: int = Query(0, ge=0),
    limit: int = Query(10, ge=1, le=100),
    q: Optional[str] = None
):
    return {"skip": skip, "limit": limit, "q": q}

@app.get("/items")
def list_items(params: dict = Depends(common_parameters)):
    return {"params": params}

@app.get("/users")
def list_users(params: dict = Depends(common_parameters)):
    return {"params": params}

# Class-based dependency
class Pagination:
    def __init__(self, skip: int = 0, limit: int = 10):
        self.skip = skip
        self.limit = limit

@app.get("/products")
def list_products(pagination: Pagination = Depends()):
    return {"skip": pagination.skip, "limit": pagination.limit}

# Nested dependencies
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

def get_current_user(db=Depends(get_db)):
    # Uses db dependency
    user = db.query(User).first()
    return user

@app.get("/profile")
def get_profile(user=Depends(get_current_user)):
    return {"username": user.username}

Key insight: Dependencies that use yield act like context managers. Code after yield runs after the response is sent, making them perfect for cleanup tasks like closing database connections.

14. Explain the request validation lifecycle

When a request arrives at a FastAPI endpoint, it goes through a well-defined validation pipeline:

  1. Path parameter parsing – Extract values from the URL path and cast to declared types.
  2. Query parameter parsing – Extract values from the query string with defaults applied.
  3. Header extraction – Parse declared header parameters.
  4. Cookie extraction – Parse declared cookie parameters.
  5. Body parsing – Read and parse the request body (JSON, form data, or file upload).
  6. Pydantic validation – Validate all parsed data against Pydantic models and field constraints.
  7. Dependency resolution – Execute dependency functions in topological order.
  8. Route handler execution – Call the endpoint function with validated, typed parameters.
  9. Response serialization – Serialize the return value using response_model if specified.
  10. Dependency cleanup – Run teardown code for yield dependencies.

from fastapi import FastAPI, Depends, Header, Path, Query, HTTPException
from pydantic import BaseModel

app = FastAPI()

class ItemCreate(BaseModel):
    name: str
    price: float

class ItemResponse(BaseModel):
    id: int
    name: str
    price: float

def verify_token(x_token: str = Header(...)):
    if x_token != "secret-token":
        raise HTTPException(status_code=403, detail="Invalid token")
    return x_token

@app.post(
    "/categories/{category_id}/items",
    response_model=ItemResponse,
    status_code=201
)
def create_item(
    item: ItemCreate,                           # Steps 5-6: body + validation
    category_id: int = Path(..., ge=1),         # Step 1: path
    q: str = Query(None),                       # Step 2: query
    token: str = Depends(verify_token),         # Step 7: dependency
):
    # Step 8: handler executes with all validated data
    return ItemResponse(id=1, name=item.name, price=item.price)
    # Step 9: response serialized via response_model

If validation fails at any step, FastAPI returns a 422 Unprocessable Entity response with detailed error information.

15. How do you implement authentication with JWT?

from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from datetime import datetime, timedelta
from typing import Optional

# Configuration
SECRET_KEY = "your-secret-key-keep-it-secret"
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30

app = FastAPI()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Models
class Token(BaseModel):
    access_token: str
    token_type: str

class TokenData(BaseModel):
    username: Optional[str] = None

class User(BaseModel):
    username: str
    email: str
    disabled: bool = False

# Helper functions
def verify_password(plain_password: str, hashed_password: str) -> bool:
    return pwd_context.verify(plain_password, hashed_password)

def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
    to_encode = data.copy()
    expire = datetime.utcnow() + (expires_delta or timedelta(minutes=15))
    to_encode.update({"exp": expire})
    return jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)

async def get_current_user(token: str = Depends(oauth2_scheme)):
    credentials_exception = HTTPException(
        status_code=status.HTTP_401_UNAUTHORIZED,
        detail="Could not validate credentials",
        headers={"WWW-Authenticate": "Bearer"},
    )
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        username: str = payload.get("sub")
        if username is None:
            raise credentials_exception
    except JWTError:
        raise credentials_exception
    user = get_user_from_db(username)
    if user is None:
        raise credentials_exception
    return user

# Endpoints
@app.post("/token", response_model=Token)
async def login(form_data: OAuth2PasswordRequestForm = Depends()):
    user = authenticate_user(form_data.username, form_data.password)
    if not user:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
        )
    access_token = create_access_token(
        data={"sub": user.username},
        expires_delta=timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    )
    return {"access_token": access_token, "token_type": "bearer"}

@app.get("/users/me", response_model=User)
async def read_users_me(current_user: User = Depends(get_current_user)):
    return current_user

Interview Tip: Be ready to discuss token refresh strategies, token revocation (blocklists), and the difference between stateless JWT auth and session-based auth.

16. What is OAuth2PasswordBearer?

OAuth2PasswordBearer is a FastAPI security utility class that implements the OAuth2 Password flow. It does two things:

  1. Extracts the token – Reads the Authorization: Bearer <token> header from incoming requests.
  2. Documents the flow – Adds the OAuth2 password flow to the OpenAPI schema, enabling the “Authorize” button in Swagger UI.
from fastapi import FastAPI, Depends
from fastapi.security import OAuth2PasswordBearer

app = FastAPI()

# tokenUrl is the endpoint where clients POST credentials to get a token
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

# Using it as a dependency simply extracts the token string
@app.get("/protected")
async def protected_route(token: str = Depends(oauth2_scheme)):
    # token is the raw Bearer token string
    # You still need to decode/validate it yourself
    return {"token": token}

OAuth2PasswordBearer does not validate the token. It only extracts it. You must combine it with your own validation logic (e.g., JWT decoding) in a dependency.

17. How do you set up database sessions with dependencies?

from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base, sessionmaker, Session
from fastapi import FastAPI, Depends, HTTPException

DATABASE_URL = "postgresql://user:password@localhost:5432/mydb"

engine = create_engine(DATABASE_URL, pool_size=10, max_overflow=20)
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()

# Dependency that provides a database session
def get_db():
    db = SessionLocal()
    try:
        yield db
    finally:
        db.close()

app = FastAPI()

@app.get("/users/{user_id}")
def get_user(user_id: int, db: Session = Depends(get_db)):
    user = db.query(User).filter(User.id == user_id).first()
    if not user:
        raise HTTPException(status_code=404, detail="User not found")
    return user

@app.post("/users", status_code=201)
def create_user(user_data: UserCreate, db: Session = Depends(get_db)):
    db_user = User(**user_data.model_dump())
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

Key insight: The yield pattern ensures the session is always closed, even if an exception occurs during request processing. This prevents connection leaks.

18. How does FastAPI handle async/await?

FastAPI natively supports Python’s async/await syntax because it runs on ASGI (Asynchronous Server Gateway Interface).

import httpx
from fastapi import FastAPI, Depends

app = FastAPI()

# Async endpoint - runs on the event loop
@app.get("/async-data")
async def get_async_data():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com/data")
    return response.json()

# Sync endpoint - runs in a thread pool
@app.get("/sync-data")
def get_sync_data():
    # FastAPI automatically runs this in a thread pool
    # so it does not block the event loop
    import time
    time.sleep(1)  # Simulates blocking I/O
    return {"data": "result"}

# Async dependency
async def get_async_client():
    async with httpx.AsyncClient() as client:
        yield client

@app.get("/external")
async def call_external(client: httpx.AsyncClient = Depends(get_async_client)):
    response = await client.get("https://api.example.com/resource")
    return response.json()

Important rule: If your function uses await, declare it with async def. If it performs blocking I/O (database calls via synchronous drivers, file I/O), use regular def and let FastAPI handle the threading.

19. What is the difference between sync and async endpoints?

| Aspect | async def endpoint | def endpoint (sync) |
|---|---|---|
| Execution | Runs directly on the async event loop | Runs in a separate thread from a thread pool |
| Blocking I/O | Must use async libraries (httpx, aiofiles, asyncpg) | Can safely use blocking libraries (requests, open()) |
| Concurrency | Thousands of concurrent tasks via event loop | Limited by thread pool size (default: 40 threads) |
| CPU-bound work | Blocks the event loop – avoid | Blocks one thread – slightly better |

import httpx

# WRONG: blocking call in async function blocks the event loop
@app.get("/bad")
async def bad_endpoint():
    import requests  # blocking library!
    response = requests.get("https://api.example.com")  # blocks event loop
    return response.json()

# CORRECT: use async library in async function
@app.get("/good-async")
async def good_async_endpoint():
    async with httpx.AsyncClient() as client:
        response = await client.get("https://api.example.com")
    return response.json()

# CORRECT: use sync function for blocking calls
@app.get("/good-sync")
def good_sync_endpoint():
    import requests
    response = requests.get("https://api.example.com")
    return response.json()

Interview Tip: A common mistake candidates make is declaring endpoints as async def and then using blocking libraries like requests or synchronous database drivers. This blocks the entire event loop and kills performance.

20. How do you implement CRUD operations?

from fastapi import FastAPI, Depends, HTTPException
from sqlalchemy.orm import Session
from pydantic import BaseModel
from typing import List, Optional

app = FastAPI()

# --- Pydantic schemas ---
class ProductCreate(BaseModel):
    name: str
    description: Optional[str] = None
    price: float
    category: str

class ProductUpdate(BaseModel):
    name: Optional[str] = None
    description: Optional[str] = None
    price: Optional[float] = None
    category: Optional[str] = None

class ProductResponse(BaseModel):
    id: int
    name: str
    description: Optional[str]
    price: float
    category: str

    model_config = {"from_attributes": True}

# --- CRUD functions (service layer) ---
def create_product(db: Session, product: ProductCreate):
    db_product = Product(**product.model_dump())
    db.add(db_product)
    db.commit()
    db.refresh(db_product)
    return db_product

def get_products(db: Session, skip: int = 0, limit: int = 100):
    return db.query(Product).offset(skip).limit(limit).all()

def get_product(db: Session, product_id: int):
    return db.query(Product).filter(Product.id == product_id).first()

def update_product(db: Session, product_id: int, updates: ProductUpdate):
    db_product = db.query(Product).filter(Product.id == product_id).first()
    if not db_product:
        return None
    update_data = updates.model_dump(exclude_unset=True)
    for field, value in update_data.items():
        setattr(db_product, field, value)
    db.commit()
    db.refresh(db_product)
    return db_product

def delete_product(db: Session, product_id: int):
    db_product = db.query(Product).filter(Product.id == product_id).first()
    if not db_product:
        return False
    db.delete(db_product)
    db.commit()
    return True

# --- Route handlers ---
@app.post("/products", response_model=ProductResponse, status_code=201)
def create(product: ProductCreate, db: Session = Depends(get_db)):
    return create_product(db, product)

@app.get("/products", response_model=List[ProductResponse])
def read_all(skip: int = 0, limit: int = 100, db: Session = Depends(get_db)):
    return get_products(db, skip, limit)

@app.get("/products/{product_id}", response_model=ProductResponse)
def read_one(product_id: int, db: Session = Depends(get_db)):
    product = get_product(db, product_id)
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")
    return product

@app.patch("/products/{product_id}", response_model=ProductResponse)
def update(product_id: int, updates: ProductUpdate, db: Session = Depends(get_db)):
    product = update_product(db, product_id, updates)
    if not product:
        raise HTTPException(status_code=404, detail="Product not found")
    return product

@app.delete("/products/{product_id}", status_code=204)
def delete(product_id: int, db: Session = Depends(get_db)):
    if not delete_product(db, product_id):
        raise HTTPException(status_code=404, detail="Product not found")

Best practice: Separate CRUD logic into a service layer (separate module) rather than putting database queries directly in route handlers. This makes the code testable and reusable.

21. How do you use APIRouter for organizing routes?

# app/routers/users.py
from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session
from typing import List

router = APIRouter(
    prefix="/users",
    tags=["users"],
    responses={404: {"description": "Not found"}},
)

@router.get("/", response_model=List[UserResponse])
def list_users(db: Session = Depends(get_db)):
    return db.query(User).all()

@router.get("/{user_id}", response_model=UserResponse)
def get_user(user_id: int, db: Session = Depends(get_db)):
    return db.query(User).filter(User.id == user_id).first()

@router.post("/", response_model=UserResponse, status_code=201)
def create_user(user: UserCreate, db: Session = Depends(get_db)):
    db_user = User(**user.model_dump())
    db.add(db_user)
    db.commit()
    db.refresh(db_user)
    return db_user

# app/routers/products.py
from fastapi import APIRouter

router = APIRouter(prefix="/products", tags=["products"])

@router.get("/")
def list_products():
    return []

# app/main.py
from fastapi import FastAPI
from app.routers import users, products

app = FastAPI(title="My API")

app.include_router(users.router)
app.include_router(products.router)

# You can also add a prefix when including
# app.include_router(users.router, prefix="/api/v1")

A well-organized project structure looks like this:

app/
  __init__.py
  main.py              # FastAPI app instance and router includes
  config.py            # Settings and configuration
  database.py          # Database engine and session
  models/              # SQLAlchemy models
    __init__.py
    user.py
    product.py
  schemas/             # Pydantic schemas
    __init__.py
    user.py
    product.py
  routers/             # Route handlers
    __init__.py
    users.py
    products.py
  services/            # Business logic
    __init__.py
    user_service.py
    product_service.py
  dependencies/        # Shared dependencies
    __init__.py
    auth.py
    database.py

22. What are background tasks and when to use them?

Background tasks let you run code after the response has been sent to the client. They are useful for operations that the client does not need to wait for.

from fastapi import FastAPI, BackgroundTasks
from pydantic import BaseModel

app = FastAPI()

def send_email(email: str, subject: str, body: str):
    # Simulate sending email (runs after response is sent)
    import time
    time.sleep(3)
    print(f"Email sent to {email}: {subject}")

def write_audit_log(user_id: int, action: str):
    # Write to audit log after response
    with open("audit.log", "a") as f:
        f.write(f"{user_id}: {action}\n")

class UserCreate(BaseModel):
    username: str
    email: str

@app.post("/users", status_code=201)
def create_user(user: UserCreate, background_tasks: BackgroundTasks):
    # Create user in database (synchronous, client waits)
    new_user = {"id": 1, "username": user.username, "email": user.email}

    # These run AFTER the response is sent
    background_tasks.add_task(send_email, user.email, "Welcome!", "Thanks for joining")
    background_tasks.add_task(write_audit_log, 1, "user_created")

    return new_user

When to use background tasks vs. a task queue (Celery/Redis):

| Criteria | BackgroundTasks | Celery / Task Queue |
|---|---|---|
| Duration | Short (seconds) | Long (minutes/hours) |
| Reliability | Lost if server crashes | Persisted in broker, retryable |
| Infrastructure | None extra | Needs Redis/RabbitMQ |
| Use case | Emails, logging, cache invalidation | Video processing, reports, ETL |

23. How do you test FastAPI applications?

from fastapi.testclient import TestClient
from fastapi import FastAPI, Depends
import pytest

app = FastAPI()

@app.get("/")
def read_root():
    return {"message": "Hello"}

@app.get("/items/{item_id}")
def read_item(item_id: int):
    return {"item_id": item_id}

# --- Basic tests ---
client = TestClient(app)

def test_read_root():
    response = client.get("/")
    assert response.status_code == 200
    assert response.json() == {"message": "Hello"}

def test_read_item():
    response = client.get("/items/42")
    assert response.status_code == 200
    assert response.json() == {"item_id": 42}

def test_invalid_item_id():
    response = client.get("/items/not-a-number")
    assert response.status_code == 422

# --- Testing with dependency overrides ---
def get_db():
    return real_database_session()

@app.get("/users")
def get_users(db=Depends(get_db)):
    return []

def override_get_db():
    return test_database_session()

app.dependency_overrides[get_db] = override_get_db

# --- Async testing with httpx ---
import httpx

@pytest.mark.anyio
async def test_async_root():
    async with httpx.AsyncClient(
        transport=httpx.ASGITransport(app=app),
        base_url="http://test"
    ) as ac:
        response = await ac.get("/")
    assert response.status_code == 200

# --- Testing with pytest fixtures ---
@pytest.fixture
def test_client():
    with TestClient(app) as c:
        yield c

def test_with_fixture(test_client):
    response = test_client.get("/")
    assert response.status_code == 200

Best practice: Use dependency_overrides to replace real databases, external APIs, and authentication with test doubles. This makes your tests fast and deterministic.

24. How do you handle file uploads?

from fastapi import FastAPI, UploadFile, File, HTTPException, Form
from typing import List
import os

app = FastAPI()

UPLOAD_DIR = "uploads"
ALLOWED_TYPES = {"image/jpeg", "image/png", "image/gif", "application/pdf"}
MAX_SIZE = 10 * 1024 * 1024  # 10 MB

os.makedirs(UPLOAD_DIR, exist_ok=True)  # Make sure the upload directory exists

# Single file upload
@app.post("/upload")
async def upload_file(file: UploadFile = File(...)):
    if file.content_type not in ALLOWED_TYPES:
        raise HTTPException(status_code=400, detail="File type not allowed")

    # Check file size
    contents = await file.read()
    if len(contents) > MAX_SIZE:
        raise HTTPException(status_code=400, detail="File too large")

    file_path = os.path.join(UPLOAD_DIR, os.path.basename(file.filename))  # basename blocks path traversal
    with open(file_path, "wb") as f:
        f.write(contents)

    return {"filename": file.filename, "size": len(contents)}

# Multiple file upload
@app.post("/upload-multiple")
async def upload_multiple(files: List[UploadFile] = File(...)):
    results = []
    for file in files:
        contents = await file.read()
        file_path = os.path.join(UPLOAD_DIR, os.path.basename(file.filename))  # basename blocks path traversal
        with open(file_path, "wb") as f:
            f.write(contents)
        results.append({"filename": file.filename, "size": len(contents)})
    return results

# File upload with additional form data
@app.post("/upload-with-metadata")
async def upload_with_metadata(
    file: UploadFile = File(...),
    description: str = Form(...),
    category: str = Form("general")
):
    contents = await file.read()
    return {
        "filename": file.filename,
        "description": description,
        "category": category,
        "size": len(contents)
    }

Common pitfall: Calling await file.read() loads the entire file into memory. For large files, use chunked reading:

@app.post("/upload-large")
async def upload_large_file(file: UploadFile = File(...)):
    file_path = os.path.join(UPLOAD_DIR, os.path.basename(file.filename))  # basename blocks path traversal
    with open(file_path, "wb") as f:
        while chunk := await file.read(1024 * 1024):  # 1MB chunks
            f.write(chunk)
    return {"filename": file.filename}

Senior Level Questions

25. How do you design a scalable FastAPI architecture?

A scalable FastAPI architecture addresses code organization, deployment topology, and operational concerns. Here is a proven pattern:

# Project structure for a scalable FastAPI application
project/
  app/
    __init__.py
    main.py                 # App factory, middleware, router includes
    config.py               # Pydantic BaseSettings for env-based config
    database.py             # Engine, session factory, base model
    middleware/
      __init__.py
      logging.py            # Request/response logging
      cors.py               # CORS configuration
      rate_limit.py         # Rate limiting middleware
    api/
      __init__.py
      v1/
        __init__.py
        router.py           # Aggregates all v1 routers
        endpoints/
          users.py
          products.py
          orders.py
      v2/
        __init__.py
        router.py
    models/                 # SQLAlchemy ORM models
    schemas/                # Pydantic request/response schemas
    services/               # Business logic layer
    repositories/           # Data access layer
    dependencies/           # Shared Depends() functions
    events/                 # Startup/shutdown event handlers
    utils/                  # Shared utilities
  alembic/                  # Database migrations
  tests/
    conftest.py
    test_users.py
    test_products.py
  docker-compose.yml
  Dockerfile
  pyproject.toml
# app/config.py - Environment-based configuration
from pydantic_settings import BaseSettings
from functools import lru_cache

class Settings(BaseSettings):
    app_name: str = "My API"
    debug: bool = False
    database_url: str
    redis_url: str = "redis://localhost:6379"
    secret_key: str
    allowed_origins: list[str] = ["http://localhost:3000"]

    model_config = {"env_file": ".env"}

@lru_cache()
def get_settings():
    return Settings()
# app/main.py - App factory pattern
from fastapi import FastAPI
from contextlib import asynccontextmanager
from app.config import get_settings
from app.api.v1.router import router as v1_router
from app.database import engine, Base

@asynccontextmanager
async def lifespan(app: FastAPI):
    # Startup
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    # Shutdown
    await engine.dispose()

def create_app() -> FastAPI:
    settings = get_settings()
    app = FastAPI(
        title=settings.app_name,
        lifespan=lifespan,
        docs_url="/docs" if settings.debug else None,
    )
    app.include_router(v1_router, prefix="/api/v1")
    return app

app = create_app()

Key architectural principles:

  • Layered architecture – Routes → Services → Repositories → Database. Each layer has a single responsibility.
  • API versioning – Use URL prefixes (/api/v1, /api/v2) to evolve your API without breaking clients.
  • Configuration via environment – Use Pydantic BaseSettings for type-safe, environment-driven configuration.
  • Horizontal scaling – Stateless request handling enables running multiple instances behind a load balancer.
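The layered flow in the first bullet can be sketched with plain classes. This is an illustrative skeleton, not a fixed API: UserRepository, UserService, and the dict-backed store are hypothetical names.

```python
class UserRepository:
    """Data access layer: talks to the datastore, returns raw records."""
    def __init__(self, db: dict):
        self.db = db

    def get(self, user_id: int):
        return self.db.get(user_id)

class UserService:
    """Business logic layer: enforces rules, composes repository calls."""
    def __init__(self, repo: UserRepository):
        self.repo = repo

    def get_profile(self, user_id: int) -> dict:
        user = self.repo.get(user_id)
        if user is None:
            raise LookupError(f"user {user_id} not found")
        # Business rule: never expose the password hash to callers
        return {k: v for k, v in user.items() if k != "hashed_password"}

# The route layer stays thin: it would inject UserService via Depends()
# and translate LookupError into an HTTPException, e.g.:
# @router.get("/users/{user_id}")
# def read_user(user_id: int, service: UserService = Depends(get_user_service)):
#     return service.get_profile(user_id)

db = {1: {"id": 1, "username": "alice", "hashed_password": "x"}}
service = UserService(UserRepository(db))
```

The payoff is testability: each layer can be unit-tested with the layer below replaced by a stub.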

26. Explain the ASGI specification and how FastAPI uses it

ASGI (Asynchronous Server Gateway Interface) is the spiritual successor to WSGI. It defines a standard interface between async-capable Python web servers and applications.

Feature WSGI ASGI
Concurrency model Synchronous, one request per thread Asynchronous, event-loop based
Protocol support HTTP only HTTP, WebSocket, HTTP/2
Connection lifecycle Request-response only Long-lived connections supported
Frameworks Flask, Django FastAPI, Starlette, Django (3.0+)
Servers Gunicorn, uWSGI Uvicorn, Daphne, Hypercorn

At its core, an ASGI application is a callable with this signature:

# Raw ASGI application example
async def app(scope, receive, send):
    # scope  - dict with connection info (type, path, headers, etc.)
    # receive - async callable to receive messages from client
    # send   - async callable to send messages to client
    if scope["type"] == "http":
        # Read request body
        body = b""
        while True:
            message = await receive()
            body += message.get("body", b"")
            if not message.get("more_body", False):
                break

        # Send response
        await send({
            "type": "http.response.start",
            "status": 200,
            "headers": [(b"content-type", b"application/json")],
        })
        await send({
            "type": "http.response.body",
            "body": b'{"message": "Hello from raw ASGI"}',
        })

FastAPI wraps this low-level protocol behind its elegant decorator-based API. When you write @app.get("/items"), FastAPI (via Starlette) handles all the ASGI message passing for you.

The request flow: Client → Uvicorn (ASGI server) → Starlette (ASGI toolkit) → FastAPI (routing + validation) → Your handler
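Middleware in this model is plain function composition: each layer is itself an ASGI callable that wraps the next one. A minimal sketch (the header name and demo app are illustrative), driven by stub receive/send callables the same way a server would drive it:

```python
import asyncio

async def app(scope, receive, send):
    # Tiny demo ASGI app
    await send({
        "type": "http.response.start",
        "status": 200,
        "headers": [(b"content-type", b"text/plain")],
    })
    await send({"type": "http.response.body", "body": b"hello"})

def add_server_header(inner_app, value: bytes):
    # ASGI "middleware": a callable that wraps another ASGI callable
    async def wrapped(scope, receive, send):
        async def send_wrapper(message):
            if message["type"] == "http.response.start":
                message["headers"].append((b"x-served-by", value))
            await send(message)
        await inner_app(scope, receive, send_wrapper)
    return wrapped

wrapped_app = add_server_header(app, b"asgi-demo")

async def run():
    # Drive the app manually with stub receive/send, like a server would
    sent = []
    async def receive():
        return {"type": "http.request", "body": b"", "more_body": False}
    async def send(message):
        sent.append(message)
    await wrapped_app({"type": "http", "path": "/"}, receive, send)
    return sent

messages = asyncio.run(run())
```

This is essentially what Starlette's middleware stack does: layers of ASGI callables wrapping your application.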

27. How do you implement WebSocket endpoints?

from fastapi import FastAPI, WebSocket, WebSocketDisconnect
from typing import List

app = FastAPI()

# Connection manager for multiple clients
class ConnectionManager:
    def __init__(self):
        self.active_connections: List[WebSocket] = []

    async def connect(self, websocket: WebSocket):
        await websocket.accept()
        self.active_connections.append(websocket)

    def disconnect(self, websocket: WebSocket):
        self.active_connections.remove(websocket)

    async def send_personal(self, message: str, websocket: WebSocket):
        await websocket.send_text(message)

    async def broadcast(self, message: str):
        for connection in self.active_connections:
            await connection.send_text(message)

manager = ConnectionManager()

@app.websocket("/ws/{client_id}")
async def websocket_endpoint(websocket: WebSocket, client_id: int):
    await manager.connect(websocket)
    try:
        while True:
            data = await websocket.receive_text()
            # Echo back to sender
            await manager.send_personal(f"You said: {data}", websocket)
            # Broadcast to all
            await manager.broadcast(f"Client #{client_id}: {data}")
    except WebSocketDisconnect:
        manager.disconnect(websocket)
        await manager.broadcast(f"Client #{client_id} left the chat")

# WebSocket with JSON messages
@app.websocket("/ws/json")
async def json_websocket(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            data = await websocket.receive_json()
            action = data.get("action")
            if action == "subscribe":
                await websocket.send_json({"status": "subscribed", "channel": data["channel"]})
            elif action == "message":
                await websocket.send_json({"echo": data["content"]})
    except WebSocketDisconnect:
        pass

Best practice: For production WebSocket applications, use Redis Pub/Sub or a message broker to coordinate messages across multiple server instances, since in-memory connection managers only work within a single process.
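The coordination pattern can be sketched with an in-memory stand-in for the broker. Everything here is illustrative: in production the InMemoryBroker role is played by Redis Pub/Sub, and each ServerInstance is a separate process rather than an object.

```python
import asyncio

class InMemoryBroker:
    """Stand-in for Redis Pub/Sub: fans every message out to all subscribers."""
    def __init__(self):
        self.subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        q = asyncio.Queue()
        self.subscribers.append(q)
        return q

    async def publish(self, message: str):
        for q in self.subscribers:
            await q.put(message)

class ServerInstance:
    """One app process: relays broker messages to its local WebSocket clients."""
    def __init__(self, broker: InMemoryBroker):
        self.inbox = broker.subscribe()
        self.local_clients: list[list] = []  # lists stand in for WebSocket objects

    async def relay_one(self):
        message = await self.inbox.get()
        for client in self.local_clients:
            client.append(message)  # in production: await ws.send_text(message)

async def demo():
    broker = InMemoryBroker()
    a, b = ServerInstance(broker), ServerInstance(broker)
    client_on_a, client_on_b = [], []
    a.local_clients.append(client_on_a)
    b.local_clients.append(client_on_b)

    await broker.publish("hello everyone")  # either instance may publish
    await a.relay_one()
    await b.relay_one()
    return client_on_a, client_on_b

client_on_a, client_on_b = asyncio.run(demo())
```

The key property: a message published by one instance reaches clients connected to every instance, which the in-process ConnectionManager alone cannot do.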

28. How do you optimize FastAPI for high concurrency?

High-concurrency optimization in FastAPI involves several layers:

# 1. Use async everywhere possible
import asyncpg
from fastapi import FastAPI

app = FastAPI()

# Use an async database driver with a shared connection pool
pool = None

@app.on_event("startup")
async def startup():
    global pool
    pool = await asyncpg.create_pool(
        "postgresql://user:pass@localhost/db",
        min_size=10,
        max_size=50
    )

@app.get("/users/{user_id}")
async def get_user(user_id: int):
    async with pool.acquire() as conn:
        row = await conn.fetchrow("SELECT * FROM users WHERE id = $1", user_id)
    return dict(row)

# 2. Use connection pooling
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    pool_size=20,
    max_overflow=10,
    pool_timeout=30,
    pool_recycle=1800,  # Recycle connections after 30 minutes
)

# 3. Add response caching (fastapi-cache2)
from fastapi_cache import FastAPICache
from fastapi_cache.backends.redis import RedisBackend
from fastapi_cache.decorator import cache
from redis import asyncio as aioredis

@app.on_event("startup")
async def init_cache():
    # The cache backend must be initialized before @cache is used
    FastAPICache.init(RedisBackend(aioredis.from_url("redis://localhost")), prefix="api-cache")

@app.get("/expensive-query")
@cache(expire=60)
async def expensive_query():
    # Result is cached in Redis for 60 seconds
    return await run_expensive_computation()

# 4. Use streaming responses for large payloads
from fastapi.responses import StreamingResponse
import json

@app.get("/large-dataset")
async def stream_data():
    async def generate():
        for chunk in fetch_large_dataset_in_chunks():
            yield json.dumps(chunk) + "\n"
    return StreamingResponse(generate(), media_type="application/x-ndjson")
# 5. Scale with multiple workers
gunicorn main:app -w 4 -k uvicorn.workers.UvicornWorker \
    --bind 0.0.0.0:8000 \
    --timeout 120 \
    --keep-alive 5

Optimization checklist:

  • Use async database drivers (asyncpg, aiomysql, motor)
  • Configure connection pool sizes based on expected concurrency
  • Add Redis caching for frequently accessed, rarely changing data
  • Use streaming responses for large payloads
  • Profile with tools like py-spy to find bottlenecks
  • Run multiple Uvicorn workers behind Gunicorn

29. What strategies do you use for database connection pooling?

# SQLAlchemy async connection pooling
from typing import AsyncGenerator
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@localhost/db",
    # Async engines use AsyncAdaptedQueuePool by default (QueuePool is sync-only)
    pool_size=20,          # Steady-state connections
    max_overflow=10,       # Extra connections under load (total max: 30)
    pool_timeout=30,       # Seconds to wait for a connection
    pool_recycle=1800,     # Recycle connections every 30 minutes
    pool_pre_ping=True,    # Test connections before using them
    echo=False,            # Set True to log all SQL
)

AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with AsyncSessionLocal() as session:
        try:
            yield session
            await session.commit()
        except Exception:
            await session.rollback()
            raise

Sizing guidelines:

Parameter Guideline
pool_size Per-worker setting: the number of concurrent DB queries you expect that process to run. Start with 5-10 per worker.
max_overflow 50-100% of pool_size for burst handling
pool_timeout Lower values (10-30s) fail fast; higher values queue more requests
pool_recycle Set below your database's idle-connection timeout (e.g. MySQL's wait_timeout) to avoid stale connections
Interview Tip: Be prepared to discuss the relationship between the number of Uvicorn workers, the pool size per worker, and the database’s max_connections setting. The total connections across all workers must not exceed the database limit.
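A quick way to sanity-check that relationship is simple arithmetic (the numbers below are illustrative, not recommendations):

```python
# Each worker process owns its own pool, so peak connections scale with workers
workers = 4                 # Uvicorn/Gunicorn worker processes
pool_size = 10              # steady-state connections per worker
max_overflow = 5            # burst connections per worker

peak_connections = workers * (pool_size + max_overflow)

# The database's max_connections must leave headroom for migrations,
# monitoring, and admin sessions on top of the application's peak
db_max_connections = 100
headroom = db_max_connections - peak_connections
```

With these numbers the app can open at most 60 connections at peak, leaving 40 in reserve; if headroom goes negative, shrink the pools or raise the database limit.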

30. How do you implement rate limiting?

# Method 1: Custom middleware with Redis
from fastapi import FastAPI, Request, HTTPException, Depends
from fastapi.responses import JSONResponse
from redis import asyncio as aioredis  # the standalone aioredis package is now part of redis-py (4.2+)

app = FastAPI()
redis = None

@app.on_event("startup")
async def startup():
    global redis
    redis = aioredis.from_url("redis://localhost")

@app.middleware("http")
async def rate_limit_middleware(request: Request, call_next):
    client_ip = request.client.host
    key = f"rate_limit:{client_ip}"
    window = 60  # seconds
    max_requests = 100

    # INCR is atomic, so there is no read-then-write race; set the expiry
    # only when the key is first created so the window does not keep resetting
    current = await redis.incr(key)
    if current == 1:
        await redis.expire(key, window)

    if current > max_requests:
        return JSONResponse(
            status_code=429,
            content={"detail": "Too many requests"},
            headers={"Retry-After": str(window)}
        )

    response = await call_next(request)
    return response

# Method 2: Dependency-based rate limiting (per-route)
class RateLimiter:
    def __init__(self, max_requests: int, window_seconds: int):
        self.max_requests = max_requests
        self.window = window_seconds

    async def __call__(self, request: Request):
        client_ip = request.client.host
        key = f"rate:{client_ip}:{request.url.path}"

        # Atomic increment avoids the check-then-act race; expire only on creation
        current = await redis.incr(key)
        if current == 1:
            await redis.expire(key, self.window)

        if current > self.max_requests:
            raise HTTPException(
                status_code=429,
                detail=f"Rate limit exceeded. Try again in {self.window} seconds."
            )

# Apply different limits to different routes
@app.get("/search", dependencies=[Depends(RateLimiter(max_requests=30, window_seconds=60))])
async def search(q: str):
    return {"results": []}

@app.post("/upload", dependencies=[Depends(RateLimiter(max_requests=5, window_seconds=60))])
async def upload():
    return {"status": "ok"}

Best practice: Use dependency-based rate limiting so you can apply different limits to different endpoints. Expensive operations (search, uploads) should have stricter limits than simple reads.

31. How do you deploy FastAPI with Docker and Nginx?

# Dockerfile (multi-stage build)
FROM python:3.11-slim AS builder
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt

FROM python:3.11-slim
WORKDIR /app
COPY --from=builder /install /usr/local
COPY ./app ./app
EXPOSE 8000
CMD ["gunicorn", "app.main:app", "-w", "4", "-k", "uvicorn.workers.UvicornWorker", "--bind", "0.0.0.0:8000"]
# docker-compose.yml
version: "3.8"
services:
  api:
    build: .
    environment:
      - DATABASE_URL=postgresql://user:pass@db:5432/mydb
      - REDIS_URL=redis://redis:6379
    depends_on:
      - db
      - redis
    networks:
      - backend

  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx.conf:/etc/nginx/nginx.conf
      - ./certs:/etc/ssl/certs
    depends_on:
      - api
    networks:
      - backend

  db:
    image: postgres:15
    environment:
      POSTGRES_USER: user
      POSTGRES_PASSWORD: pass
      POSTGRES_DB: mydb
    volumes:
      - pgdata:/var/lib/postgresql/data
    networks:
      - backend

  redis:
    image: redis:7-alpine
    networks:
      - backend

volumes:
  pgdata:

networks:
  backend:
# nginx.conf
events {
    worker_connections 1024;
}

http {
    upstream fastapi {
        server api:8000;
    }

    server {
        listen 80;
        server_name api.example.com;

        location / {
            proxy_pass http://fastapi;
            proxy_set_header Host $host;
            proxy_set_header X-Real-IP $remote_addr;
            proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
            proxy_set_header X-Forwarded-Proto $scheme;
        }

        location /ws {
            proxy_pass http://fastapi;
            proxy_http_version 1.1;
            proxy_set_header Upgrade $http_upgrade;
            proxy_set_header Connection "upgrade";
        }
    }
}

Key considerations:

  • Use multi-stage Docker builds to minimize image size.
  • Set Gunicorn workers to 2 * CPU_CORES + 1.
  • Configure Nginx to proxy WebSocket connections with the Upgrade header.
  • Use Docker health checks to enable automatic container restarts.
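For the health-check bullet, this is roughly what it can look like in docker-compose.yml, assuming the app exposes a /health route (the python -c probe avoids depending on curl being present in slim images):

```
  api:
    healthcheck:
      test: ["CMD", "python", "-c", "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')"]
      interval: 30s
      timeout: 5s
      retries: 3
      start_period: 10s
```

The same probe can live in the Dockerfile as a HEALTHCHECK instruction. Note that plain Compose only marks the container unhealthy; pair the check with a restart policy or an orchestrator that acts on health status.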

32. How do you implement event-driven architecture with FastAPI?

# Event-driven architecture using an in-process event bus
from fastapi import FastAPI
from typing import Callable, Dict, List
import asyncio

# Simple event bus
class EventBus:
    def __init__(self):
        self._handlers: Dict[str, List[Callable]] = {}

    def subscribe(self, event_type: str, handler: Callable):
        if event_type not in self._handlers:
            self._handlers[event_type] = []
        self._handlers[event_type].append(handler)

    async def publish(self, event_type: str, data: dict):
        handlers = self._handlers.get(event_type, [])
        await asyncio.gather(*[handler(data) for handler in handlers])

event_bus = EventBus()

# Register handlers
async def send_welcome_email(data: dict):
    print(f"Sending welcome email to {data['email']}")

async def create_default_settings(data: dict):
    print(f"Creating default settings for user {data['user_id']}")

async def notify_admin(data: dict):
    print(f"New user registered: {data['username']}")

event_bus.subscribe("user.created", send_welcome_email)
event_bus.subscribe("user.created", create_default_settings)
event_bus.subscribe("user.created", notify_admin)

app = FastAPI()

@app.post("/users")
async def create_user(user: dict):
    new_user = {"id": 1, **user}

    # Publish event - all handlers run concurrently
    await event_bus.publish("user.created", {
        "user_id": new_user["id"],
        "username": new_user.get("username"),
        "email": new_user.get("email")
    })

    return new_user
# Production: Event-driven with Redis Streams
from redis import asyncio as aioredis  # the standalone aioredis package is now part of redis-py (4.2+)
import json

class RedisEventPublisher:
    def __init__(self, redis_url: str):
        self.redis = None
        self.redis_url = redis_url

    async def connect(self):
        self.redis = aioredis.from_url(self.redis_url)

    async def publish(self, channel: str, event: dict):
        await self.redis.xadd(
            channel,
            {"data": json.dumps(event)}
        )

publisher = RedisEventPublisher("redis://localhost")  # call "await publisher.connect()" during app startup

@app.post("/orders")
async def create_order(order: dict):
    new_order = {"id": 1, "total": order.get("total", 0), **order}
    await publisher.publish("orders", {
        "event": "order.created",
        "order_id": new_order["id"],
        "total": new_order["total"]
    })
    return new_order

Best practice: Start with a simple in-process event bus for monoliths. Move to Redis Streams or Kafka when you need cross-service communication or guaranteed delivery.

33. How do you handle database migrations in production?

# Install Alembic
pip install alembic

# Initialize Alembic in your project
alembic init alembic
# alembic/env.py (key configuration)
from alembic import context
from app.database import Base
from app.models import user, product, order  # Import all models so autogenerate can see them
from app.config import get_settings

config = context.config
settings = get_settings()
config.set_main_option("sqlalchemy.url", settings.database_url)
target_metadata = Base.metadata
# Create a migration
alembic revision --autogenerate -m "add users table"

# Apply migrations
alembic upgrade head

# Rollback one step
alembic downgrade -1

# View migration history
alembic history --verbose
# Example migration file generated by --autogenerate
from alembic import op
import sqlalchemy as sa

# Alembic revision identifiers
revision = "a1b2c3d4e5f6"
down_revision = None  # previous revision id; None for the first migration

def upgrade():
    op.create_table(
        "users",
        sa.Column("id", sa.Integer(), primary_key=True),
        sa.Column("username", sa.String(50), unique=True, nullable=False),
        sa.Column("email", sa.String(120), unique=True, nullable=False),
        sa.Column("hashed_password", sa.String(255), nullable=False),
        sa.Column("created_at", sa.DateTime(), server_default=sa.func.now()),
    )
    op.create_index("ix_users_email", "users", ["email"])

def downgrade():
    op.drop_index("ix_users_email", table_name="users")
    op.drop_table("users")

Production migration strategy:

  • Always test migrations against a copy of the production database first.
  • Use blue-green deployments: run migrations on the new environment before switching traffic.
  • Make migrations backward-compatible: add columns as nullable first, backfill, then add constraints.
  • Never delete columns in the same deployment that stops using them.
  • Include migration commands in your CI/CD pipeline.
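The "add columns as nullable first, backfill, then add constraints" rule can be sketched as an Alembic migration. Table and column names here are hypothetical, and in practice the final NOT NULL step often ships in a later deployment once every writer sets the value:

```
# Sketch of a backward-compatible "expand" migration
from alembic import op
import sqlalchemy as sa

def upgrade():
    # Step 1: add the column as nullable - old application code keeps working
    op.add_column("users", sa.Column("timezone", sa.String(50), nullable=True))

    # Step 2: backfill existing rows (for large tables, do this in batches)
    op.execute("UPDATE users SET timezone = 'UTC' WHERE timezone IS NULL")

    # Step 3: tighten the constraint (often deferred to a later migration)
    op.alter_column("users", "timezone", nullable=False)

def downgrade():
    op.drop_column("users", "timezone")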

34. How do you implement caching strategies?

import aioredis
import json
import hashlib
from fastapi import FastAPI, Request
from functools import wraps

app = FastAPI()
redis = None

async def startup():
    global redis
    redis = aioredis.from_url("redis://localhost", decode_responses=True)

# Strategy 1: Simple key-value caching
async def get_cached_or_fetch(key: str, fetch_func, ttl: int = 300):
    cached = await redis.get(key)
    if cached:
        return json.loads(cached)
    data = await fetch_func()
    await redis.setex(key, ttl, json.dumps(data))
    return data

@app.get("/products/{product_id}")
async def get_product(product_id: int):
    async def fetch():
        return await db_get_product(product_id)
    return await get_cached_or_fetch(f"product:{product_id}", fetch, ttl=600)

# Strategy 2: Cache decorator
def cached(prefix: str, ttl: int = 300):
    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # Build cache key from function args
            key_data = f"{prefix}:{args}:{kwargs}"
            cache_key = hashlib.md5(key_data.encode()).hexdigest()

            cached_result = await redis.get(cache_key)
            if cached_result:
                return json.loads(cached_result)

            result = await func(*args, **kwargs)
            await redis.setex(cache_key, ttl, json.dumps(result))
            return result
        return wrapper
    return decorator

# Strategy 3: Cache invalidation on write
@app.post("/products")
async def create_product(product: dict):
    new_product = {"id": 1, **product}
    # Invalidate list cache
    await redis.delete("products:list")
    # Cache the new product
    await redis.setex(
        f"product:{new_product['id']}",
        600,
        json.dumps(new_product)
    )
    return new_product

@app.put("/products/{product_id}")
async def update_product(product_id: int, updates: dict):
    updated = {"id": product_id, **updates}
    # Invalidate specific cache and list cache
    await redis.delete(f"product:{product_id}")
    await redis.delete("products:list")
    return updated

# Strategy 4: HTTP cache headers
from fastapi.responses import JSONResponse

@app.get("/static-config")
async def get_config():
    data = {"version": "1.0", "features": ["a", "b"]}
    response = JSONResponse(content=data)
    response.headers["Cache-Control"] = "public, max-age=3600"
    response.headers["ETag"] = hashlib.md5(json.dumps(data).encode()).hexdigest()
    return response

Caching strategies summary:

Strategy Use Case TTL Guidance
Cache-aside (lazy load) General-purpose; read-heavy data 5-60 minutes
Write-through Data that must be immediately consistent Match read cache TTL
Cache invalidation Data modified via your own API Infinite (invalidate on change)
HTTP caching headers Static or slowly changing responses Based on data volatility

35. How do you secure a FastAPI application for production?

from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.middleware.trustedhost import TrustedHostMiddleware
from starlette.middleware.httpsredirect import HTTPSRedirectMiddleware
import uuid
import re

app = FastAPI(docs_url=None, redoc_url=None)  # Disable docs in production

# 1. CORS - restrict allowed origins
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourfrontend.com"],
    allow_credentials=True,
    allow_methods=["GET", "POST", "PUT", "DELETE"],
    allow_headers=["*"],
)

# 2. Trusted hosts - prevent host header attacks
app.add_middleware(TrustedHostMiddleware, allowed_hosts=["api.example.com"])

# 3. HTTPS redirect
app.add_middleware(HTTPSRedirectMiddleware)

# 4. Security headers middleware
@app.middleware("http")
async def add_security_headers(request: Request, call_next):
    response = await call_next(request)
    response.headers["X-Content-Type-Options"] = "nosniff"
    response.headers["X-Frame-Options"] = "DENY"
    response.headers["X-XSS-Protection"] = "1; mode=block"  # legacy header; modern browsers rely on CSP instead
    response.headers["Strict-Transport-Security"] = "max-age=31536000; includeSubDomains"
    response.headers["Content-Security-Policy"] = "default-src 'self'"
    return response

# 5. Request ID tracking
@app.middleware("http")
async def add_request_id(request: Request, call_next):
    request_id = str(uuid.uuid4())
    request.state.request_id = request_id
    response = await call_next(request)
    response.headers["X-Request-ID"] = request_id
    return response

# 6. Input sanitization in Pydantic models
from pydantic import BaseModel, field_validator

class UserInput(BaseModel):
    name: str
    comment: str

    @field_validator("comment")
    @classmethod
    def sanitize_comment(cls, v):
        # Remove potential script tags
        cleaned = re.sub(r"<script.*?>.*?</script>", "", v, flags=re.DOTALL | re.IGNORECASE)
        return cleaned.strip()

Production security checklist:

  • Disable interactive docs (/docs, /redoc)
  • Use HTTPS everywhere with HSTS headers
  • Implement CORS with explicit allowed origins (never * in production)
  • Add rate limiting to prevent abuse
  • Validate and sanitize all inputs with Pydantic
  • Use parameterized queries to prevent SQL injection
  • Store secrets in environment variables, never in code
  • Implement proper logging and monitoring
  • Keep dependencies updated and scan for vulnerabilities
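The parameterized-queries bullet, demonstrated with the standard library's sqlite3; the same principle applies to asyncpg's $1 placeholders and SQLAlchemy's bound parameters:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.execute("INSERT INTO users (username) VALUES ('alice')")

malicious = "x' OR '1'='1"

# UNSAFE: string interpolation lets the input rewrite the query itself
unsafe_sql = f"SELECT * FROM users WHERE username = '{malicious}'"
leaked = conn.execute(unsafe_sql).fetchall()  # matches every row

# SAFE: the driver binds the value; the input is treated as data, not SQL
safe = conn.execute(
    "SELECT * FROM users WHERE username = ?", (malicious,)
).fetchall()  # matches nothing
```

The unsafe query returns the whole table because the injected OR clause is always true; the parameterized query looks for a literal username and finds no match.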

36. How do you implement CI/CD for FastAPI?

# .github/workflows/ci-cd.yml
name: CI/CD Pipeline

on:
  push:
    branches: [main]
  pull_request:
    branches: [main]

jobs:
  test:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
          POSTGRES_DB: testdb
        ports:
          - 5432:5432
      redis:
        image: redis:7
        ports:
          - 6379:6379

    steps:
      - uses: actions/checkout@v4
      - uses: actions/setup-python@v5
        with:
          python-version: "3.11"

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-cov pytest-asyncio httpx

      - name: Run linting
        run: |
          pip install ruff
          ruff check app/

      - name: Run type checking
        run: |
          pip install mypy
          mypy app/ --ignore-missing-imports

      - name: Run tests with coverage
        env:
          DATABASE_URL: postgresql://test:test@localhost:5432/testdb
          REDIS_URL: redis://localhost:6379
        run: |
          pytest tests/ -v --cov=app --cov-report=xml

      - name: Upload coverage
        uses: codecov/codecov-action@v3

  deploy:
    needs: test
    runs-on: ubuntu-latest
    if: github.ref == 'refs/heads/main'

    steps:
      - uses: actions/checkout@v4

      - name: Build Docker image
        run: docker build -t myapi:latest .

      - name: Push to registry
        run: |
          docker tag myapi:latest registry.example.com/myapi:latest
          docker push registry.example.com/myapi:latest

      - name: Deploy to production
        run: |
          ssh deploy@production "cd /app && docker-compose pull && docker-compose up -d"

CI/CD best practices for FastAPI:

  • Linting – Use ruff for fast Python linting and formatting.
  • Type checking – Use mypy to catch type errors before runtime.
  • Testing – Run the full test suite with coverage thresholds (aim for 80%+).
  • Database migrations – Run alembic upgrade head as part of the deploy step.
  • Health checks – Add a /health endpoint and verify it after deployment.
  • Rollback plan – Keep the previous Docker image tagged so you can quickly revert.
  • Canary deployments – Route a small percentage of traffic to the new version before full rollout.

Key Takeaways

This guide covered 36 interview questions spanning junior, mid-level, and senior FastAPI topics. Here are the most important themes to remember:

Level Key Themes
Junior Understand path/query parameters, Pydantic models, type hints, HTTP methods, error handling, and how to run a FastAPI app with Uvicorn.
Mid-Level Master dependency injection, JWT authentication, async vs sync patterns, CRUD operations, APIRouter organization, testing with TestClient, and file uploads.
Senior Design scalable architectures, understand ASGI, implement WebSockets, optimize for high concurrency, manage database migrations with Alembic, implement caching and rate limiting, deploy with Docker/Nginx, secure the application, and set up CI/CD pipelines.
Final Interview Tip: The best candidates do not just answer questions — they connect concepts. When discussing dependency injection, mention how it enables testability. When discussing async endpoints, mention the impact on connection pooling. When discussing Docker deployment, mention health checks and rollback strategies. Showing this kind of systems thinking is what separates good developers from great ones.
