Every non-trivial web application needs persistent storage. Whether you are building a REST API, an admin dashboard, or a SaaS product, the database layer is the backbone that holds your application state, user data, and business logic together. Flask, being a micro-framework, does not ship with a built-in ORM or database abstraction. This is intentional — it lets you choose the right tool for your domain instead of forcing a one-size-fits-all solution.
In practice, the Python ecosystem has converged on SQLAlchemy as the de facto ORM for Flask applications, and Flask-SQLAlchemy as the integration layer that wires SQLAlchemy into the Flask application lifecycle. This tutorial covers the full spectrum of database integration — from defining your first model to managing migrations in production, handling transactions safely, and optimizing query performance under real load.
We will also discuss when an ORM is the wrong choice and you should drop down to raw SQL. The goal is not to make you a SQLAlchemy expert overnight, but to give you the mental model and working patterns you need to build production-grade data layers in Flask.
Before we dive in, it is worth understanding the trade-off between using an ORM and writing raw SQL.
The pragmatic approach is to use the ORM for 90% of your operations (CRUD, relationships, standard queries) and drop to raw SQL for the remaining 10% (complex reporting, bulk inserts, database-specific features). SQLAlchemy supports both seamlessly.
Flask-SQLAlchemy is a Flask extension that adds SQLAlchemy support with sensible defaults and useful helpers. Install it along with the database driver you need:
# Core package
pip install Flask-SQLAlchemy

# Database drivers (install the one you need)
pip install psycopg2-binary  # PostgreSQL
pip install PyMySQL          # MySQL
pip install mysqlclient      # MySQL (C extension, faster)
# SQLite uses Python's built-in sqlite3 module; no extra install needed
The most important configuration key is SQLALCHEMY_DATABASE_URI, which tells SQLAlchemy how to connect to your database. Here is a minimal setup:
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

def create_app():
    app = Flask(__name__)
    # SQLite (file-based, good for development)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
    # Disable modification tracking (saves memory)
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    db.init_app(app)
    return app
The URI format follows the pattern: dialect+driver://username:password@host:port/database
# SQLite: relative path (three slashes = relative to instance folder)
SQLALCHEMY_DATABASE_URI = 'sqlite:///app.db'

# SQLite: absolute path (four slashes)
SQLALCHEMY_DATABASE_URI = 'sqlite:////var/data/app.db'

# PostgreSQL
SQLALCHEMY_DATABASE_URI = 'postgresql://user:password@localhost:5432/mydb'

# PostgreSQL with psycopg2 driver explicitly
SQLALCHEMY_DATABASE_URI = 'postgresql+psycopg2://user:password@localhost:5432/mydb'

# MySQL with PyMySQL driver
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://user:password@localhost:3306/mydb'

# MySQL with mysqlclient driver
SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://user:password@localhost:3306/mydb'

# MySQL with charset specified
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://user:password@localhost/mydb?charset=utf8mb4'
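One practical wrinkle with the URI pattern is that a password containing characters such as @ or / has to be URL-encoded, otherwise the URI is split in the wrong place. The sketch below uses only the standard library to build and then re-parse a URI; the build_uri helper and the credentials are hypothetical, chosen purely for illustration.

```python
from urllib.parse import quote_plus, unquote_plus, urlsplit

def build_uri(dialect, username, password, host, port, database, driver=None):
    """Hypothetical helper: assemble dialect+driver://username:password@host:port/database."""
    scheme = f"{dialect}+{driver}" if driver else dialect
    # Encode the credentials so '@', '/' and ':' cannot be mistaken for URI delimiters.
    return (f"{scheme}://{quote_plus(username)}:{quote_plus(password)}"
            f"@{host}:{port}/{database}")

uri = build_uri("postgresql", "app", "p@ss/word", "localhost", 5432, "mydb",
                driver="psycopg2")
print(uri)  # postgresql+psycopg2://app:p%40ss%2Fword@localhost:5432/mydb

# Splitting the URI again recovers each component of the pattern.
parts = urlsplit(uri)
print(parts.scheme, parts.hostname, parts.port, parts.path.lstrip("/"))
print(unquote_plus(parts.password))  # p@ss/word
```

In a real application the password would normally come from an environment variable or secret store rather than a literal in the source.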
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://user:pass@localhost/mydb'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True  # Log all SQL statements (dev only)

# Pool settings are passed through SQLALCHEMY_ENGINE_OPTIONS; the older
# SQLALCHEMY_POOL_* and SQLALCHEMY_MAX_OVERFLOW keys were removed in Flask-SQLAlchemy 3.0.
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_size': 10,       # Connection pool size
    'pool_recycle': 3600,  # Recycle connections after 1 hour
    'max_overflow': 20,    # Extra connections beyond pool_size
    'pool_timeout': 30,    # Seconds to wait for a connection
}
A model is a Python class that maps to a database table. Each class attribute maps to a column. Flask-SQLAlchemy provides db.Model as the base class for all your models.
from datetime import datetime, timezone
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

class User(db.Model):
    __tablename__ = 'users'  # Explicit table name (optional; defaults to the snake_case class name)

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(256), nullable=False)
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    role = db.Column(db.String(20), default='user', nullable=False)
    bio = db.Column(db.Text, nullable=True)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc), nullable=False)
    updated_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
        nullable=False
    )

    def __repr__(self):
        return f'<User {self.username}>'
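One detail in the model above is easy to miss: the timestamp defaults are wrapped in a lambda. The sketch below is a simplified stand-in for how a column default gets resolved at insert time, not SQLAlchemy's actual code; the resolve_default helper is hypothetical. It shows why a plain value is shared by every row while a callable is evaluated per row.

```python
from datetime import datetime, timezone
from itertools import count

def resolve_default(default):
    """Hypothetical helper: call the default if it is callable, else use it as-is."""
    return default() if callable(default) else default

# Evaluated once, when the class body runs: every row would share this timestamp.
frozen = datetime.now(timezone.utc)

# Evaluated on each insert: every row gets a fresh value.
# A counter stands in for the clock so the difference is easy to see.
ticker = count(1)
fresh = lambda: next(ticker)

frozen_values = [resolve_default(frozen) for _ in range(3)]
fresh_values = [resolve_default(fresh) for _ in range(3)]

print(frozen_values[0] == frozen_values[1] == frozen_values[2])  # True
print(fresh_values)                                              # [1, 2, 3]
```

Writing default=datetime.now(timezone.utc) without the lambda would therefore stamp every row with the time the module was imported.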
| SQLAlchemy Type | Python Type | SQL Equivalent |
|---|---|---|
| db.Integer | int | INTEGER |
| db.BigInteger | int | BIGINT |
| db.String(n) | str | VARCHAR(n) |
| db.Text | str | TEXT |
| db.Float | float | FLOAT |
| db.Numeric(10, 2) | Decimal | NUMERIC(10, 2) |
| db.Boolean | bool | BOOLEAN |
| db.DateTime | datetime | DATETIME |
| db.Date | date | DATE |
| db.Time | time | TIME |
| db.LargeBinary | bytes | BLOB |
| db.JSON | dict/list | JSON |
| db.Enum | str/enum | ENUM |
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Unique constraint
email = db.Column(db.String(120), unique=True)
# Not nullable (required field)
name = db.Column(db.String(80), nullable=False)
# Default value (Python-side)
role = db.Column(db.String(20), default='user')
# Server-side default
created_at = db.Column(db.DateTime, server_default=db.func.now())
# Index for faster queries
username = db.Column(db.String(80), index=True)
# Composite index and multi-column unique constraint (class-level attribute)
__table_args__ = (
    db.Index('idx_user_email_role', 'email', 'role'),
    db.UniqueConstraint('first_name', 'last_name', name='uq_full_name'),
)
Relationships define how models connect to each other. SQLAlchemy supports one-to-many, many-to-one, one-to-one, and many-to-many relationships. Getting these right is critical — bad relationship design leads to N+1 queries and painful refactors later.
The most common relationship. One user has many posts. The foreign key lives on the “many” side.
class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)

    # One user has many posts
    posts = db.relationship('Post', back_populates='author', lazy='select')

    def __repr__(self):
        return f'<User {self.username}>'

class Post(db.Model):
    __tablename__ = 'posts'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    body = db.Column(db.Text, nullable=False)

    # Foreign key to users table
    author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

    # Many posts belong to one user
    author = db.relationship('User', back_populates='posts')

    def __repr__(self):
        return f'<Post {self.title}>'
Many-to-many relationships require an association table. For example, posts can have many tags, and tags can belong to many posts.
# Association table (no model class needed for simple many-to-many)
post_tags = db.Table('post_tags',
    db.Column('post_id', db.Integer, db.ForeignKey('posts.id'), primary_key=True),
    db.Column('tag_id', db.Integer, db.ForeignKey('tags.id'), primary_key=True)
)

class Post(db.Model):
    __tablename__ = 'posts'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)

    # Many-to-many with tags
    tags = db.relationship('Tag', secondary=post_tags, back_populates='posts', lazy='select')

class Tag(db.Model):
    __tablename__ = 'tags'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)

    # Many-to-many with posts
    posts = db.relationship('Post', secondary=post_tags, back_populates='tags', lazy='select')
class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)

    # One-to-one: uselist=False means this returns a single object, not a list
    profile = db.relationship('Profile', back_populates='user', uselist=False)

class Profile(db.Model):
    __tablename__ = 'profiles'

    id = db.Column(db.Integer, primary_key=True)
    bio = db.Column(db.Text)
    avatar_url = db.Column(db.String(500))
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), unique=True, nullable=False)

    user = db.relationship('User', back_populates='profile')
The lazy parameter controls when related objects are loaded from the database:
| Value | Behavior | Use When |
|---|---|---|
| 'select' (default) | Loads related objects on first access via a separate SELECT | You access the relationship occasionally |
| 'joined' | Loads via JOIN in the same query | You always need the related data |
| 'subquery' | Loads via a subquery after the initial query | One-to-many where JOIN would duplicate rows |
| 'dynamic' | Returns a query object instead of loading results | Large collections you want to filter further |
| 'selectin' | Loads via SELECT … WHERE id IN (…) | Best default for most one-to-many relationships |
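To make the difference between per-access loading and IN-based loading concrete, here is a plain sqlite3 sketch of the two query shapes. It does not use SQLAlchemy; the schema, data, and helper names are assumptions for illustration. The first function issues one query per parent (the N+1 shape that 'select' produces when you loop over many parents), the second fetches all children with a single IN query (the shape 'selectin' aims for).

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL);
    CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT NOT NULL,
                        author_id INTEGER NOT NULL REFERENCES users(id));
    INSERT INTO users (id, username) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol');
    INSERT INTO posts (title, author_id) VALUES ('a1', 1), ('a2', 1), ('b1', 2), ('c1', 3);
""")

counter = {"queries": 0}

def run(sql, params=()):
    """Execute a statement and count it so the two strategies can be compared."""
    counter["queries"] += 1
    return conn.execute(sql, params).fetchall()

def load_per_parent():
    # 1 query for the users, then 1 query for each user's posts.
    users = run("SELECT id, username FROM users ORDER BY id")
    result = {}
    for user_id, username in users:
        rows = run("SELECT title FROM posts WHERE author_id = ? ORDER BY id", (user_id,))
        result[username] = [title for (title,) in rows]
    return result

def load_batched():
    # 1 query for the users, then 1 IN (...) query for all of their posts.
    names = dict(run("SELECT id, username FROM users ORDER BY id"))
    placeholders = ", ".join("?" for _ in names)
    rows = run(
        f"SELECT author_id, title FROM posts WHERE author_id IN ({placeholders}) ORDER BY id",
        list(names),
    )
    result = {username: [] for username in names.values()}
    for author_id, title in rows:
        result[names[author_id]].append(title)
    return result

counter["queries"] = 0
per_parent = load_per_parent()
per_parent_queries = counter["queries"]

counter["queries"] = 0
batched = load_batched()
batched_queries = counter["queries"]

print(per_parent_queries, batched_queries, per_parent == batched)  # 4 2 True
```

With three users the gap is small, but the first strategy grows by one query per parent row while the second stays at two.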
Once your models are defined, you need to create the actual database tables. The simplest approach uses db.create_all().
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    db.init_app(app)

    with app.app_context():
        # Import models so SQLAlchemy knows about them
        from . import models
        db.create_all()

    return app
For production applications, use the application factory pattern. This separates the creation of the db object from the app, allowing you to create multiple app instances (for testing, different configs, etc.).
# extensions.py
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

# models.py
from extensions import db
from datetime import datetime, timezone

class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))

# app.py
import os

from flask import Flask
from extensions import db

def create_app(config_name='development'):
    app = Flask(__name__)

    # Load config based on environment
    if config_name == 'development':
        app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///dev.db'
        app.config['SQLALCHEMY_ECHO'] = True
    elif config_name == 'testing':
        app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
    elif config_name == 'production':
        app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL')

    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

    # Initialize extensions
    db.init_app(app)

    # Register blueprints, error handlers, etc.
    # ...

    return app
Important: db.create_all() only creates tables that do not already exist. It will not modify existing tables (add columns, change types, etc.). For schema changes on existing tables, you need migrations — covered in section 8.
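The create-if-missing behavior is easy to reproduce outside the ORM. The following sqlite3 sketch is an analogy rather than what Flask-SQLAlchemy runs internally; the create_missing_tables helper is hypothetical. It mimics the behavior with CREATE TABLE IF NOT EXISTS and shows that a column added to the definition later never reaches a table that already exists.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

def create_missing_tables(conn):
    """Hypothetical stand-in for create_all(): create the table only if it is missing."""
    conn.execute("""
        CREATE TABLE IF NOT EXISTS users (
            id INTEGER PRIMARY KEY,
            username TEXT NOT NULL,
            phone_number TEXT          -- column added to the "model" later
        )
    """)

def column_names(conn, table):
    # PRAGMA table_info returns one row per column; index 1 is the column name.
    return [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]

# An older deployment created the table before phone_number existed.
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL)")

create_missing_tables(conn)          # no error, but also no new column
before = column_names(conn, "users")
print(before)                        # ['id', 'username']

# A migration has to apply the change explicitly.
conn.execute("ALTER TABLE users ADD COLUMN phone_number TEXT")
after = column_names(conn, "users")
print(after)                         # ['id', 'username', 'phone_number']
```

The explicit ALTER TABLE step is the kind of change a migration tool records and replays for you.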
CRUD stands for Create, Read, Update, Delete — the four fundamental operations on any data store. Here is how each works with Flask-SQLAlchemy.
# Create a single record
user = User(username='john_doe', email='john@example.com', password_hash='hashed_pw')
db.session.add(user)
db.session.commit()
# The user now has an id assigned by the database
print(user.id) # e.g., 1
# Create multiple records at once
users = [
User(username='alice', email='alice@example.com', password_hash='hash1'),
User(username='bob', email='bob@example.com', password_hash='hash2'),
User(username='charlie', email='charlie@example.com', password_hash='hash3'),
]
db.session.add_all(users)
db.session.commit()
# Get by primary key
user = db.session.get(User, 1) # Returns None if not found
# Get all records
all_users = User.query.all()
# Get first matching record
user = User.query.filter_by(username='john_doe').first()
# Get first or return 404 (useful in route handlers)
user = User.query.filter_by(username='john_doe').first_or_404(
description='User not found'
)
# Get by primary key or 404 (Flask-SQLAlchemy 3.0+)
user = db.get_or_404(User, 1)
# Method 1: Modify the object and commit
user = User.query.filter_by(username='john_doe').first()
if user:
    user.email = 'newemail@example.com'
    user.role = 'admin'
    db.session.commit()
# Method 2: Bulk update (more efficient for many records)
User.query.filter(User.role == 'user').update({'is_active': False})
db.session.commit()
# Method 3: Update with returning the count of affected rows
count = User.query.filter(User.last_login < cutoff_date).update(
{'is_active': False},
synchronize_session='fetch'
)
db.session.commit()
print(f'Deactivated {count} users')
# Delete a single record
user = User.query.filter_by(username='john_doe').first()
if user:
    db.session.delete(user)
    db.session.commit()
# Bulk delete
deleted_count = User.query.filter(User.is_active == False).delete()
db.session.commit()
print(f'Deleted {deleted_count} inactive users')
SQLAlchemy's query interface is expressive and composable. You can chain methods to build complex queries without writing raw SQL.
# filter_by — simple equality checks using keyword arguments
users = User.query.filter_by(role='admin', is_active=True).all()
# filter — more powerful, supports operators
users = User.query.filter(User.age >= 18).all()
users = User.query.filter(User.username.like('%john%')).all()
users = User.query.filter(User.email.endswith('@example.com')).all()
users = User.query.filter(User.role.in_(['admin', 'moderator'])).all()
users = User.query.filter(User.bio.isnot(None)).all()
# Combine multiple filters (AND)
users = User.query.filter(
User.role == 'admin',
User.is_active == True,
User.created_at >= start_date
).all()
# OR conditions
from sqlalchemy import or_
users = User.query.filter(
or_(User.role == 'admin', User.role == 'moderator')
).all()
# NOT conditions
from sqlalchemy import not_
users = User.query.filter(not_(User.is_active)).all()
# Order by
users = User.query.order_by(User.created_at.desc()).all()
users = User.query.order_by(User.last_name.asc(), User.first_name.asc()).all()
# Limit and offset
users = User.query.order_by(User.id).limit(10).offset(20).all()
# First result
user = User.query.order_by(User.created_at.desc()).first()
# Count
active_count = User.query.filter_by(is_active=True).count()
# Pagination (Flask-SQLAlchemy built-in)
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
pagination = User.query.order_by(User.created_at.desc()).paginate(
page=page,
per_page=per_page,
error_out=False # Return empty page instead of 404
)
# Pagination object properties
items = pagination.items # List of items on current page
total = pagination.total # Total number of items
pages = pagination.pages # Total number of pages
has_next = pagination.has_next
has_prev = pagination.has_prev
next_num = pagination.next_num
prev_num = pagination.prev_num
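It can help to see where the pagination properties come from. The sketch below is a simplified model in plain sqlite3, assuming pagination boils down to a COUNT query plus LIMIT/OFFSET; the paginate function here is a hypothetical helper, and the real Flask-SQLAlchemy object may differ in edge cases.

```python
import math
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT NOT NULL)")
conn.executemany("INSERT INTO users (username) VALUES (?)",
                 [(f"user{i:02d}",) for i in range(1, 46)])  # 45 rows

def paginate(conn, page, per_page):
    """Hypothetical helper mirroring the fields of the pagination object."""
    total = conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]
    offset = (page - 1) * per_page
    rows = conn.execute(
        "SELECT username FROM users ORDER BY id LIMIT ? OFFSET ?",
        (per_page, offset),
    )
    pages = math.ceil(total / per_page)
    return {
        "items": [r[0] for r in rows],
        "total": total,
        "pages": pages,
        "has_prev": page > 1,
        "has_next": page < pages,
        "prev_num": page - 1 if page > 1 else None,
        "next_num": page + 1 if page < pages else None,
    }

last = paginate(conn, page=3, per_page=20)
print(last["pages"], len(last["items"]), last["has_next"], last["prev_num"])  # 3 5 False 2
```

Because OFFSET makes the database walk past all skipped rows, deep pages get slower as the table grows; a stable ORDER BY, as in the example, keeps page contents deterministic.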
from sqlalchemy import func
# Count
total = db.session.query(func.count(User.id)).scalar()
# Sum
total_revenue = db.session.query(func.sum(Order.total_amount)).scalar()
# Average
avg_age = db.session.query(func.avg(User.age)).scalar()
# Min / Max
oldest = db.session.query(func.min(User.created_at)).scalar()
newest = db.session.query(func.max(User.created_at)).scalar()
# Group by
role_counts = db.session.query(
User.role,
func.count(User.id).label('count')
).group_by(User.role).all()
for role, count in role_counts:
    print(f'{role}: {count}')
# Group by with having
popular_roles = db.session.query(
User.role,
func.count(User.id).label('count')
).group_by(User.role).having(func.count(User.id) > 5).all()
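For readers who think in SQL first, the two grouped queries correspond roughly to the statements below. This is a sqlite3 sketch with made-up sample data, not the exact SQL SQLAlchemy would emit.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, role TEXT NOT NULL)")
sample_roles = ["user"] * 7 + ["admin"] * 2 + ["moderator"] * 6
conn.executemany("INSERT INTO users (role) VALUES (?)", [(r,) for r in sample_roles])

# GROUP BY: one output row per distinct role.
role_counts = conn.execute(
    "SELECT role, COUNT(id) AS count FROM users GROUP BY role ORDER BY role"
).fetchall()

# HAVING filters groups after aggregation (WHERE would filter rows before it).
popular_roles = conn.execute(
    "SELECT role, COUNT(id) AS count FROM users "
    "GROUP BY role HAVING COUNT(id) > 5 ORDER BY role"
).fetchall()

print(role_counts)    # [('admin', 2), ('moderator', 6), ('user', 7)]
print(popular_roles)  # [('moderator', 6), ('user', 7)]
```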
In production, you cannot use db.create_all() to evolve your schema. It does not alter existing tables — it only creates missing ones. Database migrations track every schema change as a versioned script that can be applied (upgrade) or reversed (downgrade). Flask-Migrate wraps Alembic, the migration tool for SQLAlchemy.
pip install Flask-Migrate
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

db = SQLAlchemy()
migrate = Migrate()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    db.init_app(app)
    migrate.init_app(app, db)
    return app
# Initialize the migrations directory (run once)
flask db init

# Generate a migration script after changing models
flask db migrate -m "Add users table"

# Apply the migration to the database
flask db upgrade

# Revert the last migration
flask db downgrade

# Show current migration version
flask db current

# Show migration history
flask db history

# Upgrade to a specific version
flask db upgrade ae1027a6acf

# Downgrade to a specific version
flask db downgrade ae1027a6acf

# 1. Make changes to your models (add column, new table, etc.)

# 2. Generate migration
flask db migrate -m "Add phone_number column to users"

# 3. Review the generated migration file in migrations/versions/

# 4. Apply
flask db upgrade

# 5. Commit migration file to version control
git add migrations/
git commit -m "Add phone_number column migration"
Critical rule: Always review auto-generated migration files before applying them. Alembic does its best to detect changes, but autogenerate cannot recognize table or column renames (it sees a drop plus an add, which would lose the data), it never writes data migrations for you, and depending on configuration it may miss column type or server-default changes. Edit the generated file if needed.
"""Add phone_number column to users
Revision ID: a1b2c3d4e5f6
Revises: 9z8y7x6w5v4u
Create Date: 2026-02-26 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers
revision = 'a1b2c3d4e5f6'
down_revision = '9z8y7x6w5v4u'
branch_labels = None
depends_on = None
def upgrade():
    op.add_column('users', sa.Column('phone_number', sa.String(20), nullable=True))
    op.create_index('idx_users_phone', 'users', ['phone_number'])

def downgrade():
    op.drop_index('idx_users_phone', table_name='users')
    op.drop_column('users', 'phone_number')
Seeding populates your database with initial or test data. Flask's CLI makes it easy to create custom commands for this.
import click
from flask.cli import with_appcontext
from extensions import db
from models import User, Product, Category

@click.command('seed-db')
@with_appcontext
def seed_db_command():
    """Seed the database with sample data."""
    # Clear existing data
    db.session.execute(db.text('DELETE FROM users'))
    db.session.execute(db.text('DELETE FROM products'))
    db.session.execute(db.text('DELETE FROM categories'))

    # Seed categories
    categories = [
        Category(name='Electronics', description='Gadgets and devices'),
        Category(name='Clothing', description='Apparel and accessories'),
        Category(name='Books', description='Physical and digital books'),
    ]
    db.session.add_all(categories)
    db.session.flush()  # Flush to get IDs without committing

    # Seed users
    users = [
        User(username='admin', email='admin@example.com', password_hash='hashed_admin', role='admin'),
        User(username='alice', email='alice@example.com', password_hash='hashed_alice'),
        User(username='bob', email='bob@example.com', password_hash='hashed_bob'),
    ]
    db.session.add_all(users)

    # Seed products
    products = [
        Product(name='Laptop', price=999.99, category_id=categories[0].id, stock=50),
        Product(name='T-Shirt', price=19.99, category_id=categories[1].id, stock=200),
        Product(name='Python Cookbook', price=39.99, category_id=categories[2].id, stock=100),
    ]
    db.session.add_all(products)

    db.session.commit()
    click.echo('Database seeded successfully.')

# Register the command in your app factory
def create_app():
    app = Flask(__name__)
    # ... config, extensions ...
    app.cli.add_command(seed_db_command)
    return app
# Run the seed command
flask seed-db
from faker import Faker
fake = Faker()
@click.command('seed-fake')
@click.argument('count', default=50)
@with_appcontext
def seed_fake_command(count):
    """Generate fake users for development."""
    users = []
    for _ in range(count):
        users.append(User(
            username=fake.unique.user_name(),
            email=fake.unique.email(),
            password_hash=fake.sha256(),
            bio=fake.paragraph(nb_sentences=3),
            is_active=fake.boolean(chance_of_getting_true=85),
            created_at=fake.date_time_between(start_date='-1y', end_date='now')
        ))
    db.session.add_all(users)
    db.session.commit()
    click.echo(f'Created {count} fake users.')
Database connections are expensive to create. Connection pooling keeps a set of connections open and reuses them across requests. SQLAlchemy handles this automatically, but you should tune the settings for your workload.
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_size': 10, # Number of permanent connections to keep
'max_overflow': 20, # Extra connections allowed beyond pool_size
'pool_timeout': 30, # Seconds to wait for a connection from the pool
'pool_recycle': 1800, # Recycle connections after 30 minutes
'pool_pre_ping': True, # Test connections before using them (handles stale connections)
}
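pool_size: The number of permanent connections kept open in the pool. SQLAlchemy's default is 5.
max_overflow: When all pool_size connections are in use, SQLAlchemy can create up to max_overflow additional temporary connections. These are closed when returned to the pool. Default is 10.
pool_timeout: Seconds to wait for a connection from the pool before giving up with an error. Default is 30.
pool_recycle: Replaces connections older than this many seconds. This matters for MySQL, which closes idle connections after wait_timeout (default 8 hours). Set this lower than your database's idle timeout.
pool_pre_ping: Issues a lightweight check such as SELECT 1 before using a connection. Catches dead connections without the application seeing an error. Small overhead but highly recommended for production.

# For a typical web app handling ~50 concurrent requests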
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_size': 20,
'max_overflow': 30,
'pool_timeout': 30,
'pool_recycle': 1800,
'pool_pre_ping': True,
}
# For a lightweight app or development
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_size': 5,
'max_overflow': 10,
'pool_recycle': 3600,
'pool_pre_ping': True,
}
# To disable pooling entirely (useful for debugging)
from sqlalchemy.pool import NullPool
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'poolclass': NullPool,
}
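When tuning these numbers, keep in mind that each worker process normally holds its own pool, so the worst-case connection count is multiplied by the number of processes. The arithmetic below is a rough sizing sketch; the worker counts and the server limit of 100 (PostgreSQL's default max_connections) are example figures, and the helper names are hypothetical.

```python
def max_connections_needed(workers, pool_size, max_overflow):
    """Upper bound on connections opened by all worker processes together."""
    return workers * (pool_size + max_overflow)

def fits(workers, pool_size, max_overflow, db_max_connections, reserved=0):
    """True if the worst case stays within the server limit minus reserved slots."""
    needed = max_connections_needed(workers, pool_size, max_overflow)
    return needed <= db_max_connections - reserved

# Four workers with the "typical web app" settings: 4 * (20 + 30) = 200.
print(max_connections_needed(workers=4, pool_size=20, max_overflow=30))  # 200
print(fits(4, 20, 30, db_max_connections=100))                           # False

# Four workers with the lightweight settings: 4 * (5 + 10) = 60,
# leaving room even when 10 slots are reserved for admin and migration tools.
print(fits(4, 5, 10, db_max_connections=100, reserved=10))               # True
```

If the worst case does not fit, the usual options are a smaller pool per process, fewer processes, or an external pooler in front of the database.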
A transaction groups multiple database operations into a single atomic unit — either all of them succeed, or none of them do. SQLAlchemy uses transactions implicitly (every operation between commit() calls is a transaction), but understanding explicit transaction control is essential for correctness.
# Implicit transaction: the most common pattern
try:
    user = User(username='alice', email='alice@example.com', password_hash='hash')
    db.session.add(user)

    profile = Profile(user=user, bio='Software engineer')
    db.session.add(profile)

    db.session.commit()  # Both user and profile are saved atomically
except Exception:
    db.session.rollback()  # Undo everything if any operation fails
    raise
from contextlib import contextmanager

@contextmanager
def transaction():
    """Context manager for database transactions."""
    try:
        yield db.session
        db.session.commit()
    except Exception:
        db.session.rollback()
        raise

# Usage
with transaction() as session:
    user = User(username='bob', email='bob@example.com', password_hash='hash')
    session.add(user)
    profile = Profile(user=user, bio='Data scientist')
    session.add(profile)
    # Commits automatically on exit, rolls back on exception
def place_order(user_id, items):
    """Place an order with savepoints for partial rollback."""
    try:
        order = Order(user_id=user_id, status='pending')
        db.session.add(order)
        db.session.flush()  # Get the order ID

        for item in items:
            # Savepoint for each item: if one fails, we can skip it
            savepoint = db.session.begin_nested()
            try:
                product = db.session.get(Product, item['product_id'])
                if product.stock < item['quantity']:
                    raise ValueError(f'Insufficient stock for {product.name}')
                product.stock -= item['quantity']
                order_item = OrderItem(
                    order_id=order.id,
                    product_id=product.id,
                    quantity=item['quantity'],
                    unit_price=product.price
                )
                db.session.add(order_item)
                savepoint.commit()
            except Exception as e:
                savepoint.rollback()
                print(f'Skipping item: {e}')

        order.total_amount = sum(
            oi.quantity * oi.unit_price for oi in order.items
        )
        db.session.commit()
        return order
    except Exception:
        db.session.rollback()
        raise
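Under the hood, a nested transaction of this kind maps to the database's SAVEPOINT mechanism. The sqlite3 sketch below issues the statements by hand to show the effect; the tables and cart data are assumptions for illustration. A failure inside one item's savepoint undoes only that item's work, while the outer transaction still commits.

```python
import sqlite3

# isolation_level=None puts the sqlite3 module in autocommit mode, so the
# BEGIN / SAVEPOINT / COMMIT statements below are issued exactly as written.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE products (name TEXT PRIMARY KEY, stock INTEGER CHECK (stock >= 0))")
conn.execute("CREATE TABLE order_items (product TEXT, quantity INTEGER)")
conn.execute("INSERT INTO products VALUES ('laptop', 5), ('mouse', 1)")

cart = [("laptop", 2), ("mouse", 3)]  # only 1 mouse in stock

conn.execute("BEGIN")
skipped = []
for product, quantity in cart:
    conn.execute("SAVEPOINT item")
    try:
        conn.execute("INSERT INTO order_items VALUES (?, ?)", (product, quantity))
        # Violates the CHECK constraint when stock would go negative.
        conn.execute("UPDATE products SET stock = stock - ? WHERE name = ?",
                     (quantity, product))
        conn.execute("RELEASE SAVEPOINT item")
    except sqlite3.IntegrityError:
        conn.execute("ROLLBACK TO SAVEPOINT item")  # undoes the INSERT above
        conn.execute("RELEASE SAVEPOINT item")
        skipped.append(product)
conn.execute("COMMIT")

saved = conn.execute("SELECT product, quantity FROM order_items").fetchall()
stock = dict(conn.execute("SELECT name, stock FROM products"))
print(saved, stock, skipped)  # [('laptop', 2)] {'laptop': 3, 'mouse': 1} ['mouse']
```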
from sqlalchemy.exc import IntegrityError, OperationalError

def create_user(username, email, password_hash):
    """Create a user with proper error handling."""
    try:
        user = User(
            username=username,
            email=email,
            password_hash=password_hash
        )
        db.session.add(user)
        db.session.commit()
        return user, None
    except IntegrityError as e:
        db.session.rollback()
        if 'username' in str(e.orig):
            return None, 'Username already exists'
        if 'email' in str(e.orig):
            return None, 'Email already exists'
        return None, 'Duplicate entry'
    except OperationalError as e:
        db.session.rollback()
        return None, 'Database connection error'
    except Exception as e:
        db.session.rollback()
        return None, f'Unexpected error: {str(e)}'
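The string checks on e.orig depend on how the underlying driver words its error. As a sketch of what that looks like with SQLite, the sqlite3 example below triggers both unique violations and classifies them by message text; the classify and try_insert helpers are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("""
    CREATE TABLE users (
        id INTEGER PRIMARY KEY,
        username TEXT UNIQUE NOT NULL,
        email TEXT UNIQUE NOT NULL
    )
""")
conn.execute("INSERT INTO users (username, email) VALUES ('alice', 'alice@example.com')")
conn.commit()

def classify(exc):
    """Map a driver-level error message to a user-facing message."""
    message = str(exc)  # e.g. "UNIQUE constraint failed: users.username"
    if "users.username" in message:
        return "Username already exists"
    if "users.email" in message:
        return "Email already exists"
    return "Duplicate entry"

def try_insert(username, email):
    try:
        with conn:  # commits on success, rolls back on exception
            conn.execute("INSERT INTO users (username, email) VALUES (?, ?)",
                         (username, email))
        return None
    except sqlite3.IntegrityError as exc:
        return classify(exc)

print(try_insert("alice", "new@example.com"))   # Username already exists
print(try_insert("bob", "alice@example.com"))   # Email already exists
print(try_insert("bob", "bob@example.com"))     # None
```

Other drivers word the message differently, typically naming the violated constraint rather than the column, so giving constraints explicit names and matching on those tends to be more portable than matching on column names.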
Let us build a complete e-commerce data layer that ties together everything covered so far. This example includes four interconnected models, full CRUD operations, complex queries, and transactional order placement.
from datetime import datetime, timezone
from extensions import db
class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(256), nullable=False)
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))

    # Relationships
    orders = db.relationship('Order', back_populates='user', lazy='selectin')

    def __repr__(self):
        return f'<User {self.username}>'

    def to_dict(self):
        return {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'is_active': self.is_active,
            'created_at': self.created_at.isoformat(),
            'order_count': len(self.orders)
        }
class Product(db.Model):
    __tablename__ = 'products'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(200), nullable=False, index=True)
    description = db.Column(db.Text)
    price = db.Column(db.Numeric(10, 2), nullable=False)
    stock = db.Column(db.Integer, default=0, nullable=False)
    category = db.Column(db.String(50), nullable=False, index=True)
    is_available = db.Column(db.Boolean, default=True, nullable=False)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))

    # Relationships
    order_items = db.relationship('OrderItem', back_populates='product', lazy='select')

    __table_args__ = (
        db.Index('idx_product_category_price', 'category', 'price'),
        db.CheckConstraint('price > 0', name='ck_positive_price'),
        db.CheckConstraint('stock >= 0', name='ck_non_negative_stock'),
    )

    def __repr__(self):
        return f'<Product {self.name} ${self.price}>'

    def to_dict(self):
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'price': float(self.price),
            'stock': self.stock,
            'category': self.category,
            'is_available': self.is_available
        }
class Order(db.Model):
    __tablename__ = 'orders'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False, index=True)
    status = db.Column(db.String(20), default='pending', nullable=False)
    total_amount = db.Column(db.Numeric(12, 2), default=0)
    shipping_address = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc)
    )

    # Relationships
    user = db.relationship('User', back_populates='orders')
    items = db.relationship('OrderItem', back_populates='order', lazy='selectin',
                            cascade='all, delete-orphan')

    __table_args__ = (
        db.Index('idx_order_user_status', 'user_id', 'status'),
    )

    def __repr__(self):
        return f'<Order #{self.id} - {self.status}>'

    def to_dict(self):
        return {
            'id': self.id,
            'user_id': self.user_id,
            'status': self.status,
            'total_amount': float(self.total_amount),
            'items': [item.to_dict() for item in self.items],
            'created_at': self.created_at.isoformat()
        }
class OrderItem(db.Model):
    __tablename__ = 'order_items'

    id = db.Column(db.Integer, primary_key=True)
    order_id = db.Column(db.Integer, db.ForeignKey('orders.id'), nullable=False)
    product_id = db.Column(db.Integer, db.ForeignKey('products.id'), nullable=False)
    quantity = db.Column(db.Integer, nullable=False)
    unit_price = db.Column(db.Numeric(10, 2), nullable=False)

    # Relationships
    order = db.relationship('Order', back_populates='items')
    product = db.relationship('Product', back_populates='order_items')

    __table_args__ = (
        db.CheckConstraint('quantity > 0', name='ck_positive_quantity'),
    )

    @property
    def subtotal(self):
        return float(self.quantity * self.unit_price)

    def __repr__(self):
        return f'<OrderItem {self.product.name} x{self.quantity}>'

    def to_dict(self):
        return {
            'id': self.id,
            'product_id': self.product_id,
            'product_name': self.product.name,
            'quantity': self.quantity,
            'unit_price': float(self.unit_price),
            'subtotal': self.subtotal
        }
# ---- USER CRUD ----
def create_user(username, email, password_hash):
    """Create a new user."""
    user = User(username=username, email=email, password_hash=password_hash)
    db.session.add(user)
    db.session.commit()
    return user

def get_user(user_id):
    """Get a user by ID."""
    return db.session.get(User, user_id)

def update_user(user_id, **kwargs):
    """Update user fields."""
    user = db.session.get(User, user_id)
    if not user:
        return None
    for key, value in kwargs.items():
        if hasattr(user, key):
            setattr(user, key, value)
    db.session.commit()
    return user

def delete_user(user_id):
    """Soft-delete a user by deactivating."""
    user = db.session.get(User, user_id)
    if user:
        user.is_active = False
        db.session.commit()
    return user
# ---- PRODUCT CRUD ----
def create_product(name, price, category, stock=0, description=None):
    """Create a new product."""
    product = Product(
        name=name, price=price, category=category,
        stock=stock, description=description
    )
    db.session.add(product)
    db.session.commit()
    return product

def get_products_by_category(category, min_price=None, max_price=None):
    """Get products filtered by category and optional price range."""
    query = Product.query.filter_by(category=category, is_available=True)
    if min_price is not None:
        query = query.filter(Product.price >= min_price)
    if max_price is not None:
        query = query.filter(Product.price <= max_price)
    return query.order_by(Product.price.asc()).all()

def update_stock(product_id, quantity_change):
    """Adjust product stock. Use negative values for decrements."""
    product = db.session.get(Product, product_id)
    if product:
        product.stock += quantity_change
        if product.stock <= 0:
            product.is_available = False
        db.session.commit()
    return product
from sqlalchemy import func, desc
def get_top_customers(limit=10):
    """Get customers with the highest total spend."""
    results = db.session.query(
        User.username,
        User.email,
        func.count(Order.id).label('order_count'),
        func.sum(Order.total_amount).label('total_spent')
    ).join(Order, User.id == Order.user_id)\
     .filter(Order.status == 'completed')\
     .group_by(User.id)\
     .order_by(desc('total_spent'))\
     .limit(limit)\
     .all()

    return [
        {
            'username': r.username,
            'email': r.email,
            'order_count': r.order_count,
            'total_spent': float(r.total_spent)
        }
        for r in results
    ]
def get_revenue_by_category():
    """Get total revenue grouped by product category."""
    results = db.session.query(
        Product.category,
        func.sum(OrderItem.quantity * OrderItem.unit_price).label('revenue'),
        func.sum(OrderItem.quantity).label('units_sold')
    ).join(OrderItem, Product.id == OrderItem.product_id)\
     .join(Order, OrderItem.order_id == Order.id)\
     .filter(Order.status == 'completed')\
     .group_by(Product.category)\
     .order_by(desc('revenue'))\
     .all()

    return [
        {
            'category': r.category,
            'revenue': float(r.revenue),
            'units_sold': r.units_sold
        }
        for r in results
    ]
def get_user_order_history(user_id, page=1, per_page=10):
    """Get paginated order history for a user with item details."""
    return Order.query\
        .filter_by(user_id=user_id)\
        .order_by(Order.created_at.desc())\
        .paginate(page=page, per_page=per_page, error_out=False)
def search_products(query_text, category=None, in_stock_only=True):
"""Full-text product search with filters."""
q = Product.query.filter(
Product.name.ilike(f'%{query_text}%')
)
if category:
q = q.filter_by(category=category)
if in_stock_only:
q = q.filter(Product.stock > 0, Product.is_available == True)
return q.order_by(Product.name).all()
from decimal import Decimal
from sqlalchemy.exc import IntegrityError
def place_order(user_id, cart_items, shipping_address):
"""
Place an order atomically.
Args:
user_id: ID of the user placing the order
cart_items: List of dicts with 'product_id' and 'quantity'
shipping_address: Shipping address string
Returns:
(Order, None) on success, (None, error_message) on failure
"""
try:
# Verify user exists and is active
user = db.session.get(User, user_id)
if not user or not user.is_active:
return None, 'Invalid or inactive user'
# Create the order
order = Order(
user_id=user_id,
status='pending',
shipping_address=shipping_address
)
db.session.add(order)
db.session.flush() # Get order.id without committing
total = Decimal('0.00')
for item in cart_items:
# Lock the product row to prevent race conditions
product = db.session.query(Product).filter_by(
id=item['product_id']
).with_for_update().first()
if not product:
db.session.rollback()
return None, f'Product {item["product_id"]} not found'
if not product.is_available:
db.session.rollback()
return None, f'{product.name} is no longer available'
if product.stock < item['quantity']:
db.session.rollback()
return None, f'Insufficient stock for {product.name} (available: {product.stock})'
# Deduct stock
product.stock -= item['quantity']
if product.stock == 0:
product.is_available = False
# Create order item
order_item = OrderItem(
order_id=order.id,
product_id=product.id,
quantity=item['quantity'],
unit_price=product.price
)
db.session.add(order_item)
total += product.price * item['quantity']
order.total_amount = total
db.session.commit()
return order, None
except IntegrityError as e:
db.session.rollback()
return None, f'Data integrity error: {str(e.orig)}'
except Exception as e:
db.session.rollback()
return None, f'Order failed: {str(e)}'
@app.route('/api/orders', methods=['POST'])
def create_order():
data = request.get_json()
order, error = place_order(
user_id=data['user_id'],
cart_items=data['items'], # [{'product_id': 1, 'quantity': 2}, ...]
shipping_address=data['shipping_address']
)
if error:
return jsonify({'error': error}), 400
return jsonify(order.to_dict()), 201
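The all-or-nothing behaviour of place_order can be seen in isolation with the standard library's sqlite3 module. This is a minimal sketch, not the Flask-SQLAlchemy code above: the table layout and the `reserve_stock` helper are hypothetical, and SQLite has no `SELECT ... FOR UPDATE` row locking, so only the rollback behaviour is illustrated.

```python
import sqlite3

# isolation_level=None puts the connection in autocommit mode, so
# BEGIN / COMMIT / ROLLBACK are issued explicitly below.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, stock INTEGER)")
conn.execute("INSERT INTO products VALUES (1, 'Keyboard', 5), (2, 'Mouse', 1)")


def reserve_stock(conn, cart_items):
    """Deduct stock for every item, or for none of them (hypothetical helper)."""
    conn.execute("BEGIN")
    try:
        for item in cart_items:
            row = conn.execute(
                "SELECT stock FROM products WHERE id = ?", (item["product_id"],)
            ).fetchone()
            if row is None or row[0] < item["quantity"]:
                raise ValueError(f"Cannot fulfil product {item['product_id']}")
            conn.execute(
                "UPDATE products SET stock = stock - ? WHERE id = ?",
                (item["quantity"], item["product_id"]),
            )
        conn.execute("COMMIT")
        return True
    except ValueError:
        conn.execute("ROLLBACK")  # undo deductions made earlier in the loop
        return False


def stock_levels(conn):
    """Return {product_id: stock} for all products."""
    return dict(conn.execute("SELECT id, stock FROM products ORDER BY id"))


# The second item cannot be fulfilled, so the first deduction is rolled back too.
ok = reserve_stock(conn, [{"product_id": 1, "quantity": 2}, {"product_id": 2, "quantity": 3}])
```

After the failed call, product 1 still has its original stock even though its UPDATE ran before the error. That is the property the single commit at the end of place_order relies on.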
Sometimes the ORM gets in the way. Complex reporting queries, database-specific features (window functions, CTEs, recursive queries), or bulk operations are often cleaner and faster as raw SQL. SQLAlchemy makes this straightforward.
from datetime import datetime
from sqlalchemy import text
# Simple query
result = db.session.execute(text('SELECT * FROM users WHERE is_active = :active'), {'active': True})
users = result.fetchall()
for user in users:
print(user.username, user.email) # Access by column name
# Insert
db.session.execute(
text('INSERT INTO users (username, email, password_hash) VALUES (:username, :email, :password)'),
{'username': 'dave', 'email': 'dave@example.com', 'password': 'hashed_pw'}
)
db.session.commit()
# Update
db.session.execute(
text('UPDATE products SET price = price * :multiplier WHERE category = :category'),
{'multiplier': 1.10, 'category': 'Electronics'}
)
db.session.commit()
# Delete
db.session.execute(
text('DELETE FROM sessions WHERE last_active < :cutoff'),
{'cutoff': datetime(2026, 1, 1)}
)
db.session.commit()
def get_monthly_revenue_report(year):
"""Get monthly revenue breakdown with running totals."""
sql = text("""
SELECT
EXTRACT(MONTH FROM o.created_at) AS month,
COUNT(DISTINCT o.id) AS order_count,
COUNT(DISTINCT o.user_id) AS unique_customers,
SUM(o.total_amount) AS monthly_revenue,
SUM(SUM(o.total_amount)) OVER (ORDER BY EXTRACT(MONTH FROM o.created_at)) AS running_total
FROM orders o
WHERE EXTRACT(YEAR FROM o.created_at) = :year
AND o.status = 'completed'
GROUP BY EXTRACT(MONTH FROM o.created_at)
ORDER BY month
""")
result = db.session.execute(sql, {'year': year})
return [
{
'month': int(row.month),
'order_count': row.order_count,
'unique_customers': row.unique_customers,
'revenue': float(row.monthly_revenue),
'running_total': float(row.running_total)
}
for row in result
]
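To see what the running total in the report computes, here is a small sketch using sqlite3 from the standard library. It assumes a SQLite build with window function support (3.25 or newer). The sample rows are made up, `strftime` stands in for `EXTRACT` (which SQLite lacks), and the monthly sums are computed in a CTE first, which should be equivalent to the nested `SUM(SUM(...)) OVER` form used above.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders (id INTEGER PRIMARY KEY, created_at TEXT, "
    "total_amount INTEGER, status TEXT)"
)
conn.executemany(
    "INSERT INTO orders (created_at, total_amount, status) VALUES (?, ?, ?)",
    [
        ("2025-01-10", 100, "completed"),
        ("2025-01-20", 50, "completed"),
        ("2025-02-05", 200, "completed"),
        ("2025-02-07", 999, "cancelled"),  # excluded by the status filter
        ("2025-03-01", 25, "completed"),
    ],
)

sql = """
    WITH monthly AS (
        SELECT CAST(strftime('%m', created_at) AS INTEGER) AS month,
               SUM(total_amount) AS monthly_revenue
        FROM orders
        WHERE strftime('%Y', created_at) = :year AND status = 'completed'
        GROUP BY month
    )
    SELECT month,
           monthly_revenue,
           SUM(monthly_revenue) OVER (ORDER BY month) AS running_total
    FROM monthly
    ORDER BY month
"""
# Each row is (month, monthly_revenue, running_total)
report = conn.execute(sql, {"year": "2025"}).fetchall()
```

Each row's running total is the sum of that month's revenue and all earlier months, which is what the window's `ORDER BY` specifies.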
# Bulk insert — much faster than ORM for large datasets
def bulk_import_products(products_data):
"""Import thousands of products efficiently."""
sql = text("""
INSERT INTO products (name, price, category, stock, is_available)
VALUES (:name, :price, :category, :stock, :is_available)
""")
# Execute with a list of parameter dicts
db.session.execute(sql, products_data)
db.session.commit()
# Usage
products = [
{'name': f'Product {i}', 'price': 9.99, 'category': 'Bulk', 'stock': 100, 'is_available': True}
for i in range(10000)
]
bulk_import_products(products)
Security note: Always use parameterized queries with :param_name placeholders. Never use f-strings or string concatenation to build SQL — that is how SQL injection happens.
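The difference is easy to demonstrate. The sketch below uses the standard library's sqlite3 module rather than SQLAlchemy, with a made-up `users` table, but the principle is the same for `text()` with `:param_name` placeholders.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, is_admin INTEGER)")
conn.executemany(
    "INSERT INTO users (username, is_admin) VALUES (?, ?)",
    [("alice", 0), ("root", 1)],
)

malicious = "nobody' OR '1'='1"

# UNSAFE: the input is spliced into the SQL text and changes the query's logic.
unsafe_sql = f"SELECT username FROM users WHERE username = '{malicious}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# SAFE: the value travels separately from the SQL, so it is compared as plain data.
safe_rows = conn.execute(
    "SELECT username FROM users WHERE username = :name", {"name": malicious}
).fetchall()
```

The f-string version returns every user because the injected `OR '1'='1'` is always true. The parameterized version returns nothing, since no user has that literal name.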
Database performance problems are the most common cause of slow web applications. Here are the patterns and techniques that matter most in Flask-SQLAlchemy.
The N+1 problem occurs when you load a list of N objects and then access a relationship on each, causing N additional queries.
# BAD: N+1 queries — 1 query for orders + N queries for user on each order
orders = Order.query.all()
for order in orders:
print(order.user.username) # Each access triggers a separate SELECT
# GOOD: Eager loading with joinedload — 1 query total
from sqlalchemy.orm import joinedload
orders = Order.query.options(joinedload(Order.user)).all()
for order in orders:
print(order.user.username) # Already loaded, no extra query
# GOOD: Eager loading with selectinload — 2 queries total (better for one-to-many)
from sqlalchemy.orm import selectinload
users = User.query.options(selectinload(User.orders)).all()
for user in users:
print(f'{user.username}: {len(user.orders)} orders') # Already loaded
# Nested eager loading
orders = Order.query.options(
joinedload(Order.user),
selectinload(Order.items).joinedload(OrderItem.product)
).all()
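Counting round trips makes the cost of N+1 concrete. The following is a framework-free sketch using sqlite3; the `run` helper and the sample data are hypothetical and exist only to count statements. It models what lazy loading versus a joined load does, not SQLAlchemy's actual internals.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER REFERENCES users(id));
    INSERT INTO users VALUES (1, 'alice'), (2, 'bob');
    INSERT INTO orders VALUES (1, 1), (2, 1), (3, 2);
""")

executed = []


def run(sql, params=()):
    """Execute a statement and record it, so round trips can be counted."""
    executed.append(sql)
    return conn.execute(sql, params).fetchall()


# N+1 pattern: one query for the orders, then one more per order.
orders = run("SELECT id, user_id FROM orders ORDER BY id")
lazy = [
    run("SELECT username FROM users WHERE id = ?", (user_id,))[0][0]
    for _, user_id in orders
]
n_plus_one_count = len(executed)

executed.clear()

# Joined pattern: the same data in a single round trip.
joined = [
    name
    for _, name in run(
        "SELECT o.id, u.username FROM orders o "
        "JOIN users u ON u.id = o.user_id ORDER BY o.id"
    )
]
joined_count = len(executed)
```

With three orders the lazy pattern issues four statements and the join issues one. The gap grows linearly with the number of rows.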
# Load only specific columns
usernames = db.session.query(User.username, User.email).filter_by(is_active=True).all()
# Defer heavy columns (load them on access)
from sqlalchemy.orm import defer
users = User.query.options(defer(User.bio), defer(User.password_hash)).all()
# Undefer when you need them
users = User.query.options(defer(User.bio)).all()
# Later, accessing user.bio will trigger a lazy load for that specific column
# Index columns you filter, sort, or join on frequently
class Product(db.Model):
__tablename__ = 'products'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(200), index=True) # Searched frequently
category = db.Column(db.String(50), index=True) # Filtered frequently
price = db.Column(db.Numeric(10, 2))
created_at = db.Column(db.DateTime, index=True) # Sorted frequently
# Composite index for queries that filter on both
__table_args__ = (
db.Index('idx_category_price', 'category', 'price'),
db.Index('idx_category_created', 'category', 'created_at'),
)
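Whether an index is actually used can be checked by asking the database for its query plan. This sketch uses SQLite's `EXPLAIN QUERY PLAN` through the standard library. The `plan` helper is hypothetical, the exact wording of the plan differs between SQLite versions, and other databases expose this through their own `EXPLAIN` variants.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE products (id INTEGER PRIMARY KEY, name TEXT, category TEXT, price NUMERIC)"
)


def plan(sql):
    """Return the human-readable detail column of EXPLAIN QUERY PLAN.

    The sql argument is a trusted constant here, never user input.
    """
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql).fetchall()
    return " | ".join(row[3] for row in rows)


query = "SELECT name FROM products WHERE category = 'Electronics' AND price < 100"

before = plan(query)  # no usable index yet: a full table scan
conn.execute("CREATE INDEX idx_category_price ON products (category, price)")
after = plan(query)   # the composite index now serves both conditions
```

Printing `before` and `after` shows the plan switching from a scan of the whole table to a search through `idx_category_price`.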
# Enable SQL logging in development
app.config['SQLALCHEMY_ECHO'] = True
# Or use events for more control
from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
# Listen on the Engine class: in Flask-SQLAlchemy 3.x, db.engine is only
# available inside an application context
@event.listens_for(Engine, 'before_cursor_execute')
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info['query_start_time'] = time.time()
@event.listens_for(Engine, 'after_cursor_execute')
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total = time.time() - conn.info['query_start_time']
    if total > 0.5:  # Log slow queries (over 500ms)
        app.logger.warning(f'Slow query ({total:.2f}s): {statement}')
These are the mistakes that cost real hours in debugging. Know them, avoid them.
# BUG: Changes are never persisted
user = User(username='alice', email='alice@example.com', password_hash='hash')
db.session.add(user)
# Missing: db.session.commit()
# The user exists in the session but NOT in the database
# FIX: Always commit after making changes
db.session.add(user)
db.session.commit()
# BUG: Accessing attributes after the session is closed
def get_user_data():
user = User.query.first()
return user
# Later, outside the request context:
user = get_user_data()
print(user.orders) # DetachedInstanceError! Session is gone.
# FIX 1: Eager load what you need
def get_user_data():
return User.query.options(selectinload(User.orders)).first()
# FIX 2: Convert to dict while session is active
def get_user_data():
user = User.query.first()
return user.to_dict() # Serialize within the session context
# FIX 3: Keep the object attached by using it within the request
@app.route('/users/<int:id>')
def get_user(id):
user = User.query.get_or_404(id)
return jsonify(user.to_dict()) # Serialized within request context
# BUG: Template triggers N+1 queries
@app.route('/orders')
def list_orders():
orders = Order.query.all()
return render_template('orders.html', orders=orders)
# Template: {% for order in orders %} {{ order.user.username }} {% endfor %}
# This fires a SELECT for each order's user!
# FIX: Eager load in the view
@app.route('/orders')
def list_orders():
orders = Order.query.options(joinedload(Order.user)).all()
return render_template('orders.html', orders=orders)
# BUG: Failed operation poisons the session for subsequent requests
try:
db.session.add(user)
db.session.commit()
except IntegrityError:
pass # Session is now in a broken state!
# FIX: Always rollback on error
try:
db.session.add(user)
db.session.commit()
except IntegrityError:
db.session.rollback()
# Now the session is clean for the next operation
# BAD: This will NOT update existing tables
# If you add a column to a model, create_all() ignores it
db.create_all()  # Only creates tables that don't exist
# GOOD: Use migrations for all schema changes
# flask db migrate -m "Add new column"
# flask db upgrade
Set up Flask-Migrate from day one, even for small projects. db.create_all() is only acceptable for throwaway prototypes and test fixtures. Every schema change should be a migration file committed to version control.
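The reason create_all() cannot evolve a schema is that it only checks whether each table exists. The sketch below is an analogy in plain SQL through sqlite3, not Flask-SQLAlchemy or Alembic code: `CREATE TABLE IF NOT EXISTS` plays the role of create_all(), and the `ALTER TABLE` statement stands in for what a generated migration would run.

```python
import sqlite3

conn = sqlite3.connect(":memory:")


def user_columns():
    """List the column names currently present on the users table."""
    return [row[1] for row in conn.execute("PRAGMA table_info(users)")]


# First deploy: the table does not exist yet, so it is created.
conn.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, username TEXT)")

# Later the model gains an 'age' column. Re-running the "create if missing"
# statement is a no-op because the table already exists.
conn.execute(
    "CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, username TEXT, age INTEGER)"
)
after_recreate = user_columns()

# A migration alters the existing table in place instead.
conn.execute("ALTER TABLE users ADD COLUMN age INTEGER")
after_migration = user_columns()
```

The new column only appears after the explicit `ALTER TABLE`, which is the step a migration tool records and replays in every environment.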
from sqlalchemy.orm import validates
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
age = db.Column(db.Integer)
@validates('email')
def validate_email(self, key, email):
if '@' not in email:
raise ValueError('Invalid email address')
return email.lower().strip()
@validates('username')
def validate_username(self, key, username):
if len(username) < 3:
raise ValueError('Username must be at least 3 characters')
if not username.isalnum():
raise ValueError('Username must be alphanumeric')
return username.lower().strip()
@validates('age')
def validate_age(self, key, age):
if age is not None and (age < 0 or age > 150):
raise ValueError('Age must be between 0 and 150')
return age
Encapsulate database access in repository classes to keep your route handlers clean and your data access testable.
class UserRepository:
"""Encapsulates all database operations for User."""
@staticmethod
def create(username, email, password_hash):
user = User(username=username, email=email, password_hash=password_hash)
db.session.add(user)
db.session.commit()
return user
@staticmethod
def get_by_id(user_id):
return db.session.get(User, user_id)
@staticmethod
def get_by_username(username):
return User.query.filter_by(username=username).first()
@staticmethod
def get_active_users(page=1, per_page=20):
return User.query.filter_by(is_active=True)\
.order_by(User.created_at.desc())\
.paginate(page=page, per_page=per_page, error_out=False)
@staticmethod
def update(user_id, **kwargs):
user = db.session.get(User, user_id)
if not user:
return None
for key, value in kwargs.items():
if hasattr(user, key):
setattr(user, key, value)
db.session.commit()
return user
@staticmethod
def deactivate(user_id):
user = db.session.get(User, user_id)
if user:
user.is_active = False
db.session.commit()
return user
# Usage in routes — clean and testable
@app.route('/api/users', methods=['POST'])
def create_user():
data = request.get_json()
try:
user = UserRepository.create(
username=data['username'],
email=data['email'],
password_hash=generate_password_hash(data['password'])
)
return jsonify(user.to_dict()), 201
except IntegrityError:
db.session.rollback()
return jsonify({'error': 'Username or email already exists'}), 409
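The testability benefit comes from being able to swap the repository for an in-memory stand-in. The sketch below is one way that could look. `FakeUser`, `InMemoryUserRepository`, and `close_account` are hypothetical names introduced for illustration, and the fake mirrors only a few of the methods above.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class FakeUser:
    id: int
    username: str
    email: str
    is_active: bool = True


class InMemoryUserRepository:
    """Test double exposing the same method names as UserRepository."""

    def __init__(self):
        self._users = {}
        self._next_id = 1

    def create(self, username, email, password_hash):
        user = FakeUser(id=self._next_id, username=username, email=email)
        self._users[user.id] = user
        self._next_id += 1
        return user

    def get_by_id(self, user_id) -> Optional[FakeUser]:
        return self._users.get(user_id)

    def deactivate(self, user_id):
        user = self._users.get(user_id)
        if user:
            user.is_active = False
        return user


def close_account(repo, user_id):
    """Hypothetical business rule that depends only on the repository interface."""
    user = repo.deactivate(user_id)
    return {"closed": user is not None}


repo = InMemoryUserRepository()
alice = repo.create("alice", "alice@example.com", "hash")
result = close_account(repo, alice.id)
```

Because `close_account` only calls `repo.deactivate(...)`, it should accept either this fake or the real `UserRepository` class, and the test needs no database at all.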
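# Ensure sessions are cleaned up after each request.
# Flask-SQLAlchemy already registers an equivalent teardown for db.session;
# an explicit handler like this matters mainly for a hand-rolled scoped_session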
@app.teardown_appcontext
def shutdown_session(exception=None):
db.session.remove()
# Use pool_pre_ping to handle stale connections
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_pre_ping': True,
'pool_recycle': 1800,
}
import os
class Config:
SQLALCHEMY_TRACK_MODIFICATIONS = False
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_pre_ping': True,
}
class DevelopmentConfig(Config):
SQLALCHEMY_DATABASE_URI = 'sqlite:///dev.db'
SQLALCHEMY_ECHO = True
class TestingConfig(Config):
SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
TESTING = True
class ProductionConfig(Config):
SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')
SQLALCHEMY_ENGINE_OPTIONS = {
'pool_size': 20,
'max_overflow': 30,
'pool_recycle': 1800,
'pool_pre_ping': True,
}
config = {
'development': DevelopmentConfig,
'testing': TestingConfig,
'production': ProductionConfig,
}
db.Model. Use column types, constraints, and indexes to enforce data integrity at the database level.
db.relationship() with back_populates for bidirectional access. Choose the right lazy loading strategy to avoid N+1 queries.
commit() calls. Always rollback() on error. Use begin_nested() for savepoints when you need partial rollback.
pool_size, pool_recycle, and enable pool_pre_ping for production stability.
Eager loading (joinedload, selectinload) is your primary weapon against N+1 performance problems. Profile your queries in development with SQLALCHEMY_ECHO = True.
text() with parameterized queries, never string concatenation.
@validates decorators and database constraints (CheckConstraint, unique, nullable).
A REST API (Representational State Transfer Application Programming Interface) is a web service interface built on REST, an architectural style that defines a set of constraints for building web services. REST APIs communicate over HTTP, use standard methods (GET, POST, PUT, DELETE) to perform operations on resources, and exchange data in a structured format, almost always JSON in modern applications.
The core RESTful principles are:
/api/books/42 identifies a book, not an action.
Flask is an excellent choice for building REST APIs because it gives you control without overhead. There is no built-in ORM you are forced to use, no admin panel you have to disable, and no opinionated project layout. You choose your serialization library, your database layer, and your authentication strategy. For microservices and lightweight APIs, Flask’s minimal footprint and fast startup time are significant advantages over heavier frameworks.
This tutorial walks through building production-quality REST APIs with Flask, from your first endpoint to a fully authenticated, database-backed, tested API. Every code example is written as you would actually write it in a professional codebase.
A well-organized Flask API project separates concerns from the start. Here is the structure we will build toward throughout this tutorial:
flask-rest-api/
├── app/
│   ├── __init__.py          # Application factory
│   ├── config.py            # Configuration classes
│   ├── models.py            # SQLAlchemy models
│   ├── schemas.py           # Marshmallow schemas
│   ├── routes/
│   │   ├── __init__.py
│   │   ├── books.py         # Book endpoints
│   │   └── auth.py          # Authentication endpoints
│   ├── errors.py            # Custom error handlers
│   └── extensions.py        # Extension instances (db, ma, jwt)
├── tests/
│   ├── conftest.py          # Test fixtures
│   ├── test_books.py        # Book endpoint tests
│   └── test_auth.py         # Auth endpoint tests
├── requirements.txt
├── run.py                   # Entry point
└── .env                     # Environment variables (not committed)
Create a virtual environment and install the packages we will use throughout this tutorial:
python3 -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install flask flask-sqlalchemy flask-marshmallow marshmallow-sqlalchemy \
flask-jwt-extended flask-cors python-dotenv pytest
Pin your dependencies immediately:
pip freeze > requirements.txt
Each of these packages serves a specific role: flask-sqlalchemy for database integration, flask-marshmallow and marshmallow-sqlalchemy for serialization, flask-jwt-extended for JWT authentication, flask-cors for cross-origin requests, and pytest for testing.
Let us start with the simplest possible Flask API — a single endpoint that returns JSON:
from flask import Flask, jsonify
app = Flask(__name__)
@app.route('/api/health', methods=['GET'])
def health_check():
return jsonify({
'status': 'healthy',
'service': 'flask-rest-api',
'version': '1.0.0'
}), 200
if __name__ == '__main__':
app.run(debug=True)
Key points here:
jsonify() converts a Python dictionary to a JSON response with the correct Content-Type: application/json header. Never return raw strings from an API endpoint.
200 is the default, but being explicit makes your intent clear.
The methods=['GET'] argument restricts this route to GET requests only. Without it, Flask defaults to GET.
Run it and test:
python run.py
# In another terminal:
curl http://localhost:5000/api/health
{
"status": "healthy",
"service": "flask-rest-api",
"version": "1.0.0"
}
REST APIs communicate intent through status codes. Here are the ones you will use constantly:
200 OK: Successful GET, PUT, or DELETE
201 Created: Successful POST that created a resource
204 No Content: Successful DELETE with no response body
400 Bad Request: Client sent invalid data
401 Unauthorized: Authentication required or failed
403 Forbidden: Authenticated but not authorized
404 Not Found: Resource does not exist
409 Conflict: Resource already exists (e.g., duplicate email)
422 Unprocessable Entity: Request is well-formed but semantically invalid
500 Internal Server Error: Something broke on the server
CRUD (Create, Read, Update, Delete) maps directly to HTTP methods. Here is a complete in-memory example before we add a database:
from flask import Flask, jsonify, request, abort
app = Flask(__name__)
# In-memory data store (replaced by a database later)
books = [
{'id': 1, 'title': 'Clean Code', 'author': 'Robert C. Martin', 'year': 2008},
{'id': 2, 'title': 'The Pragmatic Programmer', 'author': 'David Thomas', 'year': 1999},
]
next_id = 3
@app.route('/api/books', methods=['GET'])
def get_books():
"""List all books."""
return jsonify({
'books': books,
'total': len(books)
}), 200
@app.route('/api/books/<int:book_id>', methods=['GET'])
def get_book(book_id):
"""Get a single book by ID."""
book = next((b for b in books if b['id'] == book_id), None)
if book is None:
return jsonify({'error': 'Book not found'}), 404
return jsonify(book), 200
@app.route('/api/books', methods=['POST'])
def create_book():
"""Create a new book."""
global next_id
if not request.is_json:
return jsonify({'error': 'Content-Type must be application/json'}), 415
data = request.get_json()
# Validate required fields
required_fields = ['title', 'author']
missing = [f for f in required_fields if f not in data]
if missing:
return jsonify({
'error': 'Missing required fields',
'missing_fields': missing
}), 400
book = {
'id': next_id,
'title': data['title'],
'author': data['author'],
'year': data.get('year') # Optional field
}
next_id += 1
books.append(book)
return jsonify(book), 201
@app.route('/api/books/<int:book_id>', methods=['PUT'])
def update_book(book_id):
"""Update an existing book."""
book = next((b for b in books if b['id'] == book_id), None)
if book is None:
return jsonify({'error': 'Book not found'}), 404
if not request.is_json:
return jsonify({'error': 'Content-Type must be application/json'}), 415
data = request.get_json()
book['title'] = data.get('title', book['title'])
book['author'] = data.get('author', book['author'])
book['year'] = data.get('year', book['year'])
return jsonify(book), 200
@app.route('/api/books/<int:book_id>', methods=['DELETE'])
def delete_book(book_id):
"""Delete a book."""
global books
original_length = len(books)
books = [b for b in books if b['id'] != book_id]
if len(books) == original_length:
return jsonify({'error': 'Book not found'}), 404
return '', 204
if __name__ == '__main__':
app.run(debug=True)
Test each operation with curl:
# List all books
curl http://localhost:5000/api/books
# Get a single book
curl http://localhost:5000/api/books/1
# Create a new book
curl -X POST http://localhost:5000/api/books \
-H "Content-Type: application/json" \
-d '{"title": "Design Patterns", "author": "Gang of Four", "year": 1994}'
# Update a book
curl -X PUT http://localhost:5000/api/books/1 \
-H "Content-Type: application/json" \
-d '{"title": "Clean Code (Revised)"}'
# Delete a book
curl -X DELETE http://localhost:5000/api/books/2
Notice the patterns: POST returns 201 Created with the new resource in the body, DELETE returns 204 No Content with an empty body, and every error returns a JSON object with an error key.
Flask provides several ways to access incoming data. Understanding when to use each one prevents subtle bugs.
request.get_json()
@app.route('/api/books', methods=['POST'])
def create_book():
    # silent=True makes get_json() return None when the Content-Type is not
    # application/json or the body is malformed. Without it, recent Flask
    # versions answer with a 4xx error instead of returning None.
    data = request.get_json(silent=True)
    if data is None:
        return jsonify({'error': 'Request body must be JSON'}), 400
    # request.get_json(force=True) ignores the Content-Type header
    title = data.get('title')  # Safe: returns None if missing
    # title = data['title']    # Dangerous: raises KeyError if missing
request.args
@app.route('/api/books', methods=['GET'])
def get_books():
# GET /api/books?page=2&per_page=10&sort=title
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
sort_by = request.args.get('sort', 'id')
# The type parameter handles conversion and returns the default
# if conversion fails — no try/except needed
# GET /api/books?page=abc -> page = 1 (default)
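The fallback behaviour of the type parameter can be modelled in a few lines. This is a simplified sketch built on `urllib.parse`, not Werkzeug's implementation. `get_arg` is a hypothetical helper, and its `type_` argument plays the role of the `type` parameter on `request.args.get`.

```python
from urllib.parse import parse_qs


def get_arg(query_string, key, default=None, type_=None):
    """Return the first value for key, converted with type_, or default."""
    values = parse_qs(query_string).get(key)
    if not values:
        return default          # parameter absent
    if type_ is None:
        return values[0]        # no conversion requested
    try:
        return type_(values[0])
    except ValueError:
        return default          # conversion failed: fall back instead of raising


qs = "page=abc&per_page=10"
page = get_arg(qs, "page", 1, type_=int)          # 'abc' is not an int
per_page = get_arg(qs, "per_page", 20, type_=int)
sort_by = get_arg(qs, "sort", "id")               # not present in the query string
```

A bad value and a missing value both end up as the default, so the view never has to distinguish the two cases unless it wants to reject bad input explicitly.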
# Flask converters: int, float, string (default), path, uuid
@app.route('/api/books/<int:book_id>')
def get_book(book_id):
# book_id is already an int — Flask returns 404 if conversion fails
pass
@app.route('/api/users/<uuid:user_id>')
def get_user(user_id):
# user_id is a UUID object
pass
def validate_book_data(data, required=True):
"""Validate book input data. Returns (cleaned_data, errors)."""
errors = {}
if required and 'title' not in data:
errors['title'] = 'Title is required'
elif 'title' in data:
if not isinstance(data['title'], str):
errors['title'] = 'Title must be a string'
elif len(data['title'].strip()) == 0:
errors['title'] = 'Title cannot be empty'
elif len(data['title']) > 200:
errors['title'] = 'Title must be 200 characters or fewer'
if 'year' in data:
if not isinstance(data['year'], int):
errors['year'] = 'Year must be an integer'
elif data['year'] < 0 or data['year'] > 2030:
errors['year'] = 'Year must be between 0 and 2030'
if errors:
return None, errors
cleaned = {
'title': data.get('title', '').strip(),
'author': data.get('author', '').strip(),
'year': data.get('year'),
}
return cleaned, None
@app.route('/api/books', methods=['POST'])
def create_book():
data = request.get_json()
if data is None:
return jsonify({'error': 'Request body must be JSON'}), 400
cleaned, errors = validate_book_data(data, required=True)
if errors:
return jsonify({
'error': 'Validation failed',
'details': errors
}), 422
# Proceed with cleaned data
# ...
The validation function returns a tuple of cleaned data and errors. This pattern separates validation from endpoint logic and makes it easy to reuse across POST and PUT handlers. We will replace this manual validation with marshmallow schemas later, but understanding the manual approach first is important.
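To illustrate the reuse across POST and PUT, here is a compact, self-contained variant of the validator. It is a sketch rather than the function above: it is simplified, it also checks `author`, and it returns only the fields that were supplied, which is one reasonable choice for partial updates.

```python
def validate_book_data(data, required=True):
    """Simplified variant of the validator: returns (cleaned, errors)."""
    errors = {}
    cleaned = {}
    for field in ("title", "author"):
        if field not in data:
            if required:
                errors[field] = f"{field.capitalize()} is required"
            continue
        value = data[field]
        if not isinstance(value, str) or not value.strip():
            errors[field] = f"{field.capitalize()} must be a non-empty string"
        else:
            cleaned[field] = value.strip()
    if "year" in data:
        # bool is a subclass of int in Python, so reject it explicitly
        if isinstance(data["year"], bool) or not isinstance(data["year"], int):
            errors["year"] = "Year must be an integer"
        else:
            cleaned["year"] = data["year"]
    return (None, errors) if errors else (cleaned, None)


# POST: every required field must be present
post_cleaned, post_errors = validate_book_data({"title": " Dune "}, required=True)

# PUT used as a partial update: only the supplied fields are checked and returned
put_cleaned, put_errors = validate_book_data({"title": " Dune "}, required=False)
```

The same payload fails for POST because `author` is missing, and passes for PUT with only the cleaned `title` returned.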
Consistent response formatting is one of the hallmarks of a well-designed API. Your clients should be able to predict the shape of every response without reading documentation for each endpoint.
from flask import Flask, jsonify
from functools import wraps
app = Flask(__name__)
def api_response(data=None, message=None, status_code=200, **kwargs):
"""Build a consistent API response."""
response = {
'success': 200 <= status_code < 300,
}
if message:
response['message'] = message
if data is not None:
response['data'] = data
# Include any extra kwargs (e.g., pagination metadata)
response.update(kwargs)
return jsonify(response), status_code
# Usage in endpoints
@app.route('/api/books', methods=['GET'])
def get_books():
books = get_all_books()
return api_response(data=books, message='Books retrieved successfully')
@app.route('/api/books', methods=['POST'])
def create_book():
# ... validation ...
book = save_book(data)
return api_response(data=book, message='Book created', status_code=201)
@app.route('/api/books/<int:book_id>', methods=['DELETE'])
def delete_book(book_id):
# ... deletion logic ...
return api_response(message='Book deleted', status_code=200)
def error_response(message, status_code, details=None):
"""Build a consistent error response."""
response = {
'success': False,
'error': {
'message': message,
'code': status_code,
}
}
if details:
response['error']['details'] = details
return jsonify(response), status_code
# Usage
@app.route('/api/books/<int:book_id>', methods=['GET'])
def get_book(book_id):
book = find_book(book_id)
if not book:
return error_response('Book not found', 404)
return api_response(data=book)
@app.route('/api/books', methods=['GET'])
def get_books():
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
# Clamp per_page to prevent abuse
per_page = min(per_page, 100)
# With SQLAlchemy (shown later):
pagination = Book.query.paginate(page=page, per_page=per_page, error_out=False)
return api_response(
data=[book.to_dict() for book in pagination.items],
pagination={
'page': pagination.page,
'per_page': pagination.per_page,
'total_pages': pagination.pages,
'total_items': pagination.total,
'has_next': pagination.has_next,
'has_prev': pagination.has_prev,
}
)
The response for a paginated request looks like this:
{
"success": true,
"data": [
{"id": 1, "title": "Clean Code", "author": "Robert C. Martin"},
{"id": 2, "title": "The Pragmatic Programmer", "author": "David Thomas"}
],
"pagination": {
"page": 1,
"per_page": 20,
"total_pages": 5,
"total_items": 93,
"has_next": true,
"has_prev": false
}
}
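The pagination fields follow from three numbers: the current page, the page size, and the total item count. The sketch below shows the arithmetic with a hypothetical `pagination_meta` helper. Flask-SQLAlchemy's pagination object computes these values itself, so this is only meant to make the relationship explicit.

```python
import math


def pagination_meta(page, per_page, total_items):
    """Compute the metadata block for one page (hypothetical helper)."""
    total_pages = math.ceil(total_items / per_page) if total_items else 0
    return {
        "page": page,
        "per_page": per_page,
        "total_pages": total_pages,
        "total_items": total_items,
        "has_next": page < total_pages,
        "has_prev": page > 1,
    }


first = pagination_meta(page=1, per_page=20, total_items=93)
last = pagination_meta(page=5, per_page=20, total_items=93)
```

With 93 items and 20 per page, the last page holds the remaining 13 items, which is why the total rounds up to 5 pages.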
When your endpoints handle multiple HTTP methods, function-based views can get cluttered. Flask's MethodView organizes CRUD operations into a single class where each HTTP method gets its own method:
from flask import Flask, jsonify, request
from flask.views import MethodView
app = Flask(__name__)
class BookAPI(MethodView):
"""Handles /api/books and /api/books/<id>"""
def get(self, book_id=None):
if book_id is None:
# GET /api/books — list all
books = get_all_books()
return jsonify({'books': books}), 200
else:
# GET /api/books/42 — get one
book = find_book(book_id)
if not book:
return jsonify({'error': 'Book not found'}), 404
return jsonify(book), 200
def post(self):
"""POST /api/books — create"""
data = request.get_json()
if not data or 'title' not in data:
return jsonify({'error': 'Title is required'}), 400
book = create_book(data)
return jsonify(book), 201
def put(self, book_id):
"""PUT /api/books/42 — update"""
book = find_book(book_id)
if not book:
return jsonify({'error': 'Book not found'}), 404
data = request.get_json()
updated = update_book(book_id, data)
return jsonify(updated), 200
def delete(self, book_id):
"""DELETE /api/books/42 — delete"""
book = find_book(book_id)
if not book:
return jsonify({'error': 'Book not found'}), 404
delete_book(book_id)
return '', 204
# Register the view with URL rules
book_view = BookAPI.as_view('book_api')
app.add_url_rule('/api/books', defaults={'book_id': None},
view_func=book_view, methods=['GET'])
app.add_url_rule('/api/books', view_func=book_view, methods=['POST'])
app.add_url_rule('/api/books/<int:book_id>', view_func=book_view,
methods=['GET', 'PUT', 'DELETE'])
The advantage of MethodView is organization. All operations for a resource live in one class. When you add decorators like authentication, you apply them once instead of to five separate functions. The registration at the bottom is more verbose, but it is done once and is explicit about which methods are allowed on which URL patterns.
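The idea of routing each HTTP method to a same-named method on a class can be modelled without Flask. The following is a toy sketch under that assumption. `SimpleMethodView` and its `dispatch` signature are hypothetical and deliberately much simpler than Flask's real `MethodView`.

```python
class SimpleMethodView:
    """Toy model of per-method dispatch; not Flask's actual implementation."""

    http_methods = ("get", "post", "put", "delete")

    def dispatch(self, http_method, **kwargs):
        name = http_method.lower()
        handler = getattr(self, name, None) if name in self.http_methods else None
        if handler is None:
            return {"error": "Method not allowed"}, 405
        return handler(**kwargs)


class BookView(SimpleMethodView):
    books = {42: {"id": 42, "title": "Clean Code"}}

    def get(self, book_id=None):
        if book_id is None:
            return {"books": list(self.books.values())}, 200
        book = self.books.get(book_id)
        if book is None:
            return {"error": "Book not found"}, 404
        return book, 200


view = BookView()
one = view.dispatch("GET", book_id=42)
missing = view.dispatch("GET", book_id=7)
unsupported = view.dispatch("POST")  # BookView defines no post() method
```

A method the class does not define falls through to a 405 response, which is why adding an operation to a resource only requires adding one method.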
By default, Flask returns HTML error pages. For an API, every response — including errors — must be JSON. Register custom error handlers to enforce this:
from flask import Flask, jsonify
from werkzeug.exceptions import HTTPException
app = Flask(__name__)
# Custom exception class for API errors
class APIError(Exception):
"""Custom exception for API-specific errors."""
def __init__(self, message, status_code=400, details=None):
super().__init__(message)  # so str(error) and log output include the message
self.message = message
self.status_code = status_code
self.details = details
def to_dict(self):
response = {
'success': False,
'error': {
'message': self.message,
'code': self.status_code,
}
}
if self.details:
response['error']['details'] = self.details
return response
@app.errorhandler(APIError)
def handle_api_error(error):
"""Handle custom API errors."""
return jsonify(error.to_dict()), error.status_code
@app.errorhandler(HTTPException)
def handle_http_error(error):
"""Convert all HTTP exceptions to JSON."""
return jsonify({
'success': False,
'error': {
'message': error.description,
'code': error.code,
}
}), error.code
@app.errorhandler(Exception)
def handle_unexpected_error(error):
"""Catch-all for unhandled exceptions."""
app.logger.error(f'Unhandled exception: {error}', exc_info=True)
return jsonify({
'success': False,
'error': {
'message': 'An unexpected error occurred',
'code': 500,
}
}), 500
# Usage in endpoints
@app.route('/api/books/<int:book_id>', methods=['GET'])
def get_book(book_id):
book = find_book(book_id)
if not book:
raise APIError('Book not found', status_code=404)
return jsonify(book), 200
@app.route('/api/books', methods=['POST'])
def create_book():
data = request.get_json()
errors = validate(data)
if errors:
raise APIError(
'Validation failed',
status_code=422,
details=errors
)
The three-layer approach is important: APIError handles your intentional errors, HTTPException catches Flask's built-in errors (like 404 for unknown routes or 405 for wrong methods), and the generic Exception handler prevents stack traces from leaking to clients in production. Always log the actual error server-side before returning a generic message.
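The layers do not shadow each other because the most specific registered handler wins. The sketch below models that lookup by walking the exception's method resolution order. The `HANDLERS` table, the `handle` function, and the `NotFound` stand-in are hypothetical and simplified compared with Flask's real handler registry.

```python
class APIError(Exception):
    def __init__(self, message, status_code=400):
        super().__init__(message)
        self.message = message
        self.status_code = status_code


class NotFound(Exception):
    """Stand-in for a framework HTTP exception (hypothetical, for illustration)."""
    code = 404
    description = "Not found"


def _body(message, code):
    return {"success": False, "error": {"message": message, "code": code}}


HANDLERS = {
    APIError: lambda e: (_body(e.message, e.status_code), e.status_code),
    NotFound: lambda e: (_body(e.description, e.code), e.code),
    # Catch-all: never echo the original error text back to the client
    Exception: lambda e: (_body("An unexpected error occurred", 500), 500),
}


def handle(error):
    """Pick the handler registered for the closest class in the error's MRO."""
    for cls in type(error).__mro__:
        if cls in HANDLERS:
            return HANDLERS[cls](error)
    raise error


intentional = handle(APIError("Book not found", status_code=404))
framework = handle(NotFound())
unexpected = handle(ZeroDivisionError("division by zero in pricing"))
```

Only the unexpected error reaches the generic handler, and its internal message stays out of the response body.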
APIs need to verify who is making requests. We will cover two approaches: API keys for simple service-to-service authentication, and JWT tokens for user authentication.
from functools import wraps
from flask import request, jsonify
API_KEYS = {
'sk-abc123': {'name': 'Mobile App', 'permissions': ['read', 'write']},
'sk-def456': {'name': 'Analytics Service', 'permissions': ['read']},
}
def require_api_key(f):
@wraps(f)
def decorated(*args, **kwargs):
api_key = request.headers.get('X-API-Key')
if not api_key:
return jsonify({'error': 'API key is required'}), 401
if api_key not in API_KEYS:
return jsonify({'error': 'Invalid API key'}), 401
# Attach client info to the request context
request.api_client = API_KEYS[api_key]
return f(*args, **kwargs)
return decorated
def require_permission(permission):
def decorator(f):
@wraps(f)
def decorated(*args, **kwargs):
if permission not in request.api_client.get('permissions', []):
return jsonify({'error': 'Insufficient permissions'}), 403
return f(*args, **kwargs)
return decorated
return decorator
@app.route('/api/books', methods=['GET'])
@require_api_key
@require_permission('read')
def get_books():
return jsonify({'books': books})
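The order of the two decorators matters, because `require_permission` reads the client record that `require_api_key` attaches. The sketch below reproduces the pair without Flask to make the order visible: a plain dict stands in for `flask.request`, and `fake_request` and the `create_book` endpoint are hypothetical.

```python
from functools import wraps

API_KEYS = {
    "sk-abc123": {"name": "Mobile App", "permissions": ["read", "write"]},
    "sk-def456": {"name": "Analytics Service", "permissions": ["read"]},
}


def require_api_key(f):
    @wraps(f)
    def decorated(request, *args, **kwargs):
        client = API_KEYS.get(request["headers"].get("X-API-Key"))
        if client is None:
            return {"error": "Invalid API key"}, 401
        request["api_client"] = client  # made available to inner decorators
        return f(request, *args, **kwargs)
    return decorated


def require_permission(permission):
    def decorator(f):
        @wraps(f)
        def decorated(request, *args, **kwargs):
            if permission not in request["api_client"]["permissions"]:
                return {"error": "Insufficient permissions"}, 403
            return f(request, *args, **kwargs)
        return decorated
    return decorator


# Decorators apply bottom-up, so require_api_key ends up outermost and runs first.
@require_api_key
@require_permission("write")
def create_book(request):
    return {"created": True}, 201


def fake_request(api_key=None):
    """Build a plain-dict stand-in for flask.request (hypothetical helper)."""
    headers = {"X-API-Key": api_key} if api_key else {}
    return {"headers": headers}


no_key = create_book(fake_request())
read_only = create_book(fake_request("sk-def456"))
full_access = create_book(fake_request("sk-abc123"))
```

Swapping the two decorators would make the permission check run before any client is attached, so it would fail with a `KeyError` instead of returning 401.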
JWT (JSON Web Token) is the standard for stateless user authentication in REST APIs. Flask-JWT-Extended makes this straightforward:
from flask import Flask, jsonify, request
from flask_jwt_extended import (
    JWTManager, create_access_token, create_refresh_token,
    jwt_required, get_jwt_identity, get_jwt
)
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import timedelta

app = Flask(__name__)

# JWT Configuration
app.config['JWT_SECRET_KEY'] = 'your-secret-key-change-in-production'
app.config['JWT_ACCESS_TOKEN_EXPIRES'] = timedelta(hours=1)
app.config['JWT_REFRESH_TOKEN_EXPIRES'] = timedelta(days=30)
jwt = JWTManager(app)

# In-memory user store (use a database in production)
users = {}

@app.route('/api/auth/register', methods=['POST'])
def register():
    data = request.get_json()
    if not data or not data.get('email') or not data.get('password'):
        return jsonify({'error': 'Email and password are required'}), 400
    if data['email'] in users:
        return jsonify({'error': 'User already exists'}), 409
    users[data['email']] = {
        'email': data['email'],
        'password': generate_password_hash(data['password']),
        'role': 'user',
    }
    return jsonify({'message': 'User registered successfully'}), 201

@app.route('/api/auth/login', methods=['POST'])
def login():
    data = request.get_json()
    if not data or not data.get('email') or not data.get('password'):
        return jsonify({'error': 'Email and password are required'}), 400
    user = users.get(data['email'])
    if not user or not check_password_hash(user['password'], data['password']):
        return jsonify({'error': 'Invalid email or password'}), 401
    # Create tokens with user identity and additional claims
    access_token = create_access_token(
        identity=data['email'],
        additional_claims={'role': user['role']}
    )
    refresh_token = create_refresh_token(identity=data['email'])
    return jsonify({
        'access_token': access_token,
        'refresh_token': refresh_token,
        'token_type': 'Bearer',
    }), 200

@app.route('/api/auth/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    """Get a new access token using a refresh token."""
    identity = get_jwt_identity()
    user = users.get(identity)
    if not user:
        return jsonify({'error': 'User not found'}), 401
    # Re-attach the role claim; a token built from the identity alone has no role
    access_token = create_access_token(
        identity=identity,
        additional_claims={'role': user['role']}
    )
    return jsonify({'access_token': access_token}), 200

@app.route('/api/books', methods=['POST'])
@jwt_required()
def create_book():
    """Protected endpoint: requires a valid JWT."""
    current_user = get_jwt_identity()
    claims = get_jwt()
    # Access role from token claims
    if claims.get('role') != 'admin':
        return jsonify({'error': 'Admin access required'}), 403
    data = request.get_json()
    # ... create book ...
    return jsonify({'message': 'Book created', 'created_by': current_user}), 201

# Custom JWT error handlers
@jwt.expired_token_loader
def expired_token_callback(jwt_header, jwt_payload):
    return jsonify({
        'error': 'Token has expired',
        'code': 'token_expired'
    }), 401

@jwt.invalid_token_loader
def invalid_token_callback(error):
    return jsonify({
        'error': 'Invalid token',
        'code': 'token_invalid'
    }), 401

@jwt.unauthorized_loader
def missing_token_callback(error):
    return jsonify({
        'error': 'Authorization token is required',
        'code': 'token_missing'
    }), 401
Test the JWT flow:
# Register
curl -X POST http://localhost:5000/api/auth/register \
-H "Content-Type: application/json" \
-d '{"email": "admin@example.com", "password": "securepassword"}'
# Login — get tokens
curl -X POST http://localhost:5000/api/auth/login \
-H "Content-Type: application/json" \
-d '{"email": "admin@example.com", "password": "securepassword"}'
# Use the access token (new accounts get the 'user' role, so this returns 403 until the account's role is 'admin')
curl -X POST http://localhost:5000/api/books \
-H "Content-Type: application/json" \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIs..." \
-d '{"title": "New Book", "author": "Author Name"}'
# Refresh the token when it expires
curl -X POST http://localhost:5000/api/auth/refresh \
-H "Authorization: Bearer <refresh_token>"
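To see why this flow needs no server-side session, it helps to look at what a signed token actually is. The following is a minimal standard-library sketch of HS256 signing and verification; it is not how Flask-JWT-Extended is implemented, and a real library checks considerably more (the header algorithm, token type, and other claims). The function names and the `now` parameter are assumptions made for this example.

```python
import base64
import hashlib
import hmac
import json
import time


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, the encoding JWTs use."""
    return base64.urlsafe_b64encode(data).rstrip(b'=').decode('ascii')


def b64url_decode(text: str) -> bytes:
    """Decode base64url text, restoring the stripped padding."""
    return base64.urlsafe_b64decode(text + '=' * (-len(text) % 4))


def _signature(signing_input: str, secret: str) -> str:
    digest = hmac.new(secret.encode('utf-8'), signing_input.encode('utf-8'),
                      hashlib.sha256).digest()
    return b64url(digest)


def sign_token(payload: dict, secret: str) -> str:
    """Build header.payload.signature using HMAC-SHA256 (HS256)."""
    header = {'alg': 'HS256', 'typ': 'JWT'}
    signing_input = '.'.join(
        b64url(json.dumps(part, separators=(',', ':')).encode('utf-8'))
        for part in (header, payload)
    )
    return f'{signing_input}.{_signature(signing_input, secret)}'


def verify_token(token: str, secret: str, now=None):
    """Return the payload if signature and expiry are valid, else None."""
    try:
        header_b64, payload_b64, signature_b64 = token.split('.')
    except ValueError:
        return None
    expected = _signature(f'{header_b64}.{payload_b64}', secret)
    # Compare as bytes in constant time
    if not hmac.compare_digest(expected.encode('ascii'), signature_b64.encode('utf-8')):
        return None
    payload = json.loads(b64url_decode(payload_b64))
    current = time.time() if now is None else now
    if 'exp' in payload and payload['exp'] <= current:
        return None
    return payload
```

Because the role travels inside the signed payload, the server can authorize a request without a lookup, which is also why a role change only takes effect once a new token is issued.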
An in-memory list is fine for learning, but real APIs need a database. Flask-SQLAlchemy provides an elegant ORM layer:
# app/extensions.py
from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow
from flask_jwt_extended import JWTManager
from flask_cors import CORS

db = SQLAlchemy()
ma = Marshmallow()
jwt = JWTManager()
cors = CORS()
# app/config.py
import os

class Config:
    SECRET_KEY = os.environ.get('SECRET_KEY', 'dev-secret-key')
    JWT_SECRET_KEY = os.environ.get('JWT_SECRET_KEY', 'jwt-dev-secret')
    SQLALCHEMY_TRACK_MODIFICATIONS = False

class DevelopmentConfig(Config):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///dev.db'

class TestingConfig(Config):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'

class ProductionConfig(Config):
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')

config = {
    'development': DevelopmentConfig,
    'testing': TestingConfig,
    'production': ProductionConfig,
    'default': DevelopmentConfig,
}
# app/__init__.py
from flask import Flask
from app.config import config
from app.extensions import db, ma, jwt, cors

def create_app(config_name='default'):
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    # Initialize extensions
    db.init_app(app)
    ma.init_app(app)
    jwt.init_app(app)
    cors.init_app(app)

    # Register blueprints
    from app.routes.books import books_bp
    from app.routes.auth import auth_bp
    app.register_blueprint(books_bp, url_prefix='/api')
    app.register_blueprint(auth_bp, url_prefix='/api/auth')

    # Register error handlers
    from app.errors import register_error_handlers
    register_error_handlers(app)

    # Create tables
    with app.app_context():
        db.create_all()

    return app
# app/models.py
from datetime import datetime
from app.extensions import db
from werkzeug.security import generate_password_hash, check_password_hash

class Book(db.Model):
    __tablename__ = 'books'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    author = db.Column(db.String(100), nullable=False)
    isbn = db.Column(db.String(13), unique=True, nullable=True)
    year = db.Column(db.Integer, nullable=True)
    description = db.Column(db.Text, nullable=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def __repr__(self):
        return f'<Book {self.title}>'

class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(256), nullable=False)
    role = db.Column(db.String(20), default='user')
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    def set_password(self, password):
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        return check_password_hash(self.password_hash, password)

    def __repr__(self):
        return f'<User {self.email}>'
from flask import Blueprint, jsonify, request
from app.extensions import db
from app.models import Book

books_bp = Blueprint('books', __name__)

@books_bp.route('/books', methods=['GET'])
def get_books():
    page = request.args.get('page', 1, type=int)
    per_page = min(request.args.get('per_page', 20, type=int), 100)

    # Filter by author if provided
    query = Book.query
    author = request.args.get('author')
    if author:
        query = query.filter(Book.author.ilike(f'%{author}%'))

    # Sort, restricted to an explicit allow-list of columns.
    # hasattr(Book, sort) would also accept non-column attributes such as
    # 'query' or 'metadata' and fail with a 500 on .desc().
    sort = request.args.get('sort', 'created_at')
    order = request.args.get('order', 'desc')
    allowed_sorts = {'title', 'author', 'year', 'created_at'}
    if sort in allowed_sorts:
        sort_column = getattr(Book, sort)
        query = query.order_by(sort_column.desc() if order == 'desc' else sort_column.asc())

    pagination = query.paginate(page=page, per_page=per_page, error_out=False)
    return jsonify({
        'success': True,
        'data': [book_to_dict(b) for b in pagination.items],
        'pagination': {
            'page': pagination.page,
            'per_page': pagination.per_page,
            'total_pages': pagination.pages,
            'total_items': pagination.total,
        }
    }), 200

@books_bp.route('/books/<int:book_id>', methods=['GET'])
def get_book(book_id):
    book = db.session.get(Book, book_id)
    if not book:
        return jsonify({'error': 'Book not found'}), 404
    return jsonify({'success': True, 'data': book_to_dict(book)}), 200

@books_bp.route('/books', methods=['POST'])
def create_book():
    data = request.get_json()
    if not data:
        return jsonify({'error': 'Request body must be JSON'}), 400
    if not data.get('title') or not data.get('author'):
        return jsonify({'error': 'Title and author are required'}), 400

    # Check for duplicate ISBN
    if data.get('isbn'):
        existing = Book.query.filter_by(isbn=data['isbn']).first()
        if existing:
            return jsonify({'error': 'A book with this ISBN already exists'}), 409

    book = Book(
        title=data['title'],
        author=data['author'],
        isbn=data.get('isbn'),
        year=data.get('year'),
        description=data.get('description'),
    )
    db.session.add(book)
    db.session.commit()
    return jsonify({'success': True, 'data': book_to_dict(book)}), 201

@books_bp.route('/books/<int:book_id>', methods=['PUT'])
def update_book(book_id):
    book = db.session.get(Book, book_id)
    if not book:
        return jsonify({'error': 'Book not found'}), 404
    data = request.get_json()
    if not data:
        return jsonify({'error': 'Request body must be JSON'}), 400

    book.title = data.get('title', book.title)
    book.author = data.get('author', book.author)
    book.isbn = data.get('isbn', book.isbn)
    book.year = data.get('year', book.year)
    book.description = data.get('description', book.description)
    db.session.commit()
    return jsonify({'success': True, 'data': book_to_dict(book)}), 200

@books_bp.route('/books/<int:book_id>', methods=['DELETE'])
def delete_book(book_id):
    book = db.session.get(Book, book_id)
    if not book:
        return jsonify({'error': 'Book not found'}), 404
    db.session.delete(book)
    db.session.commit()
    return jsonify({'success': True, 'message': 'Book deleted'}), 200

def book_to_dict(book):
    return {
        'id': book.id,
        'title': book.title,
        'author': book.author,
        'isbn': book.isbn,
        'year': book.year,
        'description': book.description,
        'created_at': book.created_at.isoformat(),
        'updated_at': book.updated_at.isoformat(),
    }
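The pagination block returned by the list endpoint is simple arithmetic over the total row count. As a rough sketch, assuming the usual ceiling-division convention, the values can be derived as follows; `pagination_meta` and `clamp_per_page` are hypothetical helpers, and the two `has_*` flags are extra conveniences not present in the response above.

```python
import math


def pagination_meta(total_items: int, page: int, per_page: int) -> dict:
    """Compute the pagination block for a given result count."""
    total_pages = math.ceil(total_items / per_page) if total_items else 0
    return {
        'page': page,
        'per_page': per_page,
        'total_pages': total_pages,
        'total_items': total_items,
        'has_next': page < total_pages,
        'has_prev': page > 1,
    }


def clamp_per_page(requested, default=20, maximum=100):
    """Cap the page size like min(per_page, 100), and reject sizes below 1."""
    if requested is None or requested < 1:
        return default
    return min(requested, maximum)


print(pagination_meta(total_items=25, page=1, per_page=10))
```

Capping `per_page` on the server matters because an uncapped value lets a single request pull the entire table.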
Hand-written serializers such as book_to_dict() work, but they do not validate input. Marshmallow handles both serialization (object to JSON) and deserialization (JSON to validated data) in a single schema definition:
from marshmallow import fields, validate, validates, ValidationError
from app.extensions import ma
from app.models import Book

class BookSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = Book
        load_instance = True  # Deserialize to model instances
        include_fk = True

    # Override auto-generated fields with validation rules
    title = fields.String(
        required=True,
        validate=validate.Length(min=1, max=200, error='Title must be 1-200 characters')
    )
    author = fields.String(
        required=True,
        validate=validate.Length(min=1, max=100)
    )
    isbn = fields.String(
        validate=validate.Length(equal=13, error='ISBN must be exactly 13 characters')
    )
    year = fields.Integer(
        validate=validate.Range(min=0, max=2030)
    )
    description = fields.String(validate=validate.Length(max=2000))

    # Read-only fields: included in output but ignored on input
    created_at = fields.DateTime(dump_only=True)
    updated_at = fields.DateTime(dump_only=True)

    @validates('isbn')
    def validate_isbn(self, value):
        if value and not value.isdigit():
            raise ValidationError('ISBN must contain only digits')

# Schema instances
book_schema = BookSchema()
books_schema = BookSchema(many=True)

# Usage in endpoints
@books_bp.route('/books', methods=['POST'])
def create_book():
    data = request.get_json()
    if not data:
        return jsonify({'error': 'Request body must be JSON'}), 400
    try:
        book = book_schema.load(data, session=db.session)
    except ValidationError as err:
        return jsonify({
            'error': 'Validation failed',
            'details': err.messages
        }), 422
    db.session.add(book)
    db.session.commit()
    return jsonify({
        'success': True,
        'data': book_schema.dump(book)
    }), 201

@books_bp.route('/books', methods=['GET'])
def get_books():
    page = request.args.get('page', 1, type=int)
    per_page = min(request.args.get('per_page', 20, type=int), 100)
    pagination = Book.query.paginate(page=page, per_page=per_page, error_out=False)
    return jsonify({
        'success': True,
        'data': books_schema.dump(pagination.items),
        'pagination': {
            'page': pagination.page,
            'per_page': pagination.per_page,
            'total_pages': pagination.pages,
            'total_items': pagination.total,
        }
    }), 200

@books_bp.route('/books/<int:book_id>', methods=['PUT'])
def update_book(book_id):
    book = db.session.get(Book, book_id)
    if not book:
        return jsonify({'error': 'Book not found'}), 404
    data = request.get_json()
    try:
        # partial=True allows updating only some fields
        updated_book = book_schema.load(data, instance=book, partial=True, session=db.session)
    except ValidationError as err:
        return jsonify({
            'error': 'Validation failed',
            'details': err.messages
        }), 422
    db.session.commit()
    return jsonify({
        'success': True,
        'data': book_schema.dump(updated_book)
    }), 200
Marshmallow gives you three critical capabilities in one place: it validates incoming data and returns structured error messages, it serializes model instances to dictionaries (and then JSON), and it deserializes JSON input directly into SQLAlchemy model instances. The partial=True parameter on PUT/PATCH is particularly useful — it makes all fields optional so clients only need to send the fields they want to update.
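To make the effect of partial loading concrete, here is a small hand-rolled validator that follows the same rules as the schema above. It is only an illustration of the idea, not how Marshmallow works internally; `validate_book` and its error messages are assumptions for this sketch.

```python
def validate_book(data: dict, partial: bool = False) -> dict:
    """Return a dict of field -> list of errors; an empty dict means valid."""
    errors = {}
    # (min length, max length) per string field, mirroring the schema rules
    length_rules = {'title': (1, 200), 'author': (1, 100), 'description': (0, 2000)}

    # Required fields are only enforced on a full (non-partial) load
    if not partial:
        for field in ('title', 'author'):
            if field not in data:
                errors.setdefault(field, []).append('Missing data for required field.')

    for field, (low, high) in length_rules.items():
        if field in data:
            value = data[field]
            if not isinstance(value, str) or not low <= len(value) <= high:
                errors.setdefault(field, []).append(f'Length must be between {low} and {high}.')

    isbn = data.get('isbn')
    if isbn is not None:
        if not isinstance(isbn, str) or len(isbn) != 13:
            errors.setdefault('isbn', []).append('ISBN must be exactly 13 characters')
        elif not isbn.isdigit():
            errors.setdefault('isbn', []).append('ISBN must contain only digits')

    year = data.get('year')
    if year is not None:
        if not isinstance(year, int) or not 0 <= year <= 2030:
            errors.setdefault('year', []).append('Must be between 0 and 2030.')

    return errors


print(validate_book({'author': 'Robert Martin'}))           # full load: title is missing
print(validate_book({'year': 2008}, partial=True))           # partial load: nothing else required
```

The fields that are present are still validated during a partial load; only the "required" check is relaxed.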
If your API is consumed by a frontend on a different domain, browsers will block requests unless you configure CORS headers. Flask-CORS makes this trivial:
from flask import Flask, jsonify
from flask_cors import CORS, cross_origin

app = Flask(__name__)

# Use one of the following options, not all three.

# Option 1: Allow all origins (development only)
CORS(app)

# Option 2: Restrict to specific origins (production)
CORS(app, resources={
    r"/api/*": {
        "origins": [
            "https://yourfrontend.com",
            "https://admin.yourfrontend.com",
        ],
        "methods": ["GET", "POST", "PUT", "DELETE", "OPTIONS"],
        "allow_headers": ["Content-Type", "Authorization"],
        "expose_headers": ["X-Total-Count"],
        "max_age": 600,  # Cache preflight for 10 minutes
    }
})

# Option 3: Per-route CORS with the cross_origin decorator
@app.route('/api/public/data', methods=['GET'])
@cross_origin(origins='*')
def public_data():
    return jsonify({'data': 'Anyone can access this'})
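Behind these options, the server-side decision is essentially an allow-list check on the request's Origin header. The sketch below shows that check with plain dictionaries; it is a simplification and not Flask-CORS's implementation (in particular, a real server sends the Allow-Methods, Allow-Headers, and Max-Age headers only on the preflight response). `ALLOWED_ORIGINS` and `cors_headers` are hypothetical names.

```python
ALLOWED_ORIGINS = {'https://yourfrontend.com', 'https://admin.yourfrontend.com'}


def cors_headers(origin, allowed_origins=ALLOWED_ORIGINS, max_age=600):
    """Return the CORS response headers for a request's Origin header."""
    if origin is None or origin not in allowed_origins:
        # No CORS headers: the browser refuses to expose the response
        return {}
    return {
        'Access-Control-Allow-Origin': origin,
        'Access-Control-Allow-Methods': 'GET, POST, PUT, DELETE, OPTIONS',
        'Access-Control-Allow-Headers': 'Content-Type, Authorization',
        'Access-Control-Expose-Headers': 'X-Total-Count',
        'Access-Control-Max-Age': str(max_age),
        # The response varies by Origin, so caches must key on it
        'Vary': 'Origin',
    }


print(cors_headers('https://yourfrontend.com'))
print(cors_headers('https://evil.example'))
```

Note that CORS is enforced by the browser, not the server: a non-browser client can still call the API, which is why authentication remains necessary.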
The key settings to understand:
origins: Which domains can make requests. Never use '*' in production if your API requires authentication.
methods: Which HTTP methods are allowed.
allow_headers: Which request headers the client can send. You must include Authorization if you use JWT tokens.
expose_headers: Which response headers the client's JavaScript can read. Custom headers like pagination totals need to be explicitly exposed.
max_age: How long the browser caches the preflight (OPTIONS) response. Higher values reduce preflight requests.
Let us assemble everything into a complete, runnable application. This brings together the application factory, database models, marshmallow schemas, JWT authentication, error handling, CORS, and pagination.
# run.py
from app import create_app

app = create_app('development')

if __name__ == '__main__':
    app.run(debug=True, port=5000)
# app/extensions.py
from flask_sqlalchemy import SQLAlchemy
from flask_marshmallow import Marshmallow
from flask_jwt_extended import JWTManager
from flask_cors import CORS

db = SQLAlchemy()
ma = Marshmallow()
jwt = JWTManager()
cors = CORS()
# app/__init__.py
from flask import Flask
from app.config import config
from app.extensions import db, ma, jwt, cors

def create_app(config_name='default'):
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    # Initialize extensions
    db.init_app(app)
    ma.init_app(app)
    jwt.init_app(app)
    # "*" is for development only; list explicit origins in production
    cors.init_app(app, resources={r"/api/*": {"origins": "*"}})

    # Register blueprints
    from app.routes.books import books_bp
    from app.routes.auth import auth_bp
    app.register_blueprint(books_bp, url_prefix='/api')
    app.register_blueprint(auth_bp, url_prefix='/api/auth')

    # Register error handlers
    from app.errors import register_error_handlers
    register_error_handlers(app)

    # Create database tables
    with app.app_context():
        db.create_all()

    return app
# app/models.py
from datetime import datetime
from app.extensions import db
from werkzeug.security import generate_password_hash, check_password_hash

class Book(db.Model):
    __tablename__ = 'books'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    author = db.Column(db.String(100), nullable=False)
    isbn = db.Column(db.String(13), unique=True, nullable=True)
    year = db.Column(db.Integer, nullable=True)
    description = db.Column(db.Text, nullable=True)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)
    updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    def __repr__(self):
        return f'<Book {self.title}>'

class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    email = db.Column(db.String(120), unique=True, nullable=False, index=True)
    password_hash = db.Column(db.String(256), nullable=False)
    role = db.Column(db.String(20), default='user')
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    def set_password(self, password):
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        return check_password_hash(self.password_hash, password)

    def __repr__(self):
        return f'<User {self.email}>'
# app/schemas.py
from marshmallow import fields, validate, validates, ValidationError
from app.extensions import ma
from app.models import Book, User

class BookSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = Book
        load_instance = True
        include_fk = True

    title = fields.String(required=True, validate=validate.Length(min=1, max=200))
    author = fields.String(required=True, validate=validate.Length(min=1, max=100))
    isbn = fields.String(validate=validate.Length(equal=13))
    year = fields.Integer(validate=validate.Range(min=0, max=2030))
    description = fields.String(validate=validate.Length(max=2000))
    created_at = fields.DateTime(dump_only=True)
    updated_at = fields.DateTime(dump_only=True)

    @validates('isbn')
    def validate_isbn(self, value):
        if value and not value.isdigit():
            raise ValidationError('ISBN must contain only digits')

class UserSchema(ma.SQLAlchemyAutoSchema):
    class Meta:
        model = User
        load_instance = True
        exclude = ('password_hash',)

    email = fields.Email(required=True)
    role = fields.String(dump_only=True)
    created_at = fields.DateTime(dump_only=True)

class LoginSchema(ma.Schema):
    email = fields.Email(required=True)
    password = fields.String(required=True, validate=validate.Length(min=6))

class RegisterSchema(ma.Schema):
    email = fields.Email(required=True)
    password = fields.String(required=True, validate=validate.Length(min=8))

book_schema = BookSchema()
books_schema = BookSchema(many=True)
user_schema = UserSchema()
login_schema = LoginSchema()
register_schema = RegisterSchema()
# app/errors.py
from flask import jsonify
from werkzeug.exceptions import HTTPException

class APIError(Exception):
    def __init__(self, message, status_code=400, details=None):
        # Pass the message up so str(error) and log output are meaningful
        super().__init__(message)
        self.message = message
        self.status_code = status_code
        self.details = details

    def to_dict(self):
        response = {
            'success': False,
            'error': {'message': self.message, 'code': self.status_code}
        }
        if self.details:
            response['error']['details'] = self.details
        return response

def register_error_handlers(app):
    @app.errorhandler(APIError)
    def handle_api_error(error):
        return jsonify(error.to_dict()), error.status_code

    @app.errorhandler(HTTPException)
    def handle_http_error(error):
        return jsonify({
            'success': False,
            'error': {'message': error.description, 'code': error.code}
        }), error.code

    @app.errorhandler(Exception)
    def handle_unexpected_error(error):
        # Log the full traceback server-side; return only a generic message
        app.logger.error(f'Unhandled exception: {error}', exc_info=True)
        return jsonify({
            'success': False,
            'error': {'message': 'An unexpected error occurred', 'code': 500}
        }), 500
# app/routes/auth.py
from flask import Blueprint, jsonify, request
from flask_jwt_extended import (
    create_access_token, create_refresh_token,
    jwt_required, get_jwt_identity
)
from marshmallow import ValidationError
from app.extensions import db
from app.models import User
from app.schemas import login_schema, register_schema, user_schema

auth_bp = Blueprint('auth', __name__)

@auth_bp.route('/register', methods=['POST'])
def register():
    data = request.get_json()
    if not data:
        return jsonify({'error': 'Request body must be JSON'}), 400
    try:
        validated = register_schema.load(data)
    except ValidationError as err:
        return jsonify({'error': 'Validation failed', 'details': err.messages}), 422

    if User.query.filter_by(email=validated['email']).first():
        return jsonify({'error': 'Email already registered'}), 409

    user = User(email=validated['email'])
    user.set_password(validated['password'])
    db.session.add(user)
    db.session.commit()
    return jsonify({
        'success': True,
        'message': 'User registered successfully',
        'data': user_schema.dump(user)
    }), 201

@auth_bp.route('/login', methods=['POST'])
def login():
    data = request.get_json()
    if not data:
        return jsonify({'error': 'Request body must be JSON'}), 400
    try:
        validated = login_schema.load(data)
    except ValidationError as err:
        return jsonify({'error': 'Validation failed', 'details': err.messages}), 422

    user = User.query.filter_by(email=validated['email']).first()
    if not user or not user.check_password(validated['password']):
        return jsonify({'error': 'Invalid email or password'}), 401

    access_token = create_access_token(
        identity=str(user.id),
        additional_claims={'role': user.role, 'email': user.email}
    )
    refresh_token = create_refresh_token(identity=str(user.id))
    return jsonify({
        'success': True,
        'data': {
            'access_token': access_token,
            'refresh_token': refresh_token,
            'token_type': 'Bearer',
        }
    }), 200

@auth_bp.route('/refresh', methods=['POST'])
@jwt_required(refresh=True)
def refresh():
    identity = get_jwt_identity()
    user = db.session.get(User, int(identity))
    if not user:
        return jsonify({'error': 'User not found'}), 401
    # Re-attach the custom claims; a token built from the identity alone
    # has no role, so admin checks would fail after a refresh
    access_token = create_access_token(
        identity=identity,
        additional_claims={'role': user.role, 'email': user.email}
    )
    return jsonify({
        'success': True,
        'data': {'access_token': access_token}
    }), 200

@auth_bp.route('/me', methods=['GET'])
@jwt_required()
def get_current_user():
    user_id = get_jwt_identity()
    user = db.session.get(User, int(user_id))
    if not user:
        return jsonify({'error': 'User not found'}), 404
    return jsonify({
        'success': True,
        'data': user_schema.dump(user)
    }), 200
# app/routes/books.py
from functools import wraps

from flask import Blueprint, jsonify, request
from flask_jwt_extended import jwt_required, get_jwt
from marshmallow import ValidationError as MarshmallowValidationError
from app.extensions import db
from app.models import Book
from app.schemas import book_schema, books_schema
from app.errors import APIError

books_bp = Blueprint('books', __name__)

def admin_required(fn):
    """Decorator that requires admin role."""
    @wraps(fn)
    @jwt_required()
    def wrapper(*args, **kwargs):
        claims = get_jwt()
        if claims.get('role') != 'admin':
            raise APIError('Admin access required', 403)
        return fn(*args, **kwargs)
    return wrapper

@books_bp.route('/books', methods=['GET'])
def list_books():
    """List books with filtering, sorting, and pagination."""
    page = request.args.get('page', 1, type=int)
    per_page = min(request.args.get('per_page', 20, type=int), 100)
    query = Book.query

    # Filter by author
    author = request.args.get('author')
    if author:
        query = query.filter(Book.author.ilike(f'%{author}%'))

    # Filter by year (compare against None so that year=0 is not ignored)
    year = request.args.get('year', type=int)
    if year is not None:
        query = query.filter_by(year=year)

    # Search by title
    search = request.args.get('search')
    if search:
        query = query.filter(Book.title.ilike(f'%{search}%'))

    # Sort
    sort = request.args.get('sort', 'created_at')
    order = request.args.get('order', 'desc')
    allowed_sorts = ['title', 'author', 'year', 'created_at']
    if sort in allowed_sorts:
        sort_col = getattr(Book, sort)
        query = query.order_by(sort_col.desc() if order == 'desc' else sort_col.asc())

    pagination = query.paginate(page=page, per_page=per_page, error_out=False)
    return jsonify({
        'success': True,
        'data': books_schema.dump(pagination.items),
        'pagination': {
            'page': pagination.page,
            'per_page': pagination.per_page,
            'total_pages': pagination.pages,
            'total_items': pagination.total,
            'has_next': pagination.has_next,
            'has_prev': pagination.has_prev,
        }
    }), 200

@books_bp.route('/books/<int:book_id>', methods=['GET'])
def get_book(book_id):
    """Get a single book by ID."""
    book = db.session.get(Book, book_id)
    if not book:
        raise APIError('Book not found', 404)
    return jsonify({'success': True, 'data': book_schema.dump(book)}), 200

@books_bp.route('/books', methods=['POST'])
@jwt_required()
def create_book():
    """Create a new book. Requires authentication."""
    data = request.get_json()
    if not data:
        raise APIError('Request body must be JSON', 400)
    try:
        book = book_schema.load(data, session=db.session)
    except MarshmallowValidationError as err:
        raise APIError('Validation failed', 422, details=err.messages)

    # Check for duplicate ISBN
    if book.isbn:
        existing = Book.query.filter_by(isbn=book.isbn).first()
        if existing:
            raise APIError('A book with this ISBN already exists', 409)

    db.session.add(book)
    db.session.commit()
    return jsonify({'success': True, 'data': book_schema.dump(book)}), 201

@books_bp.route('/books/<int:book_id>', methods=['PUT'])
@jwt_required()
def update_book(book_id):
    """Update a book. Requires authentication."""
    book = db.session.get(Book, book_id)
    if not book:
        raise APIError('Book not found', 404)
    data = request.get_json()
    if not data:
        raise APIError('Request body must be JSON', 400)
    try:
        updated_book = book_schema.load(
            data, instance=book, partial=True, session=db.session
        )
    except MarshmallowValidationError as err:
        raise APIError('Validation failed', 422, details=err.messages)
    db.session.commit()
    return jsonify({'success': True, 'data': book_schema.dump(updated_book)}), 200

@books_bp.route('/books/<int:book_id>', methods=['DELETE'])
@admin_required
def delete_book(book_id):
    """Delete a book. Requires admin role."""
    book = db.session.get(Book, book_id)
    if not book:
        raise APIError('Book not found', 404)
    db.session.delete(book)
    db.session.commit()
    return jsonify({'success': True, 'message': 'Book deleted'}), 200
Testing is non-negotiable for API development. Flask provides a test client that simulates HTTP requests without starting a server. Combined with pytest, you get fast, isolated tests.
# tests/conftest.py
import pytest
from app import create_app
from app.extensions import db as _db
from app.models import User

@pytest.fixture(scope='session')
def app():
    """Create application for testing."""
    app = create_app('testing')
    return app

@pytest.fixture(scope='function')
def db(app):
    """Create a fresh database for each test."""
    with app.app_context():
        _db.create_all()
        yield _db
        _db.session.rollback()
        _db.drop_all()

@pytest.fixture
def client(app, db):
    """Create a test client."""
    return app.test_client()

@pytest.fixture
def auth_headers(client, db):
    """Register and login a user, return auth headers."""
    # Register
    client.post('/api/auth/register', json={
        'email': 'test@example.com',
        'password': 'testpassword123',
    })
    # Login
    response = client.post('/api/auth/login', json={
        'email': 'test@example.com',
        'password': 'testpassword123',
    })
    token = response.get_json()['data']['access_token']
    return {'Authorization': f'Bearer {token}'}

@pytest.fixture
def admin_headers(client, db, app):
    """Create an admin user and return auth headers."""
    with app.app_context():
        admin = User(email='admin@example.com', role='admin')
        admin.set_password('adminpassword123')
        _db.session.add(admin)
        _db.session.commit()
    response = client.post('/api/auth/login', json={
        'email': 'admin@example.com',
        'password': 'adminpassword123',
    })
    token = response.get_json()['data']['access_token']
    return {'Authorization': f'Bearer {token}'}
# tests/test_books.py
import pytest

class TestListBooks:
    def test_list_books_empty(self, client):
        response = client.get('/api/books')
        assert response.status_code == 200
        data = response.get_json()
        assert data['success'] is True
        assert data['data'] == []
        assert data['pagination']['total_items'] == 0

    def test_list_books_with_pagination(self, client, auth_headers):
        # Create 25 books
        for i in range(25):
            client.post('/api/books', json={
                'title': f'Book {i}',
                'author': f'Author {i}',
            }, headers=auth_headers)
        # First page
        response = client.get('/api/books?page=1&per_page=10')
        data = response.get_json()
        assert len(data['data']) == 10
        assert data['pagination']['total_items'] == 25
        assert data['pagination']['total_pages'] == 3
        assert data['pagination']['has_next'] is True

    def test_list_books_with_search(self, client, auth_headers):
        client.post('/api/books', json={
            'title': 'Python Crash Course',
            'author': 'Eric Matthes',
        }, headers=auth_headers)
        client.post('/api/books', json={
            'title': 'Clean Code',
            'author': 'Robert Martin',
        }, headers=auth_headers)
        response = client.get('/api/books?search=python')
        data = response.get_json()
        assert len(data['data']) == 1
        assert data['data'][0]['title'] == 'Python Crash Course'

class TestGetBook:
    def test_get_existing_book(self, client, auth_headers):
        # Create a book
        create_resp = client.post('/api/books', json={
            'title': 'Test Book',
            'author': 'Test Author',
        }, headers=auth_headers)
        book_id = create_resp.get_json()['data']['id']
        # Retrieve it
        response = client.get(f'/api/books/{book_id}')
        assert response.status_code == 200
        assert response.get_json()['data']['title'] == 'Test Book'

    def test_get_nonexistent_book(self, client):
        response = client.get('/api/books/9999')
        assert response.status_code == 404

class TestCreateBook:
    def test_create_book_success(self, client, auth_headers):
        response = client.post('/api/books', json={
            'title': 'New Book',
            'author': 'New Author',
            'year': 2024,
            'isbn': '9781234567890',
        }, headers=auth_headers)
        assert response.status_code == 201
        data = response.get_json()
        assert data['success'] is True
        assert data['data']['title'] == 'New Book'
        assert data['data']['isbn'] == '9781234567890'

    def test_create_book_missing_title(self, client, auth_headers):
        response = client.post('/api/books', json={
            'author': 'Some Author',
        }, headers=auth_headers)
        assert response.status_code == 422

    def test_create_book_without_auth(self, client):
        response = client.post('/api/books', json={
            'title': 'Unauthorized Book',
            'author': 'Author',
        })
        assert response.status_code == 401

    def test_create_book_duplicate_isbn(self, client, auth_headers):
        # Create first book
        client.post('/api/books', json={
            'title': 'First Book',
            'author': 'Author',
            'isbn': '9781234567890',
        }, headers=auth_headers)
        # Try to create with same ISBN
        response = client.post('/api/books', json={
            'title': 'Second Book',
            'author': 'Author',
            'isbn': '9781234567890',
        }, headers=auth_headers)
        assert response.status_code == 409

class TestUpdateBook:
    def test_update_book_partial(self, client, auth_headers):
        # Create
        create_resp = client.post('/api/books', json={
            'title': 'Original Title',
            'author': 'Original Author',
        }, headers=auth_headers)
        book_id = create_resp.get_json()['data']['id']
        # Update only the title
        response = client.put(f'/api/books/{book_id}', json={
            'title': 'Updated Title',
        }, headers=auth_headers)
        assert response.status_code == 200
        data = response.get_json()['data']
        assert data['title'] == 'Updated Title'
        assert data['author'] == 'Original Author'  # Unchanged

class TestDeleteBook:
    def test_delete_book_as_admin(self, client, auth_headers, admin_headers):
        # Create as regular user
        create_resp = client.post('/api/books', json={
            'title': 'Delete Me',
            'author': 'Author',
        }, headers=auth_headers)
        book_id = create_resp.get_json()['data']['id']
        # Delete as admin
        response = client.delete(f'/api/books/{book_id}', headers=admin_headers)
        assert response.status_code == 200
        # Verify deletion
        get_resp = client.get(f'/api/books/{book_id}')
        assert get_resp.status_code == 404

    def test_delete_book_as_regular_user(self, client, auth_headers):
        create_resp = client.post('/api/books', json={
            'title': 'Cannot Delete',
            'author': 'Author',
        }, headers=auth_headers)
        book_id = create_resp.get_json()['data']['id']
        response = client.delete(f'/api/books/{book_id}', headers=auth_headers)
        assert response.status_code == 403
Run the tests:
# Run all tests
pytest tests/ -v

# Run with coverage
pytest tests/ -v --cov=app --cov-report=term-missing

# Run a specific test class
pytest tests/test_books.py::TestCreateBook -v
An API without documentation is an API nobody will use correctly. Two popular tools for Flask are flasgger (Swagger UI) and flask-smorest (OpenAPI 3.0). Here is a quick setup with flasgger:
pip install flasgger
from flask import Flask
from flasgger import Swagger
app = Flask(__name__)
# Swagger configuration
app.config['SWAGGER'] = {
'title': 'Book API',
'uiversion': 3,
'openapi': '3.0.0',
'description': 'A complete Book management REST API',
}
swagger = Swagger(app)
@app.route('/api/books', methods=['GET'])
def list_books():
"""List all books with pagination.
---
parameters:
- name: page
in: query
type: integer
default: 1
description: Page number
- name: per_page
in: query
type: integer
default: 20
description: Items per page (max 100)
- name: search
in: query
type: string
description: Search books by title
responses:
200:
description: A list of books
content:
application/json:
schema:
type: object
properties:
success:
type: boolean
data:
type: array
items:
$ref: '#/components/schemas/Book'
pagination:
$ref: '#/components/schemas/Pagination'
"""
# ... implementation ...
@app.route('/api/books', methods=['POST'])
def create_book():
"""Create a new book.
---
security:
- BearerAuth: []
requestBody:
required: true
content:
application/json:
schema:
type: object
required:
- title
- author
properties:
title:
type: string
example: "Clean Code"
author:
type: string
example: "Robert C. Martin"
isbn:
type: string
example: "9780132350884"
year:
type: integer
example: 2008
responses:
201:
description: Book created successfully
401:
description: Authentication required
422:
description: Validation error
"""
# ... implementation ...
After configuring flasgger, visit http://localhost:5000/apidocs to see the interactive Swagger UI where you can test endpoints directly from the browser. For larger projects, consider flask-smorest, which integrates more tightly with marshmallow schemas and generates OpenAPI specs automatically from your existing schema definitions.
These are the mistakes I see most often in Flask API codebases, and they tend to surface in production rather than development.
Flask returns HTML for 404, 405, and 500 errors by default. If your client expects JSON and gets HTML, parsing fails silently or throws confusing errors.
# BAD: Flask returns HTML for unknown routes
# GET /api/nonexistent -> <h1>404 Not Found</h1>
# GOOD: Register custom error handlers (as shown in section 7)
@app.errorhandler(404)
def not_found(error):
return jsonify({'error': 'Resource not found'}), 404
@app.errorhandler(405)
def method_not_allowed(error):
return jsonify({'error': 'Method not allowed'}), 405
Your API works perfectly in Postman but fails from a browser. The error message in the browser console says "blocked by CORS policy." This catches people every time.
# This is all it takes to fix it during development:
from flask_cors import CORS
CORS(app)
If a Book has a relationship to an Author, and you load 50 books, SQLAlchemy will execute 1 query for books + 50 individual queries for authors unless you eager-load:
# BAD: N+1 queries
books = Book.query.all()
for book in books:
print(book.author.name) # Each access triggers a query
# GOOD: Eager loading
from sqlalchemy.orm import joinedload
books = Book.query.options(joinedload(Book.author)).all()
# Or set it at the model level:
class Book(db.Model):
author_id = db.Column(db.Integer, db.ForeignKey('authors.id'))
author = db.relationship('Author', lazy='joined')
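To see the N+1 pattern without a full Flask app, here is a minimal sketch that uses only Python's built-in sqlite3 module. It counts the SELECT statements issued by a lazy-style loop and by a single JOIN. The table layout, the helper names (`build_db`, `count_selects`, `lazy_style`, `joined_style`), and the sample data are assumptions made for illustration; SQLAlchemy generates its own SQL, so treat this as a model of the behavior rather than what the ORM emits.

```python
import sqlite3

def build_db():
    """Create an in-memory database with 3 authors and 50 books (sample data)."""
    conn = sqlite3.connect(":memory:")
    conn.execute("CREATE TABLE authors (id INTEGER PRIMARY KEY, name TEXT)")
    conn.execute(
        "CREATE TABLE books (id INTEGER PRIMARY KEY, title TEXT, author_id INTEGER)"
    )
    conn.executemany(
        "INSERT INTO authors (id, name) VALUES (?, ?)",
        [(i, f"Author {i}") for i in range(1, 4)],
    )
    conn.executemany(
        "INSERT INTO books (title, author_id) VALUES (?, ?)",
        [(f"Book {i}", i % 3 + 1) for i in range(50)],
    )
    conn.commit()
    return conn

def count_selects(conn, work):
    """Run work(conn) and return how many SELECT statements it issued."""
    statements = []
    conn.set_trace_callback(statements.append)
    try:
        work(conn)
    finally:
        conn.set_trace_callback(None)
    return sum(1 for s in statements if s.lstrip().upper().startswith("SELECT"))

def lazy_style(conn):
    # One query for the books, then one more per book for its author.
    books = conn.execute("SELECT id, author_id FROM books").fetchall()
    for _book_id, author_id in books:
        conn.execute(
            "SELECT name FROM authors WHERE id = ?", (author_id,)
        ).fetchone()

def joined_style(conn):
    # A single JOIN returns each book together with its author.
    conn.execute(
        "SELECT books.id, authors.name FROM books "
        "JOIN authors ON authors.id = books.author_id"
    ).fetchall()

conn = build_db()
lazy_count = count_selects(conn, lazy_style)
joined_count = count_selects(conn, joined_style)
print(lazy_count, joined_count)
```

With 50 books, the lazy-style loop issues 51 SELECT statements and the JOIN issues one. The gap grows linearly with the number of rows, which is why this problem tends to stay hidden on small development datasets.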
Trusting that request.get_json() will always return valid data leads to KeyError and TypeError exceptions that become 500 errors in production. Always validate, either manually or with marshmallow.
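As a rough illustration of manual validation, the sketch below checks a parsed JSON payload without ever raising KeyError or TypeError. The function name `validate_book_payload` and the specific field rules are assumptions chosen to match the book examples in this tutorial, not an API the tutorial defines. In a view you would pass it the result of `request.get_json(silent=True)`, which returns None when the body is missing or is not valid JSON.

```python
def validate_book_payload(payload):
    """Return a dict mapping field names to lists of error messages.

    An empty dict means the payload is valid. The field rules are
    illustrative assumptions, not rules defined elsewhere in this tutorial.
    """
    if not isinstance(payload, dict):
        # Covers None (missing or invalid JSON) as well as lists, strings, etc.
        return {"_body": ["Request body must be a JSON object."]}

    errors = {}

    title = payload.get("title")
    if not isinstance(title, str) or not title.strip():
        errors.setdefault("title", []).append("Title is required.")

    author = payload.get("author")
    if not isinstance(author, str) or not author.strip():
        errors.setdefault("author", []).append("Author is required.")

    year = payload.get("year")
    # bool is a subclass of int, so exclude it explicitly.
    if year is not None and (isinstance(year, bool) or not isinstance(year, int)):
        errors.setdefault("year", []).append("Year must be a number.")

    isbn = payload.get("isbn")
    if isbn is not None and (not isinstance(isbn, str) or len(isbn) != 13):
        errors.setdefault("isbn", []).append("ISBN must be exactly 13 characters.")

    return errors
```

Because the function uses `.get()` and `isinstance()` checks instead of direct indexing, malformed input produces a dict of messages you can return with a 4xx status rather than an unhandled exception that surfaces as a 500.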
# BAD: Leaks internal details
@app.errorhandler(Exception)
def handle_error(error):
    return jsonify({'error': str(error)}), 500
# Client sees: "UNIQUE constraint failed: users.email"
# GOOD: Log internally, return generic message
from werkzeug.exceptions import HTTPException
@app.errorhandler(Exception)
def handle_error(error):
    # Let HTTP errors (404, 405, ...) keep their own status codes
    if isinstance(error, HTTPException):
        return error
    app.logger.error(f'Internal error: {error}', exc_info=True)
    return jsonify({'error': 'An unexpected error occurred'}), 500
Version your API from day one. When you need to make breaking changes, clients on the old version continue to work:
# URL-based versioning (most common)
app.register_blueprint(books_v1_bp, url_prefix='/api/v1')
app.register_blueprint(books_v2_bp, url_prefix='/api/v2')
# Header-based versioning
@app.before_request
def check_api_version():
    # type=int falls back to the default if the header is missing or not an integer
    request.api_version = request.headers.get('API-Version', 1, type=int)
Every response from your API should follow the same shape. Clients should never have to guess whether the response has a data key or a results key or a books key:
// Success
{
"success": true,
"data": { ... },
"pagination": { ... }
}
// Error
{
"success": false,
"error": {
"message": "Validation failed",
"code": 422,
"details": { ... }
}
}
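One way to keep every response in this shape is to build it in a single place. The sketch below uses two small helpers, `success_envelope` and `error_envelope`; both names are hypothetical and are not part of Flask. They return plain dicts, so in a view you would wrap the result with `jsonify(...)` and pair it with the matching status code.

```python
def success_envelope(data, pagination=None):
    """Build the success shape; pagination is included only when provided."""
    body = {"success": True, "data": data}
    if pagination is not None:
        body["pagination"] = pagination
    return body

def error_envelope(message, code, details=None):
    """Build the error shape; details is included only when non-empty."""
    error = {"message": message, "code": code}
    if details:
        error["details"] = details
    return {"success": False, "error": error}
```

For example, a validation failure could be returned as `jsonify(error_envelope("Validation failed", 422, errors)), 422`, which guarantees that clients always find the message under `error.message`.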
Validate everything. Check types, lengths, ranges, formats, and business rules. Return all validation errors at once — do not make the client fix them one at a time:
// BAD: One error at a time
{"error": "Title is required"}
// (client fixes, resubmits)
{"error": "Year must be a number"}
// GOOD: All errors at once
{
"error": "Validation failed",
"details": {
"title": ["Title is required"],
"year": ["Year must be a number"],
"isbn": ["ISBN must be exactly 13 characters"]
}
}
Protect your API from abuse with Flask-Limiter:
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(
app=app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
)
@app.route('/api/books', methods=['GET'])
@limiter.limit("30 per minute")
def get_books():
pass
@app.route('/api/auth/login', methods=['POST'])
@limiter.limit("5 per minute") # Stricter for auth endpoints
def login():
pass
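To make a limit such as "5 per minute" concrete, here is a minimal fixed-window counter in plain Python. This is a conceptual sketch only, not how Flask-Limiter is implemented: the class name `FixedWindowLimiter` and its in-memory dict are assumptions, and a real deployment with several worker processes needs a shared store instead of process-local state.

```python
import time

class FixedWindowLimiter:
    """Allow at most `limit` hits per `window` seconds for each key."""

    def __init__(self, limit, window, clock=time.monotonic):
        self.limit = limit
        self.window = window
        self.clock = clock  # injectable so the behavior can be tested without sleeping
        self._hits = {}     # key -> (window_start, count)

    def allow(self, key):
        now = self.clock()
        start, count = self._hits.get(key, (now, 0))
        if now - start >= self.window:
            # The previous window has expired, so start a fresh one.
            start, count = now, 0
        if count >= self.limit:
            return False
        self._hits[key] = (start, count + 1)
        return True

# Usage: key by client address, as get_remote_address does above.
login_limiter = FixedWindowLimiter(limit=5, window=60)
allowed = login_limiter.allow("203.0.113.7")
```

A view would return a 429 response when `allow()` returns False. The stricter limit on the login endpoint matters because each key gets its own counter, so one noisy client does not consume another client's allowance.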
Log every request in production. Log the method, path, status code, response time, and user identity if authenticated:
import time
from flask import g, request
@app.before_request
def start_timer():
g.start_time = time.time()
@app.after_request
def log_request(response):
duration = time.time() - g.start_time
app.logger.info(
f'{request.method} {request.path} '
f'{response.status_code} {duration:.3f}s'
)
return response
models.py, schemas in schemas.py, routes in routes/, error handlers in errors.py. This is not over-engineering; it is how you stay sane past 10 endpoints.
/api/v1/ is the simplest approach and the one your clients will thank you for.
joinedload or subqueryload for relationships. Monitor your SQL queries in development.
Flask gives you the freedom to architect your API exactly the way you want. That freedom is its greatest strength and its greatest risk: without discipline, a Flask API can become a tangled mess. Follow these patterns consistently and your API will be maintainable, testable, and production-ready.
Forms are the primary interface between your users and your application. Every login page, registration flow, search bar, checkout form, and settings panel depends on form handling done right. Get it wrong and you open the door to data corruption, security vulnerabilities, and frustrated users.
Flask gives you two approaches to form handling:
request.form, and validate everything manually. Fine for a single input, tedious and error-prone for anything more.
This tutorial covers both approaches in depth. We start with raw form handling so you understand what Flask-WTF abstracts away, then move into the full Flask-WTF workflow including field types, validators, file uploads, dynamic forms, AJAX submission, and several practical examples you can drop into real projects.
Before reaching for any library, you should understand how Flask handles forms natively. This gives you the foundation to debug issues when they arise and appreciate what Flask-WTF does for you.
Here is a minimal login form submitted via POST:
<!-- templates/login_raw.html -->
<form method="POST" action="/login">
<div>
<label for="username">Username</label>
<input type="text" id="username" name="username" required>
</div>
<div>
<label for="password">Password</label>
<input type="password" id="password" name="password" required>
</div>
<button type="submit">Log In</button>
</form>
The route must accept both GET (display the form) and POST (process the submission):
from flask import Flask, request, render_template, redirect, url_for, flash
app = Flask(__name__)
app.secret_key = 'your-secret-key-here'
@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        username = request.form.get('username', '').strip()
        password = request.form.get('password', '')
        # Manual validation
        errors = []
        if not username:
            errors.append('Username is required.')
        if not password:
            errors.append('Password is required.')
        elif len(password) < 8:
            # elif avoids reporting two errors for an empty password
            errors.append('Password must be at least 8 characters.')
        if errors:
            for error in errors:
                flash(error, 'danger')
            return render_template('login_raw.html', username=username)
        # Authentication logic here (authenticate() is defined elsewhere)
        if authenticate(username, password):
            flash('Welcome back!', 'success')
            return redirect(url_for('dashboard'))
        else:
            flash('Invalid credentials.', 'danger')
            return render_template('login_raw.html', username=username)
    return render_template('login_raw.html')
Flash messages provide one-time feedback to the user. Display them in your template:
<!-- templates/base.html (partial) -->
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
<div class="alert alert-{{ category }}">
{{ message }}
</div>
{% endfor %}
{% endif %}
{% endwith %}
This approach works, but it has significant drawbacks:
strip() or escape() and you have a bug or vulnerability.
Flask-WTF solves all of these problems.
pip install Flask-WTF
This installs both Flask-WTF and its dependency WTForms.
Flask-WTF requires a SECRET_KEY for CSRF token generation:
import os
from flask import Flask
app = Flask(__name__)
# REQUIRED: Used to sign CSRF tokens and session cookies
app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY', 'dev-key-change-in-production')
# Optional Flask-WTF settings
app.config['WTF_CSRF_ENABLED'] = True # Default: True
app.config['WTF_CSRF_TIME_LIMIT'] = 3600 # Token expiry in seconds (default: 3600)
app.config['WTF_CSRF_SSL_STRICT'] = True # Strict referer checking for HTTPS
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16 MB upload limit
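Cross-Site Request Forgery (CSRF) tricks a user's browser into submitting a form to your site using their authenticated session. Flask-WTF prevents this by generating a signed token tied to the user's session, embedding it in each form it renders, and rejecting any submission whose token is missing, invalid, or expired.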
In your templates, always include the CSRF token:
<form method="POST">
{{ form.hidden_tag() }}
<!-- or individually: {{ form.csrf_token }} -->
<!-- rest of the form -->
</form>
For AJAX requests, you can include the token in a meta tag and send it as a header:
<meta name="csrf-token" content="{{ csrf_token() }}">
// JavaScript
fetch('/api/submit', {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-CSRFToken': document.querySelector('meta[name="csrf-token"]').content
},
body: JSON.stringify(data)
});
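The idea behind a signed CSRF token can be sketched with the standard library alone. This is a conceptual model, not Flask-WTF's actual implementation: the function names (`new_session_secret`, `make_token`, `is_valid`) are hypothetical, the key is a placeholder, and the sketch leaves out the expiry that WTF_CSRF_TIME_LIMIT controls. It shows why an attacker's page cannot forge a valid token: doing so requires both the server's secret key and the value stored in the victim's session.

```python
import hashlib
import hmac
import secrets

SECRET_KEY = b"dev-key-change-in-production"  # placeholder value for the sketch

def new_session_secret():
    """Random per-session value the server would keep in the user's session."""
    return secrets.token_hex(16)

def make_token(session_secret):
    """Sign the session secret so the token cannot be forged without SECRET_KEY."""
    return hmac.new(SECRET_KEY, session_secret.encode(), hashlib.sha256).hexdigest()

def is_valid(session_secret, submitted_token):
    """Recompute the expected token and compare it in constant time."""
    expected = make_token(session_secret)
    submitted = submitted_token or ""
    return hmac.compare_digest(expected.encode(), submitted.encode())

# The server renders the token into the form or meta tag...
session_secret = new_session_secret()
token = make_token(session_secret)
# ...and checks whatever comes back in the form field or X-CSRFToken header.
accepted = is_valid(session_secret, token)
rejected = is_valid(session_secret, "forged-value")
```

The constant-time comparison via `hmac.compare_digest` avoids leaking how many leading characters of a guess were correct.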
WTForms provides a rich set of field types. Here is a comprehensive reference with practical usage for each.
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, TextAreaField, HiddenField
from wtforms.validators import DataRequired, Length, Email
class ProfileForm(FlaskForm):
# Basic text input
username = StringField('Username', validators=[
DataRequired(),
Length(min=3, max=80)
])
# Renders as <input type="text">; the Email validator checks the format
email = StringField('Email', validators=[
DataRequired(),
Email(message='Enter a valid email address.')
])
# Renders as <input type="password">
password = PasswordField('Password', validators=[
DataRequired(),
Length(min=8, message='Password must be at least 8 characters.')
])
# Renders as <textarea>
bio = TextAreaField('Bio', validators=[
Length(max=500)
])
# Hidden field — not shown to users
user_id = HiddenField('User ID')
from wtforms import IntegerField, FloatField, DecimalField
from wtforms.validators import NumberRange
class ProductForm(FlaskForm):
quantity = IntegerField('Quantity', validators=[
DataRequired(),
NumberRange(min=1, max=1000, message='Quantity must be between 1 and 1000.')
])
price = FloatField('Price', validators=[
DataRequired(),
NumberRange(min=0.01)
])
# DecimalField for precise financial calculations
tax_rate = DecimalField('Tax Rate (%)', places=2, validators=[
NumberRange(min=0, max=100)
])
from wtforms import SelectField, RadioField, SelectMultipleField
class OrderForm(FlaskForm):
# Dropdown select
country = SelectField('Country', choices=[
('', '-- Select Country --'),
('us', 'United States'),
('uk', 'United Kingdom'),
('ca', 'Canada'),
('au', 'Australia')
], validators=[DataRequired()])
# Radio buttons
shipping = RadioField('Shipping Method', choices=[
('standard', 'Standard (5-7 days)'),
('express', 'Express (2-3 days)'),
('overnight', 'Overnight')
], default='standard')
# Multi-select (hold Ctrl/Cmd to select multiple)
interests = SelectMultipleField('Interests', choices=[
('tech', 'Technology'),
('science', 'Science'),
('art', 'Art')
])
from wtforms import BooleanField, DateField, DateTimeLocalField
class EventForm(FlaskForm):
# Checkbox
agree_terms = BooleanField('I agree to the terms and conditions', validators=[
DataRequired(message='You must accept the terms.')
])
# Date picker
event_date = DateField('Event Date', format='%Y-%m-%d', validators=[
DataRequired()
])
# DateTime picker
starts_at = DateTimeLocalField('Starts At', format='%Y-%m-%dT%H:%M')
from flask_wtf.file import FileField, FileAllowed, FileRequired, MultipleFileField
class UploadForm(FlaskForm):
# Single file upload
avatar = FileField('Profile Picture', validators=[
FileRequired(),
FileAllowed(['jpg', 'png', 'gif'], 'Images only!')
])
# Multiple file upload
documents = MultipleFileField('Documents', validators=[
FileAllowed(['pdf', 'doc', 'docx'], 'Documents only!')
])
from wtforms import SubmitField
class ContactForm(FlaskForm):
name = StringField('Name', validators=[DataRequired()])
message = TextAreaField('Message', validators=[DataRequired()])
submit = SubmitField('Send Message')
| Field | HTML Output | Use Case |
|---|---|---|
| StringField | `<input type="text">` | Names, usernames, short text |
| PasswordField | `<input type="password">` | Passwords, secrets |
| TextAreaField | `<textarea>` | Long text, comments, bios |
| IntegerField | `<input type="number">` | Whole numbers, quantities |
| FloatField | `<input type="number">` | Decimal numbers, prices |
| DecimalField | `<input type="text">` | Precise decimals, currency |
| BooleanField | `<input type="checkbox">` | Agree/disagree, toggles |
| SelectField | `<select>` | Dropdowns |
| RadioField | `<input type="radio">` | Single choice from few options |
| SelectMultipleField | `<select multiple>` | Multiple selections |
| DateField | `<input type="text">` | Date input |
| DateTimeLocalField | `<input type="datetime-local">` | Date + time input |
| FileField | `<input type="file">` | Single file upload |
| MultipleFileField | `<input type="file" multiple>` | Multiple file upload |
| HiddenField | `<input type="hidden">` | Passing data without UI |
| SubmitField | `<input type="submit">` | Form submission button |
Validators are the backbone of form security and data integrity. WTForms ships with a solid set of built-in validators, and you can write custom ones for business-specific rules.
from wtforms.validators import (
DataRequired, # Field data must be truthy after coercion (rejects '', 0, False)
InputRequired, # Raw input must be present (accepts values such as 0, unlike DataRequired)
Length, # String length between min and max
Email, # Valid email format
EqualTo, # Must match another field (password confirmation)
NumberRange, # Number between min and max
Optional, # Field is optional — skip other validators if empty
URL, # Valid URL format
UUID, # Valid UUID format
Regexp, # Must match a regex pattern
AnyOf, # Value must be in a list
NoneOf, # Value must NOT be in a list
MacAddress, # Valid MAC address
IPAddress, # Valid IP address
)
class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[
DataRequired(),
Length(min=3, max=25),
Regexp(r'^[A-Za-z][A-Za-z0-9_.]*$',
message='Username must start with a letter and contain only letters, numbers, dots, and underscores.')
])
email = StringField('Email', validators=[
DataRequired(),
Email()
])
password = PasswordField('Password', validators=[
DataRequired(),
Length(min=8)
])
confirm_password = PasswordField('Confirm Password', validators=[
DataRequired(),
EqualTo('password', message='Passwords must match.')
])
age = IntegerField('Age', validators=[
Optional(),
NumberRange(min=13, max=120)
])
website = StringField('Website', validators=[
Optional(),
URL(message='Enter a valid URL.')
])
role = SelectField('Role', choices=['user', 'admin'], validators=[
AnyOf(['user', 'admin'])
])
For business rules specific to a single field, define a validate_<fieldname> method on the form class:
from wtforms import ValidationError
from models import User
class RegistrationForm(FlaskForm):
username = StringField('Username', validators=[DataRequired(), Length(min=3, max=25)])
email = StringField('Email', validators=[DataRequired(), Email()])
def validate_username(self, field):
"""Check that username is not already taken."""
user = User.query.filter_by(username=field.data).first()
if user:
raise ValidationError('That username is already taken.')
def validate_email(self, field):
"""Check that email is not already registered."""
user = User.query.filter_by(email=field.data).first()
if user:
raise ValidationError('An account with that email already exists.')
If you need the same validation logic across multiple forms, create a standalone validator function:
from wtforms import ValidationError
import re
def strong_password(form, field):
"""Enforce password complexity rules."""
password = field.data
if not re.search(r'[A-Z]', password):
raise ValidationError('Password must contain at least one uppercase letter.')
if not re.search(r'[a-z]', password):
raise ValidationError('Password must contain at least one lowercase letter.')
if not re.search(r'[0-9]', password):
raise ValidationError('Password must contain at least one digit.')
if not re.search(r'[!@#$%^&*(),.?":{}|<>]', password):
raise ValidationError('Password must contain at least one special character.')
def unique_field(model, column, message='This value is already in use.'):
"""Factory that returns a validator checking database uniqueness."""
def _validate(form, field):
existing = model.query.filter(column == field.data).first()
if existing:
raise ValidationError(message)
return _validate
# Usage
class SignupForm(FlaskForm):
username = StringField('Username', validators=[
DataRequired(),
unique_field(User, User.username, 'Username taken.')
])
password = PasswordField('Password', validators=[
DataRequired(),
Length(min=8),
strong_password
])
Override the validate method for cross-field validation:
class DateRangeForm(FlaskForm):
start_date = DateField('Start Date', validators=[DataRequired()])
end_date = DateField('End Date', validators=[DataRequired()])
def validate(self, extra_validators=None):
"""Ensure end date is after start date."""
if not super().validate(extra_validators=extra_validators):
return False
if self.end_date.data <= self.start_date.data:
self.end_date.errors.append('End date must be after start date.')
return False
return True
Each field on a form object is callable and renders the corresponding HTML element:
<!-- templates/register.html -->
{% extends "base.html" %}
{% block content %}
<h1>Register</h1>
<form method="POST" novalidate>
{{ form.hidden_tag() }}
<div class="form-group">
{{ form.username.label(class="form-label") }}
{{ form.username(class="form-control", placeholder="Choose a username") }}
{% if form.username.errors %}
{% for error in form.username.errors %}
<small class="text-danger">{{ error }}</small>
{% endfor %}
{% endif %}
</div>
<div class="form-group">
{{ form.email.label(class="form-label") }}
{{ form.email(class="form-control", placeholder="you@example.com") }}
{% if form.email.errors %}
{% for error in form.email.errors %}
<small class="text-danger">{{ error }}</small>
{% endfor %}
{% endif %}
</div>
<div class="form-group">
{{ form.password.label(class="form-label") }}
{{ form.password(class="form-control") }}
{% if form.password.errors %}
{% for error in form.password.errors %}
<small class="text-danger">{{ error }}</small>
{% endfor %}
{% endif %}
</div>
{{ form.submit(class="btn btn-primary") }}
</form>
{% endblock %}
Conditionally apply Bootstrap's is-invalid class to highlight fields with errors:
<div class="form-group">
{{ form.email.label(class="form-label") }}
{{ form.email(class="form-control" + (" is-invalid" if form.email.errors else "")) }}
{% for error in form.email.errors %}
<div class="invalid-feedback">{{ error }}</div>
{% endfor %}
</div>
Instead of repeating the same field rendering pattern for every field, define a macro:
<!-- templates/macros/forms.html -->
{% macro render_field(field, placeholder='') %}
<div class="mb-3">
  {% if field.type == 'BooleanField' %}
  {# Checkboxes render their label once, after the input #}
  <div class="form-check">
    {{ field(class="form-check-input" + (" is-invalid" if field.errors else "")) }}
    {{ field.label(class="form-check-label") }}
  </div>
  {% else %}
  {{ field.label(class="form-label") }}
  {% if field.type == 'TextAreaField' %}
  {{ field(class="form-control" + (" is-invalid" if field.errors else ""), rows=5, placeholder=placeholder) }}
  {% elif field.type == 'SelectField' %}
  {{ field(class="form-select" + (" is-invalid" if field.errors else "")) }}
  {% elif field.type == 'FileField' %}
  {{ field(class="form-control" + (" is-invalid" if field.errors else "")) }}
  {% else %}
  {{ field(class="form-control" + (" is-invalid" if field.errors else ""), placeholder=placeholder) }}
  {% endif %}
  {% endif %}
  {% for error in field.errors %}
  <div class="invalid-feedback d-block">{{ error }}</div>
  {% endfor %}
</div>
{% endmacro %}
Now rendering any form becomes trivial:
<!-- templates/register.html -->
{% from "macros/forms.html" import render_field %}
{% extends "base.html" %}
{% block content %}
<h1>Register</h1>
<form method="POST" novalidate>
{{ form.hidden_tag() }}
{{ render_field(form.username, placeholder='Choose a username') }}
{{ render_field(form.email, placeholder='you@example.com') }}
{{ render_field(form.password) }}
{{ render_field(form.confirm_password) }}
{{ render_field(form.agree_terms) }}
{{ form.submit(class="btn btn-primary") }}
</form>
{% endblock %}
File uploads require special handling: the form must use multipart/form-data encoding, files must be validated for type and size, and filenames must be sanitized before saving to disk.
from flask_wtf import FlaskForm
from flask_wtf.file import FileField, FileAllowed, FileRequired
from wtforms import StringField, SubmitField
from wtforms.validators import DataRequired
class AvatarUploadForm(FlaskForm):
username = StringField('Username', validators=[DataRequired()])
avatar = FileField('Profile Picture', validators=[
FileRequired(message='Please select a file.'),
FileAllowed(['jpg', 'jpeg', 'png', 'gif'], 'Images only (jpg, png, gif).')
])
submit = SubmitField('Upload')
import os
from flask import Flask, render_template, redirect, url_for, flash, current_app
from werkzeug.utils import secure_filename
import uuid
app = Flask(__name__)
app.config['SECRET_KEY'] = 'your-secret-key'
app.config['UPLOAD_FOLDER'] = os.path.join(app.root_path, 'static', 'uploads')
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16 MB max
# Ensure upload directory exists
os.makedirs(app.config['UPLOAD_FOLDER'], exist_ok=True)
def save_file(file_storage, subfolder=''):
"""Save an uploaded file with a unique name. Returns the saved filename."""
original = secure_filename(file_storage.filename)
# Prevent empty filename after sanitization
if not original:
original = 'upload'
ext = original.rsplit('.', 1)[-1].lower() if '.' in original else 'bin'
unique_name = f"{uuid.uuid4().hex}.{ext}"
save_path = os.path.join(current_app.config['UPLOAD_FOLDER'], subfolder)
os.makedirs(save_path, exist_ok=True)
file_storage.save(os.path.join(save_path, unique_name))
return unique_name
@app.route('/upload', methods=['GET', 'POST'])
def upload_avatar():
form = AvatarUploadForm()
if form.validate_on_submit():
filename = save_file(form.avatar.data, subfolder='avatars')
flash(f'Avatar uploaded: {filename}', 'success')
return redirect(url_for('upload_avatar'))
return render_template('upload.html', form=form)
<!-- templates/upload.html -->
{% extends "base.html" %}
{% block content %}
<h1>Upload Avatar</h1>
<form method="POST" enctype="multipart/form-data" novalidate>
{{ form.hidden_tag() }}
<div class="mb-3">
{{ form.username.label(class="form-label") }}
{{ form.username(class="form-control") }}
</div>
<div class="mb-3">
{{ form.avatar.label(class="form-label") }}
{{ form.avatar(class="form-control") }}
{% for error in form.avatar.errors %}
<div class="text-danger">{{ error }}</div>
{% endfor %}
</div>
<!-- Image preview -->
<div class="mb-3">
<img id="preview" src="#" alt="Preview" style="display:none; max-width:200px;" class="img-thumbnail">
</div>
{{ form.submit(class="btn btn-primary") }}
</form>
<script>
document.getElementById('avatar').addEventListener('change', function(e) {
const preview = document.getElementById('preview');
const file = e.target.files[0];
if (file) {
const reader = new FileReader();
reader.onload = function(e) {
preview.src = e.target.result;
preview.style.display = 'block';
};
reader.readAsDataURL(file);
}
});
</script>
{% endblock %}
from flask_wtf.file import MultipleFileField, FileAllowed
class DocumentUploadForm(FlaskForm):
documents = MultipleFileField('Upload Documents', validators=[
FileAllowed(['pdf', 'doc', 'docx', 'txt'], 'Documents only.')
])
submit = SubmitField('Upload All')
@app.route('/upload-docs', methods=['GET', 'POST'])
def upload_documents():
form = DocumentUploadForm()
if form.validate_on_submit():
saved = []
for file in form.documents.data:
if file.filename: # Skip empty file inputs
filename = save_file(file, subfolder='documents')
saved.append(filename)
flash(f'Uploaded {len(saved)} document(s).', 'success')
return redirect(url_for('upload_documents'))
return render_template('upload_docs.html', form=form)
Static form classes work for most cases. But sometimes you need forms that adapt at runtime — choices loaded from a database, fields added or removed based on user action, or nested sub-forms.
class AssignTaskForm(FlaskForm):
title = StringField('Task Title', validators=[DataRequired()])
assignee = SelectField('Assign To', coerce=int, validators=[DataRequired()])
submit = SubmitField('Create Task')
@app.route('/tasks/new', methods=['GET', 'POST'])
def new_task():
form = AssignTaskForm()
# Populate choices from database — MUST happen before validate_on_submit()
users = User.query.order_by(User.name).all()
form.assignee.choices = [(u.id, u.name) for u in users]
if form.validate_on_submit():
task = Task(title=form.title.data, assignee_id=form.assignee.data)
db.session.add(task)
db.session.commit()
flash('Task created.', 'success')
return redirect(url_for('task_list'))
return render_template('new_task.html', form=form)
Key point: Set form.assignee.choices before calling validate_on_submit(). Otherwise WTForms cannot validate the submitted value against the list of valid choices and the form will always fail validation.
FormField lets you embed one form inside another. Useful for address blocks, configuration sections, or any repeated structure:
from wtforms import FormField, Form
# Note: sub-form inherits from wtforms.Form, NOT FlaskForm
# (FlaskForm adds its own CSRF token — you only want one per outer form)
class AddressForm(Form):
street = StringField('Street', validators=[DataRequired()])
city = StringField('City', validators=[DataRequired()])
state = StringField('State', validators=[DataRequired(), Length(min=2, max=2)])
zip_code = StringField('ZIP Code', validators=[
DataRequired(),
Regexp(r'^\d{5}(-\d{4})?$', message='Invalid ZIP code.')
])
class CheckoutForm(FlaskForm):
full_name = StringField('Full Name', validators=[DataRequired()])
billing_address = FormField(AddressForm, label='Billing Address')
shipping_address = FormField(AddressForm, label='Shipping Address')
same_as_billing = BooleanField('Shipping same as billing')
submit = SubmitField('Place Order')
In the template, access nested fields with dot notation:
<fieldset>
<legend>Billing Address</legend>
{{ render_field(form.billing_address.street) }}
{{ render_field(form.billing_address.city) }}
{{ render_field(form.billing_address.state) }}
{{ render_field(form.billing_address.zip_code) }}
</fieldset>
Use FieldList when the number of fields is not known at class definition time:
from wtforms import FieldList, FormField, Form
class IngredientForm(Form):
name = StringField('Ingredient', validators=[DataRequired()])
quantity = StringField('Quantity', validators=[DataRequired()])
class RecipeForm(FlaskForm):
title = StringField('Recipe Title', validators=[DataRequired()])
ingredients = FieldList(FormField(IngredientForm), min_entries=1)
submit = SubmitField('Save Recipe')
@app.route('/recipes/new', methods=['GET', 'POST'])
def new_recipe():
form = RecipeForm()
if form.validate_on_submit():
recipe = Recipe(title=form.title.data)
for entry in form.ingredients.entries:
ingredient = Ingredient(
name=entry.data['name'],
quantity=entry.data['quantity']
)
recipe.ingredients.append(ingredient)
db.session.add(recipe)
db.session.commit()
flash('Recipe saved!', 'success')
return redirect(url_for('recipe_list'))
return render_template('new_recipe.html', form=form)
Add a button in the template to dynamically add more ingredient rows with JavaScript:
<div id="ingredients">
{% for entry in form.ingredients %}
<div class="ingredient-row row mb-2">
<div class="col">{{ entry.name(class="form-control", placeholder="Ingredient") }}</div>
<div class="col">{{ entry.quantity(class="form-control", placeholder="Quantity") }}</div>
<div class="col-auto">
<button type="button" class="btn btn-danger remove-row">Remove</button>
</div>
</div>
{% endfor %}
</div>
<button type="button" id="add-ingredient" class="btn btn-secondary mb-3">+ Add Ingredient</button>
<script>
let ingredientIndex = {{ form.ingredients | length }};
document.getElementById('add-ingredient').addEventListener('click', function() {
const container = document.getElementById('ingredients');
const row = document.createElement('div');
row.className = 'ingredient-row row mb-2';
row.innerHTML = `
<div class="col">
<input class="form-control" name="ingredients-${ingredientIndex}-name" placeholder="Ingredient">
</div>
<div class="col">
<input class="form-control" name="ingredients-${ingredientIndex}-quantity" placeholder="Quantity">
</div>
<div class="col-auto">
<button type="button" class="btn btn-danger remove-row">Remove</button>
</div>
`;
container.appendChild(row);
ingredientIndex++;
});
document.getElementById('ingredients').addEventListener('click', function(e) {
if (e.target.classList.contains('remove-row')) {
e.target.closest('.ingredient-row').remove();
}
});
</script>
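The JavaScript above relies on a naming convention: each row's inputs are named `ingredients-<index>-name` and `ingredients-<index>-quantity`. The sketch below shows, in plain Python, how flat keys in that style map back to rows. WTForms does its own parsing internally, so `group_ingredients` is a hypothetical helper that only mirrors the convention; it is handy for reasoning about what the browser sends after rows have been added and removed.

```python
import re

# Matches keys such as "ingredients-2-name" and captures the index and attribute.
ENTRY_KEY = re.compile(r"^ingredients-(\d+)-(name|quantity)$")

def group_ingredients(formdata):
    """Group flat form keys like 'ingredients-2-name' into per-row dicts."""
    rows = {}
    for key, value in formdata.items():
        match = ENTRY_KEY.match(key)
        if match:
            index, attr = int(match.group(1)), match.group(2)
            rows.setdefault(index, {})[attr] = value
    # Sort by index so rows keep their on-page order even if some were removed.
    return [rows[i] for i in sorted(rows)]

# Row 1 was removed in the browser, so the submitted indices are 0 and 2.
submitted = {
    "title": "Pancakes",
    "ingredients-0-name": "Flour",
    "ingredients-0-quantity": "2 cups",
    "ingredients-2-name": "Milk",
    "ingredients-2-quantity": "1 cup",
}
parsed = group_ingredients(submitted)
```

Note that `ingredientIndex` in the script only ever increases, so removing a row leaves a gap in the indices instead of renumbering the remaining rows. That keeps every input name unique within the form.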
Sometimes you want to submit a form without a full page reload — think inline editing, live search, or chat interfaces. Flask-WTF works fine with AJAX; you just need to handle the CSRF token and return JSON instead of HTML.
from flask import jsonify, request
class QuickNoteForm(FlaskForm):
content = TextAreaField('Note', validators=[
DataRequired(),
Length(max=500)
])
@app.route('/api/notes', methods=['POST'])
def create_note():
form = QuickNoteForm()
if form.validate_on_submit():
note = Note(content=form.content.data)
db.session.add(note)
db.session.commit()
return jsonify({
'success': True,
'note': {
'id': note.id,
'content': note.content,
'created_at': note.created_at.isoformat()
}
}), 201
# Return validation errors as JSON
return jsonify({
'success': False,
'errors': form.errors
}), 400
<meta name="csrf-token" content="{{ csrf_token() }}">
<form id="note-form">
<textarea id="note-content" class="form-control" placeholder="Write a note..."></textarea>
<div id="note-errors" class="text-danger mt-1"></div>
<button type="submit" class="btn btn-primary mt-2">Save Note</button>
</form>
<div id="notes-list"></div>
<script>
const csrfToken = document.querySelector('meta[name="csrf-token"]').content;
document.getElementById('note-form').addEventListener('submit', async function(e) {
e.preventDefault();
const content = document.getElementById('note-content').value;
const errorsDiv = document.getElementById('note-errors');
errorsDiv.textContent = '';
try {
const response = await fetch('/api/notes', {
method: 'POST',
headers: {
'Content-Type': 'application/x-www-form-urlencoded',
'X-CSRFToken': csrfToken
},
body: new URLSearchParams({ content: content })
});
const data = await response.json();
if (data.success) {
// Append the new note to the list
const notesList = document.getElementById('notes-list');
const noteEl = document.createElement('div');
noteEl.className = 'alert alert-info';
noteEl.textContent = data.note.content;
notesList.prepend(noteEl);
// Clear the form
document.getElementById('note-content').value = '';
} else {
// Display validation errors
const messages = Object.values(data.errors).flat();
errorsDiv.textContent = messages.join(' ');
}
} catch (err) {
errorsDiv.textContent = 'Network error. Please try again.';
}
});
</script>
If you have a pure REST API authenticated with tokens (not cookies), CSRF protection is not needed and can be selectively disabled:
from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect(app)
# Exempt a specific view
@csrf.exempt
@app.route('/webhook', methods=['POST'])
def webhook():
# This endpoint receives webhooks from external services
data = request.get_json()
# process webhook...
return jsonify({'status': 'ok'})
# Or exempt an entire blueprint
from flask import Blueprint
api_bp = Blueprint('api', __name__, url_prefix='/api/v1')
csrf.exempt(api_bp)
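Exempting an endpoint from CSRF only makes sense if something else authenticates the request. The following is a minimal sketch of a bearer-token check, not a complete authentication scheme: the `token_required` decorator, the `API_TOKEN` constant, and the `/api/v1/ping` route are hypothetical names introduced for illustration. Because the browser never attaches this header automatically, a forged cross-site request cannot carry it.

```python
import hmac
from functools import wraps

from flask import Flask, jsonify, request

app = Flask(__name__)
# Hypothetical static token for illustration; a real service would look tokens up per client.
API_TOKEN = "example-token"


def token_required(view):
    """Reject requests that lack a valid 'Authorization: Bearer <token>' header."""
    @wraps(view)
    def wrapper(*args, **kwargs):
        header = request.headers.get("Authorization", "")
        scheme, _, token = header.partition(" ")
        # compare_digest avoids leaking token contents through timing differences
        valid = hmac.compare_digest(token.encode(), API_TOKEN.encode())
        if scheme != "Bearer" or not valid:
            return jsonify({"error": "invalid or missing token"}), 401
        return view(*args, **kwargs)
    return wrapper


@app.route("/api/v1/ping", methods=["POST"])
@token_required
def ping():
    return jsonify({"status": "ok"})


client = app.test_client()
unauthorized = client.post("/api/v1/ping")
authorized = client.post(
    "/api/v1/ping", headers={"Authorization": "Bearer example-token"}
)
```

The request without the header gets a 401, while the one carrying the token reaches the view.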
This example ties together field types, validators, custom validation, and template rendering into a production-ready registration flow.
# forms.py
from flask_wtf import FlaskForm
from wtforms import (StringField, PasswordField, BooleanField,
                     SelectField, SubmitField)
from wtforms.validators import (DataRequired, Length, Email,
                                EqualTo, Regexp, ValidationError)
from models import User
class RegistrationForm(FlaskForm):
    first_name = StringField('First Name', validators=[
        DataRequired(), Length(max=50)
    ])
    last_name = StringField('Last Name', validators=[
        DataRequired(), Length(max=50)
    ])
    username = StringField('Username', validators=[
        DataRequired(),
        Length(min=3, max=25),
        Regexp(r'^[a-zA-Z][a-zA-Z0-9_]*$',
               message='Letters, numbers, and underscores only. Must start with a letter.')
    ])
    email = StringField('Email', validators=[
        DataRequired(), Email()
    ])
    password = PasswordField('Password', validators=[
        DataRequired(),
        Length(min=8, message='At least 8 characters.')
    ])
    confirm_password = PasswordField('Confirm Password', validators=[
        DataRequired(),
        EqualTo('password', message='Passwords must match.')
    ])
    country = SelectField('Country', choices=[
        ('', '-- Select --'), ('us', 'United States'),
        ('uk', 'United Kingdom'), ('ca', 'Canada')
    ], validators=[DataRequired()])
    agree_terms = BooleanField('I agree to the Terms of Service', validators=[
        DataRequired(message='You must accept the terms.')
    ])
    submit = SubmitField('Create Account')
    def validate_username(self, field):
        if User.query.filter_by(username=field.data).first():
            raise ValidationError('Username already taken.')
    def validate_email(self, field):
        # The route stores emails lowercased, so compare the lowercased value here too
        if User.query.filter_by(email=field.data.lower()).first():
            raise ValidationError('Email already registered.')
# routes.py
from flask import render_template, redirect, url_for, flash
from werkzeug.security import generate_password_hash
from forms import RegistrationForm
from models import db, User
@app.route('/register', methods=['GET', 'POST'])
def register():
    form = RegistrationForm()
    if form.validate_on_submit():
        user = User(
            first_name=form.first_name.data,
            last_name=form.last_name.data,
            username=form.username.data,
            email=form.email.data.lower(),
            password_hash=generate_password_hash(form.password.data),
            country=form.country.data
        )
        db.session.add(user)
        db.session.commit()
        flash('Account created! Please log in.', 'success')
        return redirect(url_for('login'))
    return render_template('register.html', form=form)
<!-- templates/register.html -->
{% from "macros/forms.html" import render_field %}
{% extends "base.html" %}
{% block content %}
<div class="row justify-content-center">
<div class="col-md-6">
<h1 class="mb-4">Create Account</h1>
<form method="POST" novalidate>
{{ form.hidden_tag() }}
<div class="row">
<div class="col-md-6">{{ render_field(form.first_name) }}</div>
<div class="col-md-6">{{ render_field(form.last_name) }}</div>
</div>
{{ render_field(form.username) }}
{{ render_field(form.email) }}
{{ render_field(form.password) }}
{{ render_field(form.confirm_password) }}
{{ render_field(form.country) }}
{{ render_field(form.agree_terms) }}
{{ form.submit(class="btn btn-primary btn-lg w-100") }}
</form>
<p class="mt-3 text-center">
Already have an account? <a href="{{ url_for('login') }}">Log in</a>
</p>
</div>
</div>
{% endblock %}
# forms.py
class ContactForm(FlaskForm):
name = StringField('Your Name', validators=[DataRequired(), Length(max=100)])
email = StringField('Your Email', validators=[DataRequired(), Email()])
subject = SelectField('Subject', choices=[
('general', 'General Inquiry'),
('support', 'Technical Support'),
('billing', 'Billing Question'),
('feedback', 'Feedback')
])
message = TextAreaField('Message', validators=[
DataRequired(),
Length(min=10, max=2000, message='Message must be 10-2000 characters.')
])
submit = SubmitField('Send Message')
# routes.py
from flask_mail import Mail, Message as MailMessage
mail = Mail(app)
@app.route('/contact', methods=['GET', 'POST'])
def contact():
form = ContactForm()
if form.validate_on_submit():
# Send email notification
msg = MailMessage(
subject=f"[Contact Form] {form.subject.data}: from {form.name.data}",
sender=app.config['MAIL_DEFAULT_SENDER'],
recipients=[app.config['ADMIN_EMAIL']],
reply_to=form.email.data
)
msg.body = f"""Name: {form.name.data}
Email: {form.email.data}
Subject: {form.subject.data}
{form.message.data}"""
try:
mail.send(msg)
flash('Message sent! We will get back to you soon.', 'success')
except Exception as e:
app.logger.error(f'Email send failed: {e}')
flash('Failed to send message. Please try again later.', 'danger')
return redirect(url_for('contact'))
return render_template('contact.html', form=form)
# forms.py
from flask_wtf.file import FileField, FileAllowed
class ProfileForm(FlaskForm):
first_name = StringField('First Name', validators=[DataRequired(), Length(max=50)])
last_name = StringField('Last Name', validators=[DataRequired(), Length(max=50)])
bio = TextAreaField('Bio', validators=[Length(max=500)])
avatar = FileField('Profile Picture', validators=[
FileAllowed(['jpg', 'jpeg', 'png'], 'Images only.')
])
submit = SubmitField('Save Changes')
# routes.py
import os
from flask_login import login_required, current_user
@app.route('/profile/edit', methods=['GET', 'POST'])
@login_required
def edit_profile():
    form = ProfileForm(obj=current_user)  # Pre-populate from user object
    if form.validate_on_submit():
        current_user.first_name = form.first_name.data
        current_user.last_name = form.last_name.data
        current_user.bio = form.bio.data
        if form.avatar.data:
            # Delete old avatar if it exists
            if current_user.avatar_filename:
                old_path = os.path.join(
                    app.config['UPLOAD_FOLDER'], 'avatars', current_user.avatar_filename
                )
                if os.path.exists(old_path):
                    os.remove(old_path)
            # save_file is an upload helper assumed to be defined elsewhere in the project
            current_user.avatar_filename = save_file(
                form.avatar.data, subfolder='avatars'
            )
        db.session.commit()
        flash('Profile updated.', 'success')
        return redirect(url_for('edit_profile'))
    return render_template('edit_profile.html', form=form)
<!-- templates/edit_profile.html -->
{% from "macros/forms.html" import render_field %}
{% extends "base.html" %}
{% block content %}
<h1>Edit Profile</h1>
<form method="POST" enctype="multipart/form-data" novalidate>
{{ form.hidden_tag() }}
<div class="row">
<div class="col-md-8">
{{ render_field(form.first_name) }}
{{ render_field(form.last_name) }}
{{ render_field(form.bio) }}
</div>
<div class="col-md-4 text-center">
{% if current_user.avatar_filename %}
<img src="{{ url_for('static', filename='uploads/avatars/' + current_user.avatar_filename) }}"
class="img-thumbnail mb-3" style="max-width: 200px;">
{% endif %}
{{ render_field(form.avatar) }}
</div>
</div>
{{ form.submit(class="btn btn-primary") }}
</form>
{% endblock %}
A wizard form splits a long form into multiple steps. We store intermediate data in the session and validate one step at a time.
# forms.py
class WizardStep1Form(FlaskForm):
"""Step 1: Personal Information"""
first_name = StringField('First Name', validators=[DataRequired()])
last_name = StringField('Last Name', validators=[DataRequired()])
email = StringField('Email', validators=[DataRequired(), Email()])
next_step = SubmitField('Next')
class WizardStep2Form(FlaskForm):
"""Step 2: Address"""
street = StringField('Street Address', validators=[DataRequired()])
city = StringField('City', validators=[DataRequired()])
state = StringField('State', validators=[DataRequired()])
zip_code = StringField('ZIP Code', validators=[DataRequired()])
prev_step = SubmitField('Back')
next_step = SubmitField('Next')
class WizardStep3Form(FlaskForm):
"""Step 3: Preferences"""
plan = SelectField('Plan', choices=[
('free', 'Free'), ('pro', 'Pro ($9/mo)'), ('enterprise', 'Enterprise ($29/mo)')
])
notifications = BooleanField('Receive email notifications', default=True)
prev_step = SubmitField('Back')
finish = SubmitField('Complete Registration')
# routes.py
from flask import flash, redirect, render_template, request, session, url_for
WIZARD_FORMS = {
    1: WizardStep1Form,
    2: WizardStep2Form,
    3: WizardStep3Form
}
@app.route('/wizard', methods=['GET'])
def wizard_start():
    session.pop('wizard_data', None)
    return redirect(url_for('wizard_step', step=1))
@app.route('/wizard/step/<int:step>', methods=['GET', 'POST'])
def wizard_step(step):
    if step not in WIZARD_FORMS:
        return redirect(url_for('wizard_start'))
    FormClass = WIZARD_FORMS[step]
    wizard_data = session.get('wizard_data', {})
    # Pre-populate from session if returning to a step
    step_data = wizard_data.get(str(step), {})
    form = FormClass(data=step_data) if request.method == 'GET' and step_data else FormClass()
    if request.method == 'POST':
        # Handle the "Back" button: no validation needed
        if hasattr(form, 'prev_step') and form.prev_step.data:
            return redirect(url_for('wizard_step', step=step - 1))
        if form.validate_on_submit():
            # Save step data to session
            wizard_data[str(step)] = {
                field.name: field.data
                for field in form
                if field.name not in ('csrf_token', 'next_step', 'prev_step', 'finish')
            }
            session['wizard_data'] = wizard_data
            if step == 3:  # Final step
                # Combine all wizard data and create the user
                all_data = {}
                for step_dict in wizard_data.values():
                    all_data.update(step_dict)
                # Create user with all_data...
                user = create_user_from_wizard(all_data)
                session.pop('wizard_data', None)
                flash('Registration complete!', 'success')
                return redirect(url_for('dashboard'))
            return redirect(url_for('wizard_step', step=step + 1))
    return render_template(f'wizard_step{step}.html', form=form, step=step)
<!-- templates/wizard_step1.html -->
{% from "macros/forms.html" import render_field %}
{% extends "base.html" %}
{% block content %}
<h1>Registration - Step {{ step }} of 3</h1>
<!-- Progress bar -->
<div class="progress mb-4">
<div class="progress-bar" style="width: {{ (step / 3 * 100) | int }}%">
Step {{ step }} of 3
</div>
</div>
<form method="POST" novalidate>
{{ form.hidden_tag() }}
{{ render_field(form.first_name) }}
{{ render_field(form.last_name) }}
{{ render_field(form.email) }}
{{ form.next_step(class="btn btn-primary") }}
</form>
{% endblock %}
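Because each wizard step has its own URL, a user could type /wizard/step/3 directly and skip the earlier steps. One way to guard against that, sketched here under the assumption that step data is stored under string keys as in the route above, is to clamp the requested step to the first unfinished one. The helper names `first_incomplete_step` and `allowed_step` are hypothetical.

```python
def first_incomplete_step(wizard_data, total_steps):
    """Return the lowest step number with no saved data, or None if all are done."""
    for step in range(1, total_steps + 1):
        if not wizard_data.get(str(step)):
            return step
    return None


def allowed_step(requested, wizard_data, total_steps):
    """Clamp the requested step so users cannot jump past unfinished steps."""
    pending = first_incomplete_step(wizard_data, total_steps)
    if pending is None:
        return min(requested, total_steps)
    return min(requested, pending)


# Session contents after the user has completed only step 1
saved = {"1": {"first_name": "Ada", "last_name": "Lovelace", "email": "ada@example.com"}}

jump_ahead = allowed_step(3, saved, total_steps=3)   # clamped to step 2
go_back = allowed_step(1, saved, total_steps=3)      # revisiting step 1 is fine
fresh_start = allowed_step(3, {}, total_steps=3)     # nothing saved yet
```

In the route, you could redirect to `allowed_step(step, wizard_data, 3)` whenever it differs from the requested step.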
These are the mistakes I see most often in code reviews. Avoid them.
<!-- WRONG: form will silently fail validation -->
<form method="POST">
{{ form.username() }}
<button type="submit">Submit</button>
</form>
<!-- RIGHT: always include hidden_tag() -->
<form method="POST">
{{ form.hidden_tag() }}
{{ form.username() }}
<button type="submit">Submit</button>
</form>
Symptom: form.validate_on_submit() returns False and form.errors shows {'csrf_token': ['The CSRF token is missing.']}.
# WRONG: Trusting client-side validation
@app.route('/submit', methods=['POST'])
def submit():
# Directly using request.form without any validation
name = request.form['name'] # Can be empty, malicious, or missing entirely
db.execute(f"INSERT INTO users (name) VALUES ('{name}')")
# RIGHT: Always validate server-side
@app.route('/submit', methods=['POST'])
def submit():
form = MyForm()
if form.validate_on_submit():
# Data has been validated and is safe to use
user = User(name=form.name.data)
db.session.add(user)
db.session.commit()
HTML5 required and pattern attributes are convenience features, not security measures. A user can bypass them with browser devtools or by sending a raw HTTP request.
# WRONG: Using the original filename directly
file.save(os.path.join(upload_dir, file.filename))
# Attacker uploads: "../../../etc/passwd" or "shell.php"
# RIGHT: Always sanitize and rename
import os
import uuid
from werkzeug.utils import secure_filename
original = secure_filename(file.filename)  # Strips path components
# splitext returns an empty extension when the name has no dot
ext = os.path.splitext(original)[1].lower()  # e.g. ".png"
new_name = f"{uuid.uuid4().hex}{ext}"
file.save(os.path.join(upload_dir, new_name))
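The sanitize-and-rename steps can be wrapped in a small helper that also rejects unexpected extensions. This is a sketch rather than a drop-in utility: `unique_upload_name` and `ALLOWED_EXTENSIONS` are hypothetical names, and the allowed set is an assumption you would adapt to your application.

```python
import os
import uuid

from werkzeug.utils import secure_filename

ALLOWED_EXTENSIONS = {".jpg", ".jpeg", ".png"}  # assumption for this sketch


def unique_upload_name(client_filename):
    """Return a random, safe filename that keeps only a whitelisted extension.

    Returns None when the name is empty or the extension is not allowed.
    """
    cleaned = secure_filename(client_filename or "")
    ext = os.path.splitext(cleaned)[1].lower()
    if ext not in ALLOWED_EXTENSIONS:
        return None
    return f"{uuid.uuid4().hex}{ext}"


safe = unique_upload_name("../../etc/Holiday Photo.PNG")
rejected = unique_upload_name("shell.php")
no_ext = unique_upload_name("README")
```

The stored name never contains any part of what the client sent except the lowercased extension, so path traversal and name collisions are both ruled out.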
# WRONG: validate_on_submit() runs before choices are set
@app.route('/assign', methods=['GET', 'POST'])
def assign():
form = AssignForm()
if form.validate_on_submit(): # Always fails — choices is empty!
pass
form.user.choices = [(u.id, u.name) for u in User.query.all()]
return render_template('assign.html', form=form)
# RIGHT: Set choices BEFORE validation
@app.route('/assign', methods=['GET', 'POST'])
def assign():
form = AssignForm()
form.user.choices = [(u.id, u.name) for u in User.query.all()]
if form.validate_on_submit():
pass
return render_template('assign.html', form=form)
# WRONG: FlaskForm adds its own CSRF token — nested forms will have duplicate tokens
class AddressForm(FlaskForm): # <-- Wrong base class
street = StringField('Street')
# RIGHT: Use wtforms.Form for sub-forms
from wtforms import Form
class AddressForm(Form): # <-- Correct
street = StringField('Street')
<!-- WRONG: files will not be sent -->
<form method="POST">
{{ form.hidden_tag() }}
{{ form.avatar() }}
<button type="submit">Upload</button>
</form>
<!-- RIGHT: enctype is required for file uploads -->
<form method="POST" enctype="multipart/form-data">
{{ form.hidden_tag() }}
{{ form.avatar() }}
<button type="submit">Upload</button>
</form>
Client-side validation (HTML5 attributes, JavaScript) improves UX. Server-side validation is the actual security boundary. Never skip it.
Even for a simple search bar, the discipline of defining a form class pays off. You get CSRF protection, consistent validation, and clear separation between form definition and route logic.
myapp/
    __init__.py
    forms/
        __init__.py
        auth.py        # LoginForm, RegistrationForm
        profile.py     # ProfileForm, AvatarForm
        admin.py       # AdminSettingsForm
    routes/
        auth.py
        profile.py
    templates/
    models.py
Define form rendering macros once and import them everywhere. This ensures consistent styling and error display across your entire application.
Always configure a maximum upload size. Without it, a user can send a multi-gigabyte request and crash your server:
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024 # 16 MB
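To see the limit in action, here is a small sketch using Flask's test client. The 1 KB limit, the `/submit` route, and the JSON error body are assumptions chosen to keep the demo short; Flask responds with 413 Request Entity Too Large when the body exceeds the configured size.

```python
from flask import Flask, jsonify, request

app = Flask(__name__)
app.config["MAX_CONTENT_LENGTH"] = 1024  # 1 KB, tiny on purpose for the demo


@app.route("/submit", methods=["POST"])
def submit():
    # Reading the body (here via request.form) is what triggers the size check
    return jsonify({"fields": sorted(request.form.keys())})


@app.errorhandler(413)
def too_large(error):
    return jsonify({"error": "Request body too large"}), 413


client = app.test_client()
small = client.post("/submit", data={"note": "hello"})
large = client.post("/submit", data={"note": "x" * 4096})
```

The small request succeeds, while the oversized one is rejected before your view logic runs on its data.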
Initialize CSRFProtect globally and only exempt endpoints that genuinely need it (webhooks, token-authenticated APIs):
from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect()
def create_app():
app = Flask(__name__)
csrf.init_app(app)
return app
Use the obj parameter to pre-fill a form from an existing model instance:
form = ProfileForm(obj=current_user) # GET: pre-fills from user
if form.validate_on_submit():
form.populate_obj(current_user) # POST: writes validated data back
db.session.commit()
When calling flash(), always pass a category (success, danger, warning, info) so your template can style each message with the matching Bootstrap alert class.
Include {{ form.hidden_tag() }} in your templates. For AJAX, send the token via the X-CSRFToken header.
Order validators deliberately: DataRequired then Length then Email. Write custom validators for business rules using validate_<fieldname> methods or standalone functions.
File uploads need enctype="multipart/form-data" on the form, FileAllowed/FileRequired validators, and secure_filename + UUID renaming before saving.
Populate SelectField choices from the database before calling validate_on_submit(), use FieldList for repeating fields, and FormField for nested structures.
Set MAX_CONTENT_LENGTH to prevent denial-of-service via oversized uploads.
Flask is a lightweight WSGI web application framework for Python. Created by Armin Ronacher in 2010, it started as an April Fools’ joke that merged two libraries, Werkzeug (a WSGI toolkit) and Jinja2 (a template engine), into a single package. The joke turned out to be genuinely useful, and Flask quickly became one of the most popular Python web frameworks in the world.
Flask calls itself a “micro-framework,” but do not let that label fool you. “Micro” means the core is deliberately small and unopinionated. Flask gives you routing, request handling, templates, and a development server. Everything else — database access, form validation, authentication, file uploads — you choose and plug in yourself. This is a feature, not a limitation. It means you understand every piece of your stack, and you never carry dead weight from features you do not use.
The two pillars that Flask is built on deserve a brief mention:
Flask’s philosophy is simple: give developers the tools they need and get out of their way. There is no ORM you are forced to use, no admin panel you never asked for, no project structure you must follow. You make the decisions. This tutorial will take you from zero to a working Flask application, covering everything a professional developer needs to know to build real applications with Flask.
The “Flask or Django?” question comes up in every Python web development discussion. The answer depends entirely on what you are building and how you prefer to work.
Neither framework is objectively better. Flask gives you freedom; Django gives you structure. Senior developers often use both — Flask for APIs and services, Django for full-stack web applications. This tutorial focuses on Flask because understanding it teaches you how web frameworks actually work, which makes you better with any framework.
Before installing Flask, set up a virtual environment. If you are not familiar with virtual environments and pip, review our Python Advanced – Virtual Environments & pip tutorial first. Never install Flask globally — always isolate your project dependencies.
# Create a project directory
mkdir flask-tutorial && cd flask-tutorial
# Create a virtual environment
python3 -m venv venv
# Activate it
# macOS / Linux
source venv/bin/activate
# Windows
venv\Scripts\activate
# Install Flask
pip install flask
# Verify the installation (flask.__version__ is deprecated in Flask 3.x)
flask --version
Flask installs its dependencies automatically: Werkzeug, Jinja2, MarkupSafe, ItsDangerous, Click, and Blinker. You do not need to install these separately.
# Save your dependencies
pip freeze > requirements.txt
Your requirements.txt should look something like this:
blinker==1.9.0
click==8.1.7
Flask==3.1.0
itsdangerous==2.2.0
Jinja2==3.1.4
MarkupSafe==3.0.2
Werkzeug==3.1.3
Let us build the classic “Hello World” application. Create a file called app.py.
from flask import Flask
app = Flask(__name__)
@app.route("/")
def hello():
    return "Hello, World!"
if __name__ == "__main__":
    app.run(debug=True)
Let us break down every line:
from flask import Flask - Import the Flask class. An instance of this class is your WSGI application.
app = Flask(__name__) - Create an application instance. The __name__ argument tells Flask where to look for templates, static files, and other resources. When you run the file directly, __name__ is "__main__". When imported as a module, it is the module name.
@app.route("/") - A decorator that tells Flask which URL should trigger this function. The "/" means the root URL of your application.
def hello() - The view function. Flask calls this when someone visits the root URL. Whatever it returns becomes the HTTP response body.
app.run(debug=True) - Starts the built-in development server with debug mode enabled.
You have two options for running your Flask app:
# Option 1: Run the file directly
python app.py
# Option 2: Use the flask command (recommended)
flask --app app run --debug
Both methods start the development server on http://127.0.0.1:5000. Open that URL in your browser and you should see “Hello, World!”.
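You can also check the response without opening a browser. This sketch uses Flask's built-in test client, which issues requests in-process; the variable names are illustrative only.

```python
from flask import Flask

app = Flask(__name__)


@app.route("/")
def hello():
    return "Hello, World!"


# The test client issues requests in-process; no server or port is needed
client = app.test_client()
response = client.get("/")
body = response.get_data(as_text=True)
```

A string returned from a view becomes a 200 response with a text/html content type, which is why the browser renders it directly.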
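Debug mode provides two critical features during development: the automatic reloader, which restarts the server whenever you change your code, and the interactive debugger, which shows a traceback in the browser when an unhandled exception occurs.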
Warning: Never enable debug mode in production. The interactive debugger allows arbitrary code execution on your server. We will cover this in more detail in the Common Pitfalls section.
Flask does not enforce a project structure, which is both its strength and a potential pitfall. Here are two structures that work well depending on your project size.
For small apps, APIs, or prototypes, a flat structure works fine:
flask-tutorial/
├── app.py              # Application code
├── templates/          # Jinja2 templates
│   ├── base.html
│   ├── index.html
│   └── about.html
├── static/             # CSS, JS, images
│   ├── css/
│   │   └── style.css
│   └── js/
│       └── main.js
├── requirements.txt
├── .env
├── .gitignore
└── venv/
When your application grows beyond a few hundred lines, organize it as a Python package using the application factory pattern:
flask-tutorial/
├── app/
│   ├── __init__.py          # Application factory (create_app)
│   ├── routes/
│   │   ├── __init__.py
│   │   ├── main.py          # Main blueprint
│   │   ├── auth.py          # Auth blueprint
│   │   └── api.py           # API blueprint
│   ├── models/
│   │   ├── __init__.py
│   │   └── user.py
│   ├── templates/
│   │   ├── base.html
│   │   ├── index.html
│   │   └── auth/
│   │       ├── login.html
│   │       └── register.html
│   ├── static/
│   │   ├── css/
│   │   └── js/
│   └── config.py
├── tests/
│   ├── __init__.py
│   ├── conftest.py
│   └── test_routes.py
├── requirements.txt
├── .env
├── .gitignore
└── run.py
The application factory is a function that creates and configures your Flask app. It is the recommended pattern for any non-trivial Flask application because it allows you to create multiple instances with different configurations (useful for testing), avoids circular imports, and keeps your application modular.
# app/__init__.py
from flask import Flask
def create_app(config_name="default"):
    app = Flask(__name__)
    # Load configuration
    if config_name == "testing":
        app.config.from_object("app.config.TestingConfig")
    elif config_name == "production":
        app.config.from_object("app.config.ProductionConfig")
    else:
        app.config.from_object("app.config.DevelopmentConfig")
    # Register blueprints
    from app.routes.main import main_bp
    app.register_blueprint(main_bp)
    from app.routes.auth import auth_bp
    app.register_blueprint(auth_bp, url_prefix="/auth")
    return app
# run.py
from app import create_app
app = create_app()
if __name__ == "__main__":
    app.run(debug=True)
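To make the "multiple instances with different configurations" point concrete, here is a single-file sketch of the same idea. It is a simplification, not the package layout above: the config classes live in the same file, the `CONFIGS` lookup table is a hypothetical replacement for the if/elif chain, and the `/ping` route stands in for real blueprints.

```python
from flask import Flask


class DevelopmentConfig:
    DEBUG = True
    TESTING = False


class TestingConfig:
    DEBUG = False
    TESTING = True


# Hypothetical mapping from names to classes, replacing the if/elif chain
CONFIGS = {"default": DevelopmentConfig, "testing": TestingConfig}


def create_app(config_name="default"):
    app = Flask(__name__)
    app.config.from_object(CONFIGS[config_name])

    @app.route("/ping")
    def ping():
        return "pong"

    return app


# Two independent applications built from the same factory
dev_app = create_app()
test_app = create_app("testing")
```

Each call returns a fresh application object, so a test suite can build its own instance without touching the configuration of the one you run locally.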
Flask applications need configuration for things like secret keys, database URLs, debug flags, and third-party API credentials. Flask provides several ways to manage configuration, and choosing the right approach matters for both security and maintainability.
The cleanest approach is to define configuration as Python classes. Each class represents an environment.
# config.py
import os
class Config:
    """Base configuration shared by all environments."""
    SECRET_KEY = os.environ.get("SECRET_KEY", "fallback-secret-key")
    SQLALCHEMY_TRACK_MODIFICATIONS = False
class DevelopmentConfig(Config):
    """Development-specific configuration."""
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = os.environ.get(
        "DATABASE_URL", "sqlite:///dev.db"
    )
class TestingConfig(Config):
    """Testing-specific configuration."""
    TESTING = True
    SQLALCHEMY_DATABASE_URI = "sqlite:///test.db"
class ProductionConfig(Config):
    """Production-specific configuration."""
    DEBUG = False
    SQLALCHEMY_DATABASE_URI = os.environ.get("DATABASE_URL")
    # In production, SECRET_KEY must be set via environment variable.
    # This lookup runs when config.py is imported, so an unset variable
    # raises KeyError in every environment that imports this module.
    SECRET_KEY = os.environ["SECRET_KEY"]
# Load from a class
app.config.from_object("config.DevelopmentConfig")
# Load from environment variable pointing to a config file
app.config.from_envvar("APP_CONFIG_FILE")
# Set individual values
app.config["MAX_CONTENT_LENGTH"] = 16 * 1024 * 1024 # 16 MB upload limit
For local development, storing environment variables in a .env file is standard practice. Install python-dotenv, and the flask command will load .env (and .flaskenv) files automatically.
pip install python-dotenv
# .env (add this to .gitignore!)
FLASK_APP=app
FLASK_DEBUG=1
SECRET_KEY=my-super-secret-key-change-in-production
DATABASE_URL=sqlite:///dev.db
from dotenv import load_dotenv
load_dotenv() # Load variables from .env file
import os
secret = os.environ.get("SECRET_KEY")
Never commit .env files to version control. They contain secrets. Add .env to your .gitignore and provide a .env.example file with placeholder values so teammates know which variables to set.
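Since secrets now come from the environment, it helps to fail with a clear message when one is missing instead of continuing with an empty value. A minimal sketch, where `require_env` is a hypothetical helper and `fake_env` stands in for `os.environ` so the example has no side effects:

```python
import os


def require_env(name, environ=None):
    """Return a required setting, or raise with a readable message if it is unset."""
    environ = os.environ if environ is None else environ
    value = environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value


# A stand-in for os.environ; DATABASE_URL is present but blank
fake_env = {"SECRET_KEY": "change-me", "DATABASE_URL": "  "}

secret = require_env("SECRET_KEY", fake_env)
try:
    require_env("DATABASE_URL", fake_env)
    error_message = None
except RuntimeError as exc:
    error_message = str(exc)
```

Calling a helper like this from the application factory, rather than at module import time, keeps the check limited to the environments that actually need the variable.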
Routing is the mechanism that maps URLs to Python functions. In Flask, you define routes using the @app.route() decorator.
@app.route("/")
def index():
return "Home Page"
@app.route("/about")
def about():
return "About Page"
@app.route("/contact")
def contact():
return "Contact Page"
You can capture parts of the URL as variables and pass them to your view function. Flask supports several converter types.
# String (default) - accepts any text without slashes
@app.route("/user/<username>")
def user_profile(username):
return f"Profile: {username}"
# Integer
@app.route("/post/<int:post_id>")
def show_post(post_id):
return f"Post #{post_id}"
# Float
@app.route("/price/<float:amount>")
def show_price(amount):
return f"Price: ${amount:.2f}"
# Path - like string but accepts slashes
@app.route("/files/<path:filepath>")
def serve_file(filepath):
return f"Serving: {filepath}"
# UUID
@app.route("/item/<uuid:item_id>")
def get_item(item_id):
return f"Item: {item_id}"
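Converters do two jobs: they convert the captured value to a Python type, and they decide whether the URL matches at all. The sketch below exercises both with the test client; the response format in `show_post` is contrived to expose the argument's type.

```python
from flask import Flask

app = Flask(__name__)


@app.route("/post/<int:post_id>")
def show_post(post_id):
    # post_id arrives as an int, not a string
    return f"{type(post_id).__name__}:{post_id}"


@app.route("/files/<path:filepath>")
def serve_file(filepath):
    return filepath


client = app.test_client()
ok = client.get("/post/42").get_data(as_text=True)
not_int = client.get("/post/abc").status_code
nested = client.get("/files/docs/2024/report.pdf").get_data(as_text=True)
```

A value that does not fit the converter never reaches the view; Flask answers with 404 because no route matched.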
By default, routes only respond to GET requests. Use the methods parameter to accept other HTTP methods.
from flask import request, redirect, url_for, render_template, jsonify
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "POST":
username = request.form["username"]
password = request.form["password"]
# Authenticate user...
return redirect(url_for("index"))
return render_template("login.html")
@app.route("/api/users", methods=["GET"])
def list_users():
users = [{"id": 1, "name": "Folau"}, {"id": 2, "name": "Sione"}]
return jsonify(users)
@app.route("/api/users", methods=["POST"])
def create_user():
data = request.get_json()
# Create user...
return jsonify({"id": 3, "name": data["name"]}), 201
@app.route("/api/users/<int:user_id>", methods=["PUT"])
def update_user(user_id):
data = request.get_json()
# Update user...
return jsonify({"id": user_id, "name": data["name"]})
@app.route("/api/users/<int:user_id>", methods=["DELETE"])
def delete_user(user_id):
# Delete user...
return "", 204
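When a URL exists but the HTTP method is not registered for it, Flask responds with 405 Method Not Allowed rather than 404. A short sketch with a trimmed-down version of the users API (the empty list and echo response are placeholders):

```python
from flask import Flask, jsonify, request

app = Flask(__name__)


@app.route("/api/users", methods=["GET"])
def list_users():
    return jsonify([])


@app.route("/api/users", methods=["POST"])
def create_user():
    data = request.get_json()
    return jsonify({"name": data["name"]}), 201


client = app.test_client()
created = client.post("/api/users", json={"name": "Sione"})
rejected = client.delete("/api/users")  # no DELETE handler for this URL
```

The 405 response carries an Allow header listing the methods the URL does accept, which API clients can use for diagnostics.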
url_for() generates URLs based on function names rather than hardcoded paths. This is critical because if you change a URL pattern, all url_for() calls automatically update.
from flask import url_for
@app.route("/")
def index():
return "Home"
@app.route("/user/<username>")
def profile(username):
return f"Profile: {username}"
# In your code or templates:
with app.test_request_context():
print(url_for("index")) # /
print(url_for("profile", username="folau")) # /user/folau
print(url_for("static", filename="css/style.css")) # /static/css/style.css
Always use url_for() instead of hardcoding URLs. It handles URL encoding, respects your application’s root path, and makes refactoring painless.
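A few more url_for() behaviors are worth knowing. In this sketch, keyword arguments that are not part of the route become query parameters, unsafe characters are percent-encoded, and `_external=True` produces an absolute URL (the host comes from the test request context, which defaults to localhost).

```python
from flask import Flask, url_for

app = Flask(__name__)


@app.route("/user/<username>")
def profile(username):
    return f"Profile: {username}"


with app.test_request_context():
    plain = url_for("profile", username="folau")
    # Extra keyword arguments are appended as a query string
    with_query = url_for("profile", username="folau", page=2)
    # Characters that are unsafe in a path are percent-encoded
    escaped = url_for("profile", username="a b")
    # _external=True adds the scheme and host
    absolute = url_for("profile", username="folau", _external=True)
```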
Flask provides a request object that gives you access to all incoming HTTP request data, and several utilities for building responses.
from flask import request
@app.route("/search")
def search():
# Query string parameters: /search?q=flask&page=2
query = request.args.get("q", "")
page = request.args.get("page", 1, type=int)
return f"Searching for '{query}' on page {page}"
@app.route("/submit", methods=["POST"])
def submit():
# Form data (application/x-www-form-urlencoded or multipart/form-data)
name = request.form.get("name")
email = request.form.get("email")
return f"Received: {name}, {email}"
@app.route("/api/data", methods=["POST"])
def receive_json():
# JSON data (application/json)
data = request.get_json()
return jsonify({"received": data})
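import os
from werkzeug.utils import secure_filename
@app.route("/upload", methods=["POST"])
def upload_file():
    # File uploads
    file = request.files.get("document")
    # secure_filename strips path components such as "../" from the client-supplied name
    filename = secure_filename(file.filename) if file and file.filename else ""
    if filename:
        file.save(os.path.join("uploads", filename))
        return f"Uploaded: {filename}"
    return "No file provided", 400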
@app.route("/debug-request")
def debug_request():
info = {
"method": request.method, # GET, POST, etc.
"url": request.url, # Full URL
"path": request.path, # /debug-request
"host": request.host, # localhost:5000
"headers": dict(request.headers), # All headers
"content_type": request.content_type, # Request content type
"remote_addr": request.remote_addr, # Client IP address
"cookies": dict(request.cookies), # All cookies
}
return jsonify(info)
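Query-string handling has two details that are easy to miss: `type=` silently falls back to the default when conversion fails, and `getlist()` collects repeated keys. A sketch with the test client, where the `tag` parameter is an addition for illustration:

```python
from flask import Flask, jsonify, request

app = Flask(__name__)


@app.route("/search")
def search():
    query = request.args.get("q", "")
    # type=int converts the value; a failed conversion falls back to the default
    page = request.args.get("page", 1, type=int)
    # Repeated keys (?tag=a&tag=b) are collected into a list
    tags = request.args.getlist("tag")
    return jsonify({"q": query, "page": page, "tags": tags})


client = app.test_client()
good = client.get("/search?q=flask&page=2&tag=web&tag=python").get_json()
bad_page = client.get("/search?page=two").get_json()
```

Because a bad `page` value quietly becomes 1, add explicit validation if your API should reject malformed input instead.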
from flask import make_response, jsonify, redirect, url_for
# Simple string response (200 OK by default)
@app.route("/simple")
def simple():
return "Hello"
# String with custom status code
@app.route("/not-found-example")
def custom_status():
return "This page does not exist", 404
# JSON response
@app.route("/api/status")
def api_status():
return jsonify({"status": "healthy", "version": "1.0.0"})
# Full control with make_response
@app.route("/custom")
def custom_response():
resp = make_response("Custom Response", 200)
resp.headers["X-Custom-Header"] = "MyValue"
resp.set_cookie("visited", "true", max_age=3600)
return resp
# Redirect
@app.route("/old-page")
def old_page():
return redirect(url_for("index"))
Returning HTML strings from view functions is impractical for real applications. Flask uses the Jinja2 template engine to separate your presentation logic from your application logic. Templates live in a templates/ directory by default.
from flask import render_template
@app.route("/")
def index():
return render_template("index.html", title="Home", user="Folau")
@app.route("/posts")
def posts():
all_posts = [
{"id": 1, "title": "Flask Basics", "author": "Folau"},
{"id": 2, "title": "Jinja2 Templates", "author": "Folau"},
{"id": 3, "title": "Flask Routing", "author": "Folau"},
]
return render_template("posts.html", posts=all_posts)
Template inheritance is one of Jinja2’s most powerful features. You define a base template with blocks that child templates can override. This eliminates HTML duplication across pages.
<!-- templates/base.html -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{% block title %}My Flask App{% endblock %}</title>
<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
</head>
<body>
<nav>
<a href="{{ url_for('index') }}">Home</a>
<a href="{{ url_for('about') }}">About</a>
<a href="{{ url_for('posts') }}">Posts</a>
</nav>
<main>
{% block content %}{% endblock %}
</main>
<footer>
<p>© 2026 My Flask App</p>
</footer>
</body>
</html>
<!-- templates/index.html -->
{% extends "base.html" %}
{% block title %}Home - My Flask App{% endblock %}
{% block content %}
<h1>Welcome, {{ user }}!</h1>
<p>This is a Flask application with Jinja2 templates.</p>
{% endblock %}
<!-- Output a variable -->
<h1>{{ title }}</h1>
<!-- Access object attributes -->
<p>{{ user.name }}</p>
<!-- Access dictionary keys -->
<p>{{ post['title'] }}</p>
<!-- Expressions -->
<p>{{ items | length }} items in your cart</p>
<!-- Conditionals -->
{% if user %}
<p>Hello, {{ user.name }}!</p>
{% elif guest %}
<p>Hello, guest!</p>
{% else %}
<p>Please log in.</p>
{% endif %}
<!-- Loops -->
<ul>
{% for post in posts %}
<li>
<a href="{{ url_for('show_post', post_id=post.id) }}">
{{ post.title }}
</a>
<span>by {{ post.author }}</span>
</li>
{% else %}
<li>No posts found.</li>
{% endfor %}
</ul>
<!-- Loop with index -->
{% for item in items %}
<p>{{ loop.index }}. {{ item }}</p>
{% endfor %}
Jinja2 filters transform values. They are applied using the pipe | operator.
<!-- Built-in filters -->
{{ name | capitalize }}
{{ name | upper }}
{{ name | lower }}
{{ description | truncate(100) }}
{{ items | length }}
{{ price | round(2) }}
{{ html_content | safe }} <!-- Render HTML without escaping -->
{{ list | join(", ") }}
{{ date | default("No date") }}
By default, Jinja2 auto-escapes HTML in variables to prevent XSS attacks. The | safe filter tells Jinja2 you trust the content and it should be rendered as raw HTML. Use this sparingly and only with content you control.
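To make the escaping behaviour concrete, here is a minimal sketch that renders the same value with and without | safe. It uses render_template_string so that no template file is needed. The app object, the payload string, and the comment variable name are illustrative assumptions, not part of the tutorial's application.

```python
from flask import Flask, render_template_string

app = Flask(__name__)

# Hypothetical user-supplied value containing markup
payload = "<script>alert('x')</script>"

with app.app_context():
    # Default: Jinja2 escapes the angle brackets, so the browser shows plain text
    escaped = render_template_string("{{ comment }}", comment=payload)
    # With | safe: the markup is emitted verbatim and a browser would run it
    raw = render_template_string("{{ comment | safe }}", comment=payload)

print(escaped)
print(raw)
```

The first line printed contains entities such as &lt; in place of the angle brackets, while the second is the original string unchanged, which is why | safe should never be applied to user input.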
Static files — CSS, JavaScript, images — live in the static/ directory. Flask automatically serves this directory at the /static URL path.
static/
├── css/
│ ├── style.css
│ └── bootstrap.min.css
├── js/
│ ├── main.js
│ └── jquery.min.js
└── images/
├── logo.png
└── favicon.ico
Always use url_for('static', filename=...) to reference static files. Never hardcode paths.
<!-- CSS -->
<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
<!-- JavaScript -->
<script src="{{ url_for('static', filename='js/main.js') }}"></script>
<!-- Images -->
<img src="{{ url_for('static', filename='images/logo.png') }}" alt="Logo">
<!-- Favicon -->
<link rel="icon" href="{{ url_for('static', filename='images/favicon.ico') }}">
Using url_for() ensures correct paths regardless of where your application is mounted. If your app runs at /myapp/ instead of /, url_for() adjusts automatically. Hardcoded paths would break.
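The effect of a mount prefix can be checked without deploying anything. The sketch below builds the same URLs twice, once at the site root and once under a hypothetical /myapp prefix, by passing base_url to test_request_context(). The host name and the prefix are assumptions chosen for illustration.

```python
from flask import Flask, url_for

app = Flask(__name__)

@app.route("/")
def index():
    return "Home"

# Simulate the app being served from the root of the site
with app.test_request_context("/"):
    root_static = url_for("static", filename="css/style.css")

# Simulate the same app mounted under the hypothetical prefix /myapp
with app.test_request_context("/", base_url="http://localhost/myapp"):
    mounted_static = url_for("static", filename="css/style.css")
    mounted_index = url_for("index")

print(root_static)
print(mounted_static)
print(mounted_index)
```

The templates stay identical in both cases; only the environment the app is served from changes.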
Flask lets you register custom error handlers so your users see friendly, branded error pages instead of generic browser defaults.
from flask import render_template

@app.errorhandler(404)
def page_not_found(error):
    return render_template("errors/404.html"), 404

@app.errorhandler(500)
def internal_server_error(error):
    return render_template("errors/500.html"), 500

@app.errorhandler(403)
def forbidden(error):
    return render_template("errors/403.html"), 403
<!-- templates/errors/404.html -->
{% extends "base.html" %}
{% block title %}Page Not Found{% endblock %}
{% block content %}
<div class="error-page">
<h1>404</h1>
<h2>Page Not Found</h2>
<p>The page you are looking for does not exist.</p>
<a href="{{ url_for('index') }}">Go back home</a>
</div>
{% endblock %}
For JSON APIs, return JSON error responses instead of HTML.
from flask import jsonify
from werkzeug.exceptions import HTTPException

@app.errorhandler(HTTPException)
def handle_http_exception(error):
    response = jsonify({
        "error": error.name,
        "message": error.description,
        "status_code": error.code,
    })
    response.status_code = error.code
    return response

@app.errorhandler(Exception)
def handle_generic_exception(error):
    # Log the error for debugging
    app.logger.error(f"Unhandled exception: {error}", exc_info=True)
    response = jsonify({
        "error": "Internal Server Error",
        "message": "An unexpected error occurred.",
        "status_code": 500,
    })
    response.status_code = 500
    return response
Use abort() to immediately stop a request and return an error status code.
from flask import abort, jsonify, render_template

# current_user (e.g. from Flask-Login) and User (a Flask-SQLAlchemy model)
# are assumed to be defined elsewhere in the application

@app.route("/admin")
def admin_panel():
    if not current_user.is_admin:
        abort(403)  # Forbidden
    return render_template("admin.html")

@app.route("/api/users/<int:user_id>")
def get_user(user_id):
    user = User.query.get(user_id)
    if user is None:
        abort(404)  # Not Found
    return jsonify(user.to_dict())
Let us pull everything together and build a simple blog application with multiple routes, template inheritance, and static CSS. This example demonstrates real-world Flask patterns you will use in production applications.
# app.py
from flask import Flask, render_template, abort
app = Flask(__name__)
# Simulated blog data (in a real app, this comes from a database)
POSTS = [
{
"id": 1,
"title": "Getting Started with Flask",
"slug": "getting-started-with-flask",
"author": "Folau Kaveinga",
"date": "2026-02-20",
"summary": "Learn the basics of Flask, the lightweight Python web framework.",
"content": (
"Flask is a micro-framework for Python that gives you the essentials "
"for building web applications without imposing unnecessary complexity. "
"In this post, we explore the core concepts that make Flask a favorite "
"among Python developers."
),
},
{
"id": 2,
"title": "Understanding Jinja2 Templates",
"slug": "understanding-jinja2-templates",
"author": "Folau Kaveinga",
"date": "2026-02-22",
"summary": "Deep dive into Jinja2 template engine features and best practices.",
"content": (
"Jinja2 is the template engine that powers Flask's HTML rendering. "
"It supports template inheritance, filters, macros, and more. "
"Mastering Jinja2 is essential for building maintainable Flask applications."
),
},
{
"id": 3,
"title": "Flask Routing and URL Building",
"slug": "flask-routing-and-url-building",
"author": "Folau Kaveinga",
"date": "2026-02-25",
"summary": "Master Flask routing, variable rules, and URL generation.",
"content": (
"Routing is the backbone of any web application. Flask makes it "
"intuitive with decorators and supports variable rules, multiple "
"HTTP methods, and dynamic URL building with url_for()."
),
},
]
@app.route("/")
def index():
    return render_template("blog/index.html", posts=POSTS)

@app.route("/about")
def about():
    return render_template("blog/about.html")

@app.route("/post/<int:post_id>")
def post_detail(post_id):
    post = next((p for p in POSTS if p["id"] == post_id), None)
    if post is None:
        abort(404)
    return render_template("blog/post.html", post=post)

@app.errorhandler(404)
def page_not_found(error):
    return render_template("blog/404.html"), 404

if __name__ == "__main__":
    app.run(debug=True)
<!-- templates/blog/base.html -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{% block title %}Flask Blog{% endblock %}</title>
<link rel="stylesheet" href="{{ url_for('static', filename='css/blog.css') }}">
</head>
<body>
<header>
<nav>
<div class="nav-brand">Flask Blog</div>
<div class="nav-links">
<a href="{{ url_for('index') }}">Home</a>
<a href="{{ url_for('about') }}">About</a>
</div>
</nav>
</header>
<main>
{% block content %}{% endblock %}
</main>
<footer>
<p>Built with Flask — © 2026</p>
</footer>
</body>
</html>
<!-- templates/blog/index.html -->
{% extends "blog/base.html" %}
{% block title %}Home - Flask Blog{% endblock %}
{% block content %}
<h1>Latest Posts</h1>
{% for post in posts %}
<article class="post-card">
<h2>
<a href="{{ url_for('post_detail', post_id=post.id) }}">{{ post.title }}</a>
</h2>
<div class="post-meta">{{ post.author }} · {{ post.date }}</div>
<p>{{ post.summary }}</p>
<a href="{{ url_for('post_detail', post_id=post.id) }}" class="read-more">
Read more →
</a>
</article>
{% endfor %}
{% endblock %}
<!-- templates/blog/post.html -->
{% extends "blog/base.html" %}
{% block title %}{{ post.title }} - Flask Blog{% endblock %}
{% block content %}
<article class="post-full">
<h1>{{ post.title }}</h1>
<div class="post-meta">{{ post.author }} · {{ post.date }}</div>
<div class="post-content">
{{ post.content }}
</div>
<a href="{{ url_for('index') }}">← Back to all posts</a>
</article>
{% endblock %}
<!-- templates/blog/about.html -->
{% extends "blog/base.html" %}
{% block title %}About - Flask Blog{% endblock %}
{% block content %}
<h1>About This Blog</h1>
<p>This is a simple blog built with Flask to demonstrate core framework concepts
including routing, template inheritance, static files, and error handling.</p>
<p>Flask makes it straightforward to build web applications in Python without
the overhead of a full-stack framework. You choose the components you need
and assemble them yourself.</p>
{% endblock %}
/* static/css/blog.css */
* {
margin: 0;
padding: 0;
box-sizing: border-box;
}
body {
font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, sans-serif;
line-height: 1.6;
color: #333;
max-width: 800px;
margin: 0 auto;
padding: 0 20px;
}
header nav {
display: flex;
justify-content: space-between;
align-items: center;
padding: 20px 0;
border-bottom: 2px solid #eee;
}
.nav-brand {
font-size: 1.5rem;
font-weight: bold;
color: #2563eb;
}
.nav-links a {
margin-left: 20px;
text-decoration: none;
color: #555;
}
.nav-links a:hover { color: #2563eb; }
main { padding: 40px 0; }
.post-card {
margin-bottom: 30px;
padding-bottom: 30px;
border-bottom: 1px solid #eee;
}
.post-card h2 a {
text-decoration: none;
color: #1a1a1a;
}
.post-card h2 a:hover { color: #2563eb; }
.post-meta {
color: #888;
font-size: 0.9rem;
margin: 8px 0;
}
.read-more {
color: #2563eb;
text-decoration: none;
font-weight: 500;
}
.post-full h1 { margin-bottom: 10px; }
.post-content {
margin: 20px 0;
line-height: 1.8;
}
footer {
padding: 20px 0;
border-top: 2px solid #eee;
text-align: center;
color: #888;
}
Run the application with python app.py and visit http://127.0.0.1:5000. You now have a working blog with a home page listing posts, individual post pages, an about page, and custom 404 handling. All of it uses template inheritance, so the navigation and footer are defined once in base.html.
The Flask development server is built for convenience during development, not for security, stability, or throughput under real traffic. Never use it in production. Instead, use a production-grade WSGI server such as Gunicorn.
# Install gunicorn
pip install gunicorn
# Run your Flask app with gunicorn
gunicorn app:app --bind 0.0.0.0:8000 --workers 4
# With the application factory pattern
gunicorn "app:create_app()" --bind 0.0.0.0:8000 --workers 4
The --workers 4 flag spawns 4 worker processes to handle requests concurrently. A common rule of thumb is (2 * CPU cores) + 1 workers.
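The rule of thumb is easy to compute from the machine's core count. The following is a small sketch; recommended_workers is a hypothetical helper name, and the result is only a starting point to tune against your own workload.

```python
import os

def recommended_workers(cpu_cores: int) -> int:
    """Rule-of-thumb Gunicorn worker count: (2 * cores) + 1."""
    return 2 * cpu_cores + 1

# os.cpu_count() can return None, so fall back to a single core
cores = os.cpu_count() or 1
workers = recommended_workers(cores)
print(f"{cores} core(s) -> {workers} workers")
```

On a 4-core machine this yields 9 workers. If you keep Gunicorn settings in a Python config file, the computed value can be assigned to its workers setting instead of being passed on the command line.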
In production, Flask applications typically run behind a reverse proxy:
Client → Nginx (reverse proxy, SSL, static files) → Gunicorn (WSGI server) → Flask (application)
Nginx handles SSL termination, static file serving, and load balancing. Gunicorn runs your Flask application. This separation of concerns gives you better performance, security, and reliability than any single component could provide alone.
Running with debug mode enabled in production is the most dangerous mistake you can make with Flask. The interactive debugger that runs in debug mode allows anyone who can reach your error page to execute arbitrary Python code on your server. This means full remote code execution: they can read your environment variables, access your database, and compromise your entire system.
# NEVER do this in production
app.run(debug=True)
# Instead, control debug mode via environment variables
import os
app.run(debug=os.environ.get("FLASK_DEBUG", "0") == "1")
Circular imports are the most common structural issue in Flask applications. They happen when your application module and your routes module try to import each other.
# BAD: Circular import
# app.py
from flask import Flask

app = Flask(__name__)
from routes import *  # routes.py imports app from here = circular

# GOOD: Use the application factory pattern
# app/__init__.py
from flask import Flask

def create_app():
    app = Flask(__name__)
    from app.routes import main_bp
    app.register_blueprint(main_bp)
    return app
The application factory pattern solves circular imports because the app object is created inside a function, not at module level. Blueprints do not need to import the app object directly.
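A single-file sketch of the same idea is shown here so it can be run as-is. In a real project the blueprint would live in its own module; the main_bp name, the testing parameter, and the route are illustrative assumptions.

```python
from flask import Blueprint, Flask

# In a real project this blueprint would live in app/routes.py
main_bp = Blueprint("main", __name__)

@main_bp.route("/")
def index():
    return "Home"

def create_app(testing: bool = False) -> Flask:
    """Build and configure a fresh application instance."""
    app = Flask(__name__)
    app.config["TESTING"] = testing
    # The blueprint never imports `app`, so there is no import cycle
    app.register_blueprint(main_bp)
    return app

# Each call returns an independent app, which is handy in tests
app = create_app(testing=True)
response = app.test_client().get("/")
print(response.status_code, response.get_data(as_text=True))
```

Because the blueprint only depends on Flask itself, the import direction is one-way: the factory imports the routes, never the reverse.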
Hardcoding database URLs, secret keys, and API keys directly in your source code is a security risk and makes deployment difficult.
import os

# BAD
app.config["SECRET_KEY"] = "my-secret-key"
app.config["DATABASE_URL"] = "postgresql://user:pass@localhost/db"

# GOOD
app.config["SECRET_KEY"] = os.environ["SECRET_KEY"]
app.config["DATABASE_URL"] = os.environ["DATABASE_URL"]
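One way to make missing configuration obvious is to fail at startup with a clear message rather than with a bare KeyError deep in a request. The sketch below uses only the standard library; require_env and load_config are hypothetical helper names, and the placeholder values at the bottom exist only so the example runs standalone.

```python
import os

def require_env(name: str) -> str:
    """Return the value of an environment variable or fail with a clear error."""
    value = os.environ.get(name)
    if not value:
        raise RuntimeError(f"Missing required environment variable: {name}")
    return value

def load_config() -> dict:
    # Required secrets have no default, so a misconfigured deploy fails at startup
    config = {
        "SECRET_KEY": require_env("SECRET_KEY"),
        "DATABASE_URL": require_env("DATABASE_URL"),
    }
    # Optional setting with a safe default
    config["DEBUG"] = os.environ.get("FLASK_DEBUG", "0") == "1"
    return config

if __name__ == "__main__":
    # Placeholder values so the sketch runs standalone
    os.environ.setdefault("SECRET_KEY", "dev-only-placeholder")
    os.environ.setdefault("DATABASE_URL", "sqlite:///dev.db")
    print(load_config())
```

Inside an application factory, the resulting dictionary could be applied with app.config.update(load_config()).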
Hardcoding URLs in templates and Python code breaks when you change route paths or deploy your app under a different prefix.
<!-- BAD -->
<a href="/about">About</a>
<link rel="stylesheet" href="/static/css/style.css">
<!-- GOOD -->
<a href="{{ url_for('about') }}">About</a>
<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
Some Flask operations require an application context. If you see “RuntimeError: Working outside of application context,” it means you are trying to use Flask features (like url_for() or current_app) outside of a request or without an active app context.
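# If you need app context outside a request (e.g., in a script or test)
with app.app_context():
    db.create_all()

# url_for() also needs to know the host. Outside a real request, either set
# SERVER_NAME in the config or use a test request context.
with app.test_request_context():
    url = url_for("index")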
app object into a factory later is painful. Start with create_app() and save yourself the trouble.
app object because it makes the transition to multiple blueprints trivial.
python-dotenv. Your .env file should be in .gitignore.
logging module. Configure it to write to files or a centralized logging service in production. app.logger is your starting point.
import logging
from logging.handlers import RotatingFileHandler

if not app.debug:
    handler = RotatingFileHandler("app.log", maxBytes=10240, backupCount=10)
    handler.setLevel(logging.INFO)
    formatter = logging.Formatter(
        "%(asctime)s %(levelname)s: %(message)s [in %(pathname)s:%(lineno)d]"
    )
    handler.setFormatter(formatter)
    app.logger.addHandler(handler)
    app.logger.setLevel(logging.INFO)
    app.logger.info("Application startup")
import pytest
from app import create_app

@pytest.fixture
def client():
    app = create_app("testing")
    with app.test_client() as client:
        yield client

def test_index(client):
    response = client.get("/")
    assert response.status_code == 200

def test_404(client):
    response = client.get("/nonexistent")
    assert response.status_code == 404
pip freeze > requirements.txt so every environment runs the same package versions.
pip install flask and freeze your dependencies with pip freeze > requirements.txt.
flask --app app run --debug to start the development server with auto-reloading and an interactive debugger.
create_app()) for any non-trivial application. It prevents circular imports and enables testing with different configurations.
python-dotenv. Never hardcode secrets in source code.
@app.route(). Use variable rules for dynamic URLs and url_for() to generate URLs programmatically.
request object provides access to query parameters, form data, JSON payloads, and file uploads. Use jsonify() and make_response() to build responses.
url_for() in templates to reference routes and static files.
@app.errorhandler() to show branded error pages instead of browser defaults.
Routing and templating are the two pillars of every Flask application. Routes map incoming HTTP requests to Python functions, and templates turn raw data into HTML that browsers can render. If you understand these two systems deeply, you can build anything from a single-page prototype to a production web application with dozens of endpoints.
Flask’s routing system is powered by Werkzeug, a battle-tested WSGI toolkit, while its templating engine is Jinja2, one of the most capable template engines in the Python ecosystem. Together they give you explicit control over every URL, every HTTP method, and every piece of markup your application serves.
This tutorial assumes you have Flask installed and a basic understanding of Python. We will start with route fundamentals, work through request and response handling, take a deep dive into Jinja2, and finish by building a complete Task Manager application that ties everything together.
A route in Flask is a mapping between a URL pattern and a Python function. You declare routes with the @app.route() decorator.
from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
    return 'Home Page'

@app.route('/about')
def about():
    return 'About Page'

if __name__ == '__main__':
    app.run(debug=True)
When Python loads your module, each @app.route() decorator calls app.add_url_rule() under the hood. This registers the URL pattern, the endpoint name (defaults to the function name), and the view function in Flask’s URL map. You can inspect registered routes at any time:
# Print all registered routes
for rule in app.url_map.iter_rules():
    print(f'{rule.endpoint}: {rule.rule} [{", ".join(sorted(rule.methods))}]')
Flask distinguishes between routes with and without trailing slashes, and the behavior is intentional:
# With trailing slash - acts like a directory
# /projects -> redirects to /projects/
# /projects/ -> 200 OK
@app.route('/projects/')
def projects():
    return 'Projects List'

# Without trailing slash - acts like a file
# /about -> 200 OK
# /about/ -> 404 Not Found
@app.route('/about')
def about():
    return 'About Page'
When a route is defined with a trailing slash and a user visits the URL without it, Flask redirects to the version with the slash (a 308 in current Werkzeug releases; older versions used 301). When a route is defined without a trailing slash and a user visits with one, Flask returns a 404. This mirrors how web servers treat directories and files. My recommendation: use trailing slashes for collection endpoints and no trailing slash for individual resource endpoints.
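The trailing-slash rules can be verified with Flask's test client. This is a minimal sketch using the same two routes; the exact redirect status code depends on the installed Werkzeug version, so the code prints it rather than assuming a value.

```python
from flask import Flask

app = Flask(__name__)

@app.route("/projects/")
def projects():
    return "Projects List"

@app.route("/about")
def about():
    return "About Page"

client = app.test_client()

# Rule defined with a trailing slash: the bare URL redirects to the canonical one
redirect_resp = client.get("/projects")
print(redirect_resp.status_code, redirect_resp.headers["Location"])

# Rule defined without a trailing slash: adding one is a 404
about_ok = client.get("/about").status_code
about_slash = client.get("/about/").status_code
print(about_ok, about_slash)
```

The test client does not follow redirects by default, which is what makes the redirect response itself visible here.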
You can also register routes explicitly using add_url_rule(). This is useful when the view function is defined elsewhere or generated dynamically:
def health_check():
    return 'OK', 200

app.add_url_rule('/health', endpoint='health', view_func=health_check)
By default, a route only responds to GET requests. To handle other HTTP methods, pass them explicitly via the methods parameter.
from flask import Flask, request, jsonify

app = Flask(__name__)
tasks = []

# GET - Retrieve all tasks
@app.route('/api/tasks', methods=['GET'])
def get_tasks():
    return jsonify(tasks)

# POST - Create a new task
@app.route('/api/tasks', methods=['POST'])
def create_task():
    data = request.get_json()
    task = {
        'id': len(tasks) + 1,
        'title': data['title'],
        'done': False
    }
    tasks.append(task)
    return jsonify(task), 201

# PUT - Update an existing task
@app.route('/api/tasks/<int:task_id>', methods=['PUT'])
def update_task(task_id):
    task = next((t for t in tasks if t['id'] == task_id), None)
    if task is None:
        return jsonify({'error': 'Task not found'}), 404
    data = request.get_json()
    task['title'] = data.get('title', task['title'])
    task['done'] = data.get('done', task['done'])
    return jsonify(task)

# DELETE - Remove a task
@app.route('/api/tasks/<int:task_id>', methods=['DELETE'])
def delete_task(task_id):
    global tasks
    tasks = [t for t in tasks if t['id'] != task_id]
    return '', 204
Sometimes it makes sense to handle multiple methods in a single view function. Use request.method to branch logic:
from flask import redirect, render_template, request, url_for

@app.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        username = request.form['username']
        password = request.form['password']
        # authenticate user
        return redirect(url_for('dashboard'))
    # GET request - show login form
    return render_template('login.html')
URL variables let you capture dynamic segments from the URL and pass them as arguments to your view function. Flask supports several built-in converters.
# string (default): matches any text without slashes
@app.route('/user/<username>')
def user_profile(username):
    return f'Profile: {username}'
# /user/folau -> username = "folau"
# /user/jane -> username = "jane"

# int: only matches integers, rejects non-numeric values with 404
@app.route('/post/<int:post_id>')
def show_post(post_id):
    return f'Post #{post_id}'
# /post/42 -> post_id = 42 (200 OK)
# /post/abc -> 404 Not Found

# float: matches values that contain a decimal point
@app.route('/price/<float:amount>')
def show_price(amount):
    return f'Price: ${amount:.2f}'
# /price/19.99 -> amount = 19.99
# /price/100 -> 404 (integers don't match float)

# path: matches text including slashes - useful for file paths
@app.route('/files/<path:filepath>')
def serve_file(filepath):
    return f'File: {filepath}'
# /files/docs/readme.txt -> filepath = "docs/readme.txt"

# uuid: matches UUID strings and passes a uuid.UUID object to the view
@app.route('/transaction/<uuid:transaction_id>')
def show_transaction(transaction_id):
    return f'Transaction: {transaction_id}'
# /transaction/a8098c1a-f86e-11da-bd1a-00112444be1e -> matches

# Multiple variables in a single rule
@app.route('/blog/<int:year>/<int:month>/<slug>')
def blog_post(year, month, slug):
    return f'Post: {slug} ({year}-{month:02d})'
# /blog/2026/02/flask-routes -> year=2026, month=2, slug="flask-routes"
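The matching behaviour described in the comments above can be confirmed with the test client. This sketch re-declares three of the routes in a throwaway app and records the status code for a handful of sample URLs; the sample list is an assumption chosen to exercise each converter.

```python
from flask import Flask

app = Flask(__name__)

@app.route("/post/<int:post_id>")
def show_post(post_id):
    return f"Post #{post_id}"

@app.route("/price/<float:amount>")
def show_price(amount):
    return f"Price: ${amount:.2f}"

@app.route("/files/<path:filepath>")
def serve_file(filepath):
    return f"File: {filepath}"

client = app.test_client()

# Collect the status code for each sample URL
samples = ["/post/42", "/post/abc", "/price/19.99", "/price/100", "/files/docs/readme.txt"]
statuses = {url: client.get(url).status_code for url in samples}

for url, status in statuses.items():
    print(f"{url:<25} {status}")
```

A URL that fails a converter never reaches the view function; the router simply finds no matching rule and Flask answers with a 404.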
Hard-coding URLs in your application is fragile. If you rename a route or change a URL pattern, every hard-coded reference breaks. The url_for() function solves this by generating URLs from endpoint names.
from flask import Flask, url_for

app = Flask(__name__)

@app.route('/')
def index():
    return 'Home'

@app.route('/user/<username>')
def user_profile(username):
    return f'Profile: {username}'

@app.route('/post/<int:post_id>')
def show_post(post_id):
    return f'Post #{post_id}'

with app.test_request_context():
    print(url_for('index'))  # /
    print(url_for('user_profile', username='folau'))  # /user/folau
    print(url_for('show_post', post_id=42))  # /post/42
    print(url_for('show_post', post_id=42, highlight='true'))
    # /post/42?highlight=true (extra kwargs become query params)
    print(url_for('static', filename='css/style.css'))
    # /static/css/style.css
There are three reasons to always use url_for() instead of hard-coded paths:
url_for() correctly prefixes the blueprint’s URL prefix without you having to remember it.
<!-- In Jinja2 templates -->
<a href="{{ url_for('index') }}">Home</a>
<a href="{{ url_for('user_profile', username='folau') }}">My Profile</a>
<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
As your application grows, keeping every route in a single file becomes unmanageable. Blueprints let you organize routes into logical modules, each with its own URL prefix, templates, and static files.
# blueprints/auth.py
from flask import Blueprint, render_template, request, redirect, url_for

auth_bp = Blueprint('auth', __name__, template_folder='templates')

@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
    if request.method == 'POST':
        # handle login
        return redirect(url_for('auth.dashboard'))
    return render_template('auth/login.html')

@auth_bp.route('/logout')
def logout():
    # handle logout
    return redirect(url_for('auth.login'))

@auth_bp.route('/dashboard')
def dashboard():
    return render_template('auth/dashboard.html')
# app.py
from flask import Flask
from blueprints.auth import auth_bp
from blueprints.tasks import tasks_bp
from blueprints.api import api_bp

app = Flask(__name__)

# Register with URL prefixes
app.register_blueprint(auth_bp, url_prefix='/auth')
app.register_blueprint(tasks_bp, url_prefix='/tasks')
app.register_blueprint(api_bp, url_prefix='/api/v1')

# Routes become:
# /auth/login, /auth/logout, /auth/dashboard
# /tasks/, /tasks/create, /tasks/1/edit
# /api/v1/tasks, /api/v1/tasks/1
# Reference a blueprint endpoint with 'blueprint_name.endpoint'
url_for('auth.login') # /auth/login
url_for('tasks.create') # /tasks/create
url_for('api.get_tasks') # /api/v1/tasks
# Within the same blueprint, use a dot prefix for the current blueprint
url_for('.login') # resolves to current blueprint's login
myapp/
├── app.py
├── config.py
├── blueprints/
│ ├── __init__.py
│ ├── auth/
│ │ ├── __init__.py
│ │ ├── routes.py
│ │ └── templates/
│ │ └── auth/
│ │ ├── login.html
│ │ └── register.html
│ └── tasks/
│ ├── __init__.py
│ ├── routes.py
│ └── templates/
│ └── tasks/
│ ├── list.html
│ └── detail.html
├── templates/
│ └── base.html
└── static/
├── css/
├── js/
└── img/
Flask provides the request object as a context-local proxy. It gives you access to everything the client sent: query parameters, form data, JSON payloads, files, headers, and cookies.
from flask import request

# URL: /search?q=flask&page=2&limit=20
@app.route('/search')
def search():
    query = request.args.get('q', '')  # 'flask'
    page = request.args.get('page', 1, type=int)  # 2
    limit = request.args.get('limit', 10, type=int)  # 20
    results = perform_search(query, page, limit)
    return render_template('search.html', results=results, query=query)
# Handles application/x-www-form-urlencoded and multipart/form-data
@app.route('/register', methods=['POST'])
def register():
    username = request.form['username']  # raises 400 if missing
    email = request.form.get('email', '')  # returns '' if missing
    password = request.form['password']
    # validate and create user
    return redirect(url_for('login'))
# Handles application/json content type
@app.route('/api/tasks', methods=['POST'])
def create_task():
    # silent=True returns None on a missing or malformed body instead of raising
    data = request.get_json(silent=True)
    if data is None:
        return jsonify({'error': 'Invalid JSON'}), 400
    title = data.get('title')
    if not title:
        return jsonify({'error': 'Title is required'}), 400
    task = {'id': 1, 'title': title, 'done': False}
    return jsonify(task), 201
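In current Flask releases, request.get_json() raises an error on a malformed body unless silent=True is passed, so a check for None only works in silent mode. The sketch below exercises a JSON endpoint with the test client against three request bodies; the route mirrors the example above, and the sample payloads are assumptions.

```python
from flask import Flask, jsonify, request

app = Flask(__name__)

@app.route("/api/tasks", methods=["POST"])
def create_task():
    # silent=True returns None instead of raising on a bad or missing JSON body
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Invalid JSON"}), 400
    title = data.get("title")
    if not title:
        return jsonify({"error": "Title is required"}), 400
    return jsonify({"id": 1, "title": title, "done": False}), 201

client = app.test_client()

# Malformed body, valid body without a title, and a fully valid body
bad = client.post("/api/tasks", data="{not json", content_type="application/json")
missing = client.post("/api/tasks", json={})
ok = client.post("/api/tasks", json={"title": "Write docs"})

print(bad.status_code, bad.get_json())
print(missing.status_code, missing.get_json())
print(ok.status_code, ok.get_json())
```

Checking isinstance(data, dict) rather than only None also rejects bodies such as a bare JSON list or number, which would otherwise fail on data.get().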
import os
from werkzeug.utils import secure_filename

UPLOAD_FOLDER = 'uploads'
ALLOWED_EXTENSIONS = {'png', 'jpg', 'jpeg', 'gif', 'pdf'}
app.config['UPLOAD_FOLDER'] = UPLOAD_FOLDER
app.config['MAX_CONTENT_LENGTH'] = 16 * 1024 * 1024  # 16 MB limit

def allowed_file(filename):
    return '.' in filename and \
        filename.rsplit('.', 1)[1].lower() in ALLOWED_EXTENSIONS

@app.route('/upload', methods=['POST'])
def upload_file():
    if 'file' not in request.files:
        return 'No file part', 400
    file = request.files['file']
    if file.filename == '':
        return 'No selected file', 400
    if file and allowed_file(file.filename):
        filename = secure_filename(file.filename)
        file.save(os.path.join(app.config['UPLOAD_FOLDER'], filename))
        return f'File {filename} uploaded successfully', 201
    return 'File type not allowed', 400
@app.route('/debug')
def debug_request():
    info = {
        'method': request.method,  # GET, POST, etc.
        'url': request.url,  # full URL
        'path': request.path,  # /debug
        'host': request.host,  # localhost:5000
        'content_type': request.content_type,
        'user_agent': str(request.user_agent),
        'remote_addr': request.remote_addr,  # client IP
        'is_json': request.is_json,
        'cookies': dict(request.cookies),
        'headers': dict(request.headers),
    }
    return jsonify(info)
Flask gives you multiple ways to construct HTTP responses, from simple strings to fully customized response objects.
@app.route('/')
def index():
    return 'Hello, World!'  # 200 OK, text/html

# (body, status_code)
@app.route('/not-found')
def not_found():
    return 'Resource not found', 404

# (body, status_code, headers)
@app.route('/custom')
def custom():
    return 'Custom Response', 200, {'X-Custom-Header': 'flask-tutorial'}

# (body, headers)
@app.route('/no-cache')
def no_cache():
    return 'Fresh data', {'Cache-Control': 'no-store'}
from flask import make_response

@app.route('/cookie')
def set_cookie():
    resp = make_response('Cookie has been set')
    resp.set_cookie('username', 'folau', max_age=3600)
    resp.headers['X-Custom'] = 'value'
    return resp

@app.route('/download')
def download_csv():
    csv_data = 'name,email\nFolau,folau@example.com\n'
    resp = make_response(csv_data)
    resp.headers['Content-Type'] = 'text/csv'
    resp.headers['Content-Disposition'] = 'attachment; filename=users.csv'
    return resp
from flask import jsonify

@app.route('/api/status')
def api_status():
    return jsonify({
        'status': 'healthy',
        'version': '1.0.0',
        'uptime': 99.9
    })
# Returns application/json with proper Content-Type header

# With status code
@app.route('/api/error')
def api_error():
    return jsonify({'error': 'Something went wrong'}), 500
from flask import abort, redirect, render_template, url_for

# new_page (a view defined elsewhere) and current_user (e.g. from Flask-Login)
# are assumed to exist in the application

@app.route('/old-page')
def old_page():
    return redirect(url_for('new_page'))  # 302 redirect

@app.route('/old-permanent')
def old_permanent():
    return redirect(url_for('new_page'), code=301)  # 301 permanent

@app.route('/admin')
def admin():
    if not current_user.is_admin:
        abort(403)  # Forbidden - raises HTTPException
    return render_template('admin.html')

# Custom error handlers
@app.errorhandler(404)
def page_not_found(e):
    return render_template('404.html'), 404

@app.errorhandler(500)
def internal_error(e):
    return render_template('500.html'), 500
Jinja2 is Flask’s default template engine. It provides template inheritance, control structures, filters, macros, and more. Templates live in the templates/ directory by default.
Template inheritance is Jinja2’s most powerful feature. You define a base template with blocks that child templates can override.
<!-- templates/base.html -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{% block title %}My App{% endblock %}</title>
<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
{% block extra_css %}{% endblock %}
</head>
<body>
<nav>
{% block navbar %}
<a href="{{ url_for('index') }}">Home</a>
<a href="{{ url_for('tasks.list') }}">Tasks</a>
<a href="{{ url_for('auth.login') }}">Login</a>
{% endblock %}
</nav>
<main class="container">
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
<div class="alert alert-{{ category }}">{{ message }}</div>
{% endfor %}
{% endif %}
{% endwith %}
{% block content %}{% endblock %}
</main>
<footer>
{% block footer %}
<p>© 2026 My App</p>
{% endblock %}
</footer>
<script src="{{ url_for('static', filename='js/main.js') }}"></script>
{% block extra_js %}{% endblock %}
</body>
</html>
<!-- templates/tasks/list.html -->
{% extends "base.html" %}
{% block title %}Tasks - My App{% endblock %}
{% block content %}
<h1>Task List</h1>
{% if tasks %}
<ul>
{% for task in tasks %}
<li>
<a href="{{ url_for('tasks.detail', task_id=task.id) }}">
{{ task.title }}
</a>
{% if task.done %}
<span class="badge">Done</span>
{% endif %}
</li>
{% endfor %}
</ul>
{% else %}
<p>No tasks yet. <a href="{{ url_for('tasks.create') }}">Create one</a>.</p>
{% endif %}
{% endblock %}
<!-- Simple variable output -->
<h1>{{ user.name }}</h1>
<p>Email: {{ user.email }}</p>
<!-- Expressions -->
<p>Total: ${{ price * quantity }}</p>
<p>Full Name: {{ first_name ~ ' ' ~ last_name }}</p>
<!-- Default values -->
<p>Welcome, {{ username | default('Guest') }}</p>
<!-- Accessing dictionaries and lists -->
<p>{{ config['DEBUG'] }}</p>
<p>{{ items[0] }}</p>
<!-- Ternary-style conditional expression -->
<p class="{{ 'active' if is_active else 'inactive' }}">Status</p>
<!-- If / Elif / Else -->
{% if user.role == 'admin' %}
<a href="/admin">Admin Panel</a>
{% elif user.role == 'editor' %}
<a href="/editor">Editor Panel</a>
{% else %}
<a href="/profile">My Profile</a>
{% endif %}
<!-- For Loop -->
<table>
<thead>
<tr><th>#</th><th>Name</th><th>Email</th></tr>
</thead>
<tbody>
{% for user in users %}
<tr class="{{ 'even' if loop.index is even else 'odd' }}">
<td>{{ loop.index }}</td>
<td>{{ user.name }}</td>
<td>{{ user.email }}</td>
</tr>
{% else %}
<tr><td colspan="3">No users found.</td></tr>
{% endfor %}
</tbody>
</table>
<!-- Loop Variable Properties -->
{# loop.index - current iteration (1-indexed) #}
{# loop.index0 - current iteration (0-indexed) #}
{# loop.first - True if first iteration #}
{# loop.last - True if last iteration #}
{# loop.length - total number of items #}
{# loop.revindex - iterations until the end (1-indexed) #}
Filters transform variable output. They are applied with the pipe (|) operator.
<!-- String filters -->
{{ name | capitalize }} <!-- "folau" -> "Folau" -->
{{ name | upper }} <!-- "folau" -> "FOLAU" -->
{{ name | lower }} <!-- "FOLAU" -> "folau" -->
{{ name | title }} <!-- "hello world" -> "Hello World" -->
{{ text | truncate(100) }} <!-- Truncates to 100 chars with "..." -->
{{ text | striptags }} <!-- Removes HTML tags -->
{{ text | wordcount }} <!-- Counts words -->
<!-- HTML escaping -->
{{ user_input | e }} <!-- Escapes HTML (default behavior) -->
{{ trusted_html | safe }} <!-- Marks content as safe, no escaping -->
<!-- List/Collection filters -->
{{ users | length }} <!-- Number of items -->
{{ numbers | sum }} <!-- Sum of numeric list -->
{{ names | join(', ') }} <!-- Joins list items -->
{{ items | sort }} <!-- Sorts items -->
{{ users | sort(attribute='name') }} <!-- Sort by attribute -->
{{ items | first }} <!-- First item -->
{{ items | last }} <!-- Last item -->
{{ items | unique }} <!-- Unique items -->
{{ items | reverse }} <!-- Reversed list -->
<!-- Number filters -->
{{ price | round(2) }} <!-- Rounds to 2 decimal places -->
{{ large_num | filesizeformat }} <!-- 13312 -> "13.3 kB" (decimal units by default) -->
<!-- Default values -->
{{ bio | default('No bio provided') }}
<!-- Chaining filters -->
{{ description | striptags | truncate(200) | capitalize }}
# Register custom filters in your Flask app
from datetime import datetime

@app.template_filter('dateformat')
def dateformat_filter(value, fmt='%B %d, %Y'):
    # Accept ISO 8601 strings as well as datetime objects
    if isinstance(value, str):
        value = datetime.fromisoformat(value)
    return value.strftime(fmt)

@app.template_filter('currency')
def currency_filter(value):
    return f'${value:,.2f}'
<!-- Using custom filters -->
<p>Created: {{ task.created_at | dateformat }}</p>
<p>Created: {{ task.created_at | dateformat('%m/%d/%Y') }}</p>
<p>Price: {{ product.price | currency }}</p>
Macros are like functions for templates. They let you create reusable HTML components.
<!-- templates/macros/forms.html -->
{% macro input(name, label, type='text', value='', required=false, placeholder='') %}
<div class="form-group">
<label for="{{ name }}">{{ label }}</label>
<input type="{{ type }}"
id="{{ name }}"
name="{{ name }}"
value="{{ value }}"
placeholder="{{ placeholder }}"
class="form-control"
{{ 'required' if required }}>
</div>
{% endmacro %}
{% macro textarea(name, label, value='', rows=4, required=false) %}
<div class="form-group">
<label for="{{ name }}">{{ label }}</label>
<textarea id="{{ name }}"
name="{{ name }}"
rows="{{ rows }}"
class="form-control"
{{ 'required' if required }}>{{ value }}</textarea>
</div>
{% endmacro %}
{% macro select(name, label, options, selected='') %}
<div class="form-group">
<label for="{{ name }}">{{ label }}</label>
<select id="{{ name }}" name="{{ name }}" class="form-control">
{% for value, text in options %}
<option value="{{ value }}" {{ 'selected' if value == selected }}>
{{ text }}
</option>
{% endfor %}
</select>
</div>
{% endmacro %}
<!-- Using macros in another template -->
{% from "macros/forms.html" import input, textarea, select %}
<form method="POST">
{{ input('title', 'Task Title', required=true, placeholder='Enter task title') }}
{{ textarea('description', 'Description', rows=6) }}
{{ select('priority', 'Priority', [('low', 'Low'), ('medium', 'Medium'), ('high', 'High')]) }}
<button type="submit" class="btn btn-primary">Create Task</button>
</form>
<!-- templates/partials/task_card.html -->
<div class="card">
<div class="card-body">
<h5>{{ task.title }}</h5>
<p>{{ task.description | truncate(100) }}</p>
<span class="badge badge-{{ 'success' if task.done else 'warning' }}">
{{ 'Done' if task.done else 'Pending' }}
</span>
</div>
</div>
<!-- Including the partial -->
{% for task in tasks %}
{% include "partials/task_card.html" %}
{% endfor %}
<!-- Include with ignore missing (won't error if file is absent) -->
{% include "partials/sidebar.html" ignore missing %}
HTML forms are the standard way to collect user input on the web. Flask gives you direct access to submitted form data through the request object.
from flask import Flask, render_template, request, redirect, url_for, flash

app = Flask(__name__)
app.secret_key = 'your-secret-key-here'  # required for flash messages
tasks = []

@app.route('/tasks/create', methods=['GET', 'POST'])
def create_task():
    if request.method == 'POST':
        title = request.form.get('title', '').strip()
        description = request.form.get('description', '').strip()
        priority = request.form.get('priority', 'medium')
        # Validation
        errors = []
        if not title:
            errors.append('Title is required.')
        if len(title) > 200:
            errors.append('Title must be 200 characters or fewer.')
        if errors:
            for error in errors:
                flash(error, 'danger')
            # Re-render the form with the submitted values so the user does not lose input
            return render_template('tasks/create.html',
                                   title=title,
                                   description=description,
                                   priority=priority)
        task = {
            'id': len(tasks) + 1,
            'title': title,
            'description': description,
            'priority': priority,
            'done': False
        }
        tasks.append(task)
        flash('Task created successfully!', 'success')
        return redirect(url_for('task_list'))
    return render_template('tasks/create.html')
<!-- templates/tasks/create.html -->
{% extends "base.html" %}
{% block content %}
<h1>Create New Task</h1>
<form method="POST" action="{{ url_for('create_task') }}">
<div class="form-group">
<label for="title">Title</label>
<input type="text" id="title" name="title"
value="{{ title | default('') }}"
class="form-control" required>
</div>
<div class="form-group">
<label for="description">Description</label>
<textarea id="description" name="description"
class="form-control" rows="4">{{ description | default('') }}</textarea>
</div>
<div class="form-group">
<label for="priority">Priority</label>
<select id="priority" name="priority" class="form-control">
<option value="low" {{ 'selected' if priority == 'low' }}>Low</option>
<option value="medium" {{ 'selected' if priority == 'medium' or not priority }}>Medium</option>
<option value="high" {{ 'selected' if priority == 'high' }}>High</option>
</select>
</div>
<button type="submit" class="btn btn-primary">Create Task</button>
<a href="{{ url_for('task_list') }}" class="btn btn-secondary">Cancel</a>
</form>
{% endblock %}
# In your view function
flash('Operation successful!', 'success') # green
flash('Please check your input.', 'warning') # yellow
flash('Something went wrong.', 'danger') # red
flash('FYI: system update tonight.', 'info') # blue
<!-- In your base template -->
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
<div class="alert alert-{{ category }} alert-dismissible fade show">
{{ message }}
<button type="button" class="close" data-dismiss="alert">
<span>×</span>
</button>
</div>
{% endfor %}
{% endif %}
{% endwith %}
Flask does not include CSRF protection by default. The Flask-WTF extension is the standard way to add it:
pip install flask-wtf
from flask import Flask, render_template, redirect, url_for, flash
from flask_wtf import FlaskForm, CSRFProtect
from wtforms import StringField, TextAreaField, SelectField
from wtforms.validators import DataRequired, Length

app = Flask(__name__)
app.secret_key = 'your-secret-key'
csrf = CSRFProtect(app)
tasks = []  # in-memory storage, as in the earlier example

class TaskForm(FlaskForm):
    title = StringField('Title', validators=[
        DataRequired(message='Title is required'),
        Length(max=200, message='Title must be 200 characters or fewer')
    ])
    description = TextAreaField('Description')
    priority = SelectField('Priority',
                           choices=[('low', 'Low'), ('medium', 'Medium'), ('high', 'High')])

@app.route('/tasks/create', methods=['GET', 'POST'])
def create_task():
    form = TaskForm()
    # validate_on_submit() is True only for a POST that passes all validators
    if form.validate_on_submit():
        task = {
            'title': form.title.data,
            'description': form.description.data,
            'priority': form.priority.data
        }
        tasks.append(task)
        flash('Task created!', 'success')
        return redirect(url_for('task_list'))
    return render_template('tasks/create_wtf.html', form=form)
<!-- templates/tasks/create_wtf.html -->
{% extends "base.html" %}
{% block content %}
<form method="POST">
{{ form.hidden_tag() }} <!-- Includes CSRF token -->
<div class="form-group">
{{ form.title.label }}
{{ form.title(class="form-control") }}
{% for error in form.title.errors %}
<small class="text-danger">{{ error }}</small>
{% endfor %}
</div>
<div class="form-group">
{{ form.description.label }}
{{ form.description(class="form-control", rows=4) }}
</div>
<div class="form-group">
{{ form.priority.label }}
{{ form.priority(class="form-control") }}
</div>
<button type="submit" class="btn btn-primary">Create</button>
</form>
{% endblock %}
Flask serves static files from the static/ folder by default. Use url_for('static', filename=...) to reference them.
<!-- CSS -->
<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
<!-- JavaScript -->
<script src="{{ url_for('static', filename='js/app.js') }}"></script>
<!-- Images -->
<img src="{{ url_for('static', filename='img/logo.png') }}" alt="Logo">
<!-- Favicon -->
<link rel="icon" href="{{ url_for('static', filename='favicon.ico') }}">
static/
├── css/
│   ├── style.css
│   └── components/
│       └── navbar.css
├── js/
│   ├── app.js
│   └── utils.js
├── img/
│   ├── logo.png
│   └── icons/
└── favicon.ico
Browsers aggressively cache static files. When you update a CSS or JS file, users might still see the old version. A cache-busting strategy appends a version or hash to the file URL.
import hashlib
import os
from flask import url_for

@app.template_filter('cache_bust')
def cache_bust_filter(filename):
    # Append a short content hash so the URL changes whenever the file changes
    filepath = os.path.join(app.static_folder, filename)
    if os.path.isfile(filepath):
        with open(filepath, 'rb') as f:
            file_hash = hashlib.md5(f.read()).hexdigest()[:8]
        return url_for('static', filename=filename) + '?v=' + file_hash
    # Fall back to the plain URL when the file does not exist
    return url_for('static', filename=filename)
<!-- Usage in templates -->
<link rel="stylesheet" href="{{ 'css/style.css' | cache_bust }}">
<!-- Output: /static/css/style.css?v=a1b2c3d4 -->
Let us build a complete task manager application that demonstrates routes, templates, forms, and flash messages working together. This is the kind of application structure you would use in a real project.
# app.py
from flask import Flask, render_template, request, redirect, url_for, flash
from datetime import datetime

app = Flask(__name__)
app.secret_key = 'dev-secret-key-change-in-production'

# In-memory storage (use a database in production)
tasks = [
    {
        'id': 1,
        'title': 'Learn Flask routing',
        'description': 'Study URL rules, methods, and variables',
        'priority': 'high',
        'done': False,
        'created_at': datetime(2026, 2, 1)
    },
    {
        'id': 2,
        'title': 'Master Jinja2 templates',
        'description': 'Learn inheritance, macros, and filters',
        'priority': 'high',
        'done': False,
        'created_at': datetime(2026, 2, 5)
    },
    {
        'id': 3,
        'title': 'Build a CRUD app',
        'description': 'Apply routing and templates in a real project',
        'priority': 'medium',
        'done': False,
        'created_at': datetime(2026, 2, 10)
    }
]
next_id = 4

def find_task(task_id):
    return next((t for t in tasks if t['id'] == task_id), None)

@app.template_filter('dateformat')
def dateformat_filter(value, fmt='%B %d, %Y'):
    return value.strftime(fmt)
# --- Routes ---
@app.route('/')
def index():
    return redirect(url_for('task_list'))

@app.route('/tasks/')
def task_list():
    filter_status = request.args.get('status', 'all')
    sort_by = request.args.get('sort', 'created_at')
    filtered = tasks[:]
    if filter_status == 'done':
        filtered = [t for t in filtered if t['done']]
    elif filter_status == 'pending':
        filtered = [t for t in filtered if not t['done']]
    if sort_by == 'priority':
        priority_order = {'high': 0, 'medium': 1, 'low': 2}
        filtered.sort(key=lambda t: priority_order.get(t['priority'], 1))
    elif sort_by == 'title':
        filtered.sort(key=lambda t: t['title'].lower())
    else:
        filtered.sort(key=lambda t: t['created_at'], reverse=True)
    return render_template('tasks/list.html',
                           tasks=filtered,
                           current_status=filter_status,
                           current_sort=sort_by)
@app.route('/tasks/create', methods=['GET', 'POST'])
def task_create():
    global next_id
    if request.method == 'POST':
        title = request.form.get('title', '').strip()
        description = request.form.get('description', '').strip()
        priority = request.form.get('priority', 'medium')
        if not title:
            flash('Title is required.', 'danger')
            return render_template('tasks/form.html',
                                   action='Create',
                                   title=title,
                                   description=description,
                                   priority=priority)
        task = {
            'id': next_id,
            'title': title,
            'description': description,
            'priority': priority,
            'done': False,
            'created_at': datetime.now()
        }
        tasks.append(task)
        next_id += 1
        flash(f'Task "{title}" created successfully!', 'success')
        return redirect(url_for('task_list'))
    return render_template('tasks/form.html', action='Create')

@app.route('/tasks/<int:task_id>')
def task_detail(task_id):
    task = find_task(task_id)
    if task is None:
        flash('Task not found.', 'danger')
        return redirect(url_for('task_list'))
    return render_template('tasks/detail.html', task=task)
@app.route('/tasks/<int:task_id>/edit', methods=['GET', 'POST'])
def task_edit(task_id):
    task = find_task(task_id)
    if task is None:
        flash('Task not found.', 'danger')
        return redirect(url_for('task_list'))
    if request.method == 'POST':
        title = request.form.get('title', '').strip()
        description = request.form.get('description', '').strip()
        priority = request.form.get('priority', 'medium')
        if not title:
            flash('Title is required.', 'danger')
            return render_template('tasks/form.html',
                                   action='Edit',
                                   task=task,
                                   title=title,
                                   description=description,
                                   priority=priority)
        task['title'] = title
        task['description'] = description
        task['priority'] = priority
        flash(f'Task "{title}" updated successfully!', 'success')
        return redirect(url_for('task_detail', task_id=task_id))
    return render_template('tasks/form.html',
                           action='Edit',
                           task=task,
                           title=task['title'],
                           description=task['description'],
                           priority=task['priority'])
@app.route('/tasks/<int:task_id>/toggle', methods=['POST'])
def task_toggle(task_id):
    task = find_task(task_id)
    if task is None:
        flash('Task not found.', 'danger')
        return redirect(url_for('task_list'))
    task['done'] = not task['done']
    status = 'completed' if task['done'] else 'reopened'
    flash(f'Task "{task["title"]}" {status}.', 'info')
    return redirect(url_for('task_list'))

@app.route('/tasks/<int:task_id>/delete', methods=['POST'])
def task_delete(task_id):
    global tasks
    task = find_task(task_id)
    if task is None:
        flash('Task not found.', 'danger')
        return redirect(url_for('task_list'))
    tasks = [t for t in tasks if t['id'] != task_id]
    flash(f'Task "{task["title"]}" deleted.', 'warning')
    return redirect(url_for('task_list'))

# Custom error handlers
@app.errorhandler(404)
def not_found(e):
    return render_template('errors/404.html'), 404

@app.errorhandler(500)
def server_error(e):
    return render_template('errors/500.html'), 500

if __name__ == '__main__':
    app.run(debug=True)
<!-- templates/base.html -->
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>{% block title %}Task Manager{% endblock %}</title>
<link rel="stylesheet"
href="https://cdn.jsdelivr.net/npm/bootstrap@4.6.2/dist/css/bootstrap.min.css">
<link rel="stylesheet"
href="{{ url_for('static', filename='css/style.css') }}">
</head>
<body>
<nav class="navbar navbar-expand-lg navbar-dark bg-dark">
<a class="navbar-brand" href="{{ url_for('task_list') }}">Task Manager</a>
<div class="navbar-nav">
<a class="nav-link" href="{{ url_for('task_list') }}">All Tasks</a>
<a class="nav-link" href="{{ url_for('task_create') }}">New Task</a>
</div>
</nav>
<div class="container mt-4">
{% with messages = get_flashed_messages(with_categories=true) %}
{% if messages %}
{% for category, message in messages %}
<div class="alert alert-{{ category }} alert-dismissible fade show">
{{ message }}
<button type="button" class="close" data-dismiss="alert">
<span>×</span>
</button>
</div>
{% endfor %}
{% endif %}
{% endwith %}
{% block content %}{% endblock %}
</div>
<script src="https://code.jquery.com/jquery-3.5.1.slim.min.js"></script>
<script src="https://cdn.jsdelivr.net/npm/bootstrap@4.6.2/dist/js/bootstrap.bundle.min.js"></script>
</body>
</html>
<!-- templates/tasks/list.html -->
{% extends "base.html" %}
{% block title %}All Tasks - Task Manager{% endblock %}
{% block content %}
<div class="d-flex justify-content-between align-items-center mb-4">
<h1>Tasks</h1>
<a href="{{ url_for('task_create') }}" class="btn btn-primary">+ New Task</a>
</div>
<!-- Filters -->
<div class="btn-group mb-3">
<a href="{{ url_for('task_list', status='all', sort=current_sort) }}"
class="btn btn-sm {{ 'btn-primary' if current_status == 'all' else 'btn-outline-primary' }}">
All
</a>
<a href="{{ url_for('task_list', status='pending', sort=current_sort) }}"
class="btn btn-sm {{ 'btn-primary' if current_status == 'pending' else 'btn-outline-primary' }}">
Pending
</a>
<a href="{{ url_for('task_list', status='done', sort=current_sort) }}"
class="btn btn-sm {{ 'btn-primary' if current_status == 'done' else 'btn-outline-primary' }}">
Done
</a>
</div>
<!-- Sort -->
<div class="btn-group mb-3 ml-2">
<a href="{{ url_for('task_list', status=current_status, sort='created_at') }}"
class="btn btn-sm {{ 'btn-secondary' if current_sort == 'created_at' else 'btn-outline-secondary' }}">
Newest
</a>
<a href="{{ url_for('task_list', status=current_status, sort='priority') }}"
class="btn btn-sm {{ 'btn-secondary' if current_sort == 'priority' else 'btn-outline-secondary' }}">
Priority
</a>
<a href="{{ url_for('task_list', status=current_status, sort='title') }}"
class="btn btn-sm {{ 'btn-secondary' if current_sort == 'title' else 'btn-outline-secondary' }}">
A-Z
</a>
</div>
{% if tasks %}
<div class="list-group">
{% for task in tasks %}
<div class="list-group-item d-flex justify-content-between align-items-start">
<div>
<h5 class="{{ 'text-muted' if task.done }}">
{% if task.done %}<s>{{ task.title }}</s>{% else %}{{ task.title }}{% endif %}
<span class="badge badge-{{ 'danger' if task.priority == 'high' else 'warning' if task.priority == 'medium' else 'info' }}">
{{ task.priority | capitalize }}
</span>
</h5>
<small class="text-muted">Created {{ task.created_at | dateformat }}</small>
</div>
<div class="btn-group">
<a href="{{ url_for('task_detail', task_id=task.id) }}"
class="btn btn-sm btn-outline-info">View</a>
<a href="{{ url_for('task_edit', task_id=task.id) }}"
class="btn btn-sm btn-outline-secondary">Edit</a>
<form method="POST" action="{{ url_for('task_toggle', task_id=task.id) }}"
style="display:inline">
<button class="btn btn-sm btn-outline-{{ 'success' if not task.done else 'warning' }}">
{{ 'Complete' if not task.done else 'Reopen' }}
</button>
</form>
<form method="POST" action="{{ url_for('task_delete', task_id=task.id) }}"
style="display:inline"
onsubmit="return confirm('Delete this task?')">
<button class="btn btn-sm btn-outline-danger">Delete</button>
</form>
</div>
</div>
{% endfor %}
</div>
<p class="mt-3 text-muted">Showing {{ tasks | length }} task{{ 's' if tasks | length != 1 }}</p>
{% else %}
<div class="text-center py-5">
<h3 class="text-muted">No tasks found</h3>
<p><a href="{{ url_for('task_create') }}">Create your first task</a></p>
</div>
{% endif %}
{% endblock %}
<!-- templates/tasks/form.html -->
{% extends "base.html" %}
{% block title %}{{ action }} Task - Task Manager{% endblock %}
{% block content %}
<h1>{{ action }} Task</h1>
<form method="POST">
<div class="form-group">
<label for="title">Title <span class="text-danger">*</span></label>
<input type="text" id="title" name="title"
value="{{ title | default('') }}"
class="form-control" required maxlength="200"
placeholder="What needs to be done?">
</div>
<div class="form-group">
<label for="description">Description</label>
<textarea id="description" name="description"
class="form-control" rows="4"
placeholder="Add details (optional)">{{ description | default('') }}</textarea>
</div>
<div class="form-group">
<label for="priority">Priority</label>
<select id="priority" name="priority" class="form-control">
<option value="low" {{ 'selected' if priority == 'low' }}>Low</option>
<option value="medium" {{ 'selected' if priority == 'medium' or not priority }}>Medium</option>
<option value="high" {{ 'selected' if priority == 'high' }}>High</option>
</select>
</div>
<button type="submit" class="btn btn-primary">{{ action }} Task</button>
<a href="{{ url_for('task_list') }}" class="btn btn-secondary">Cancel</a>
</form>
{% endblock %}
<!-- templates/tasks/detail.html -->
{% extends "base.html" %}
{% block title %}{{ task.title }} - Task Manager{% endblock %}
{% block content %}
<div class="card">
<div class="card-body">
<h2>
{{ task.title }}
<span class="badge badge-{{ 'success' if task.done else 'warning' }}">
{{ 'Done' if task.done else 'Pending' }}
</span>
<span class="badge badge-{{ 'danger' if task.priority == 'high' else 'warning' if task.priority == 'medium' else 'info' }}">
{{ task.priority | capitalize }} Priority
</span>
</h2>
{% if task.description %}
<p class="card-text mt-3">{{ task.description }}</p>
{% endif %}
<p class="text-muted">Created: {{ task.created_at | dateformat }}</p>
<div class="mt-3">
<a href="{{ url_for('task_edit', task_id=task.id) }}"
class="btn btn-secondary">Edit</a>
<form method="POST" action="{{ url_for('task_toggle', task_id=task.id) }}"
style="display:inline">
<button class="btn btn-{{ 'success' if not task.done else 'warning' }}">
Mark {{ 'Complete' if not task.done else 'Incomplete' }}
</button>
</form>
<form method="POST" action="{{ url_for('task_delete', task_id=task.id) }}"
style="display:inline"
onsubmit="return confirm('Are you sure you want to delete this task?')">
<button class="btn btn-danger">Delete</button>
</form>
</div>
</div>
</div>
<a href="{{ url_for('task_list') }}" class="btn btn-link mt-3">← Back to list</a>
{% endblock %}
# Install Flask
pip install flask
# Run the application
python app.py
# Or use the flask command
export FLASK_APP=app.py
export FLASK_DEBUG=1  # FLASK_ENV=development was removed in Flask 2.3
flask run
# Open http://localhost:5000 in your browser
Here are the mistakes I see developers make most often when working with Flask routes and templates.
# WRONG - This only handles GET, POST will return 405 Method Not Allowed
@app.route('/submit')
def submit():
    data = request.form['name']  # never reached on POST
    return 'OK'

# CORRECT
@app.route('/submit', methods=['GET', 'POST'])
def submit():
    if request.method == 'POST':
        data = request.form['name']
        return 'OK'
    return render_template('form.html')
# Flask looks for templates in the 'templates/' folder by default.
# Make sure your directory structure is correct:
#
# myapp/
# ├── app.py
# └── templates/ <-- must be named "templates"
# └── index.html
# WRONG - file at myapp/index.html (not in templates folder)
# CORRECT - file at myapp/templates/index.html
# If using blueprints with custom template folders, check the template_folder parameter:
from flask import Blueprint
bp = Blueprint('auth', __name__, template_folder='templates')
# Template paths are relative to that folder: render_template('auth/login.html')
# WRONG - using the URL path instead of the function name
url_for('/user/profile')
# CORRECT - use the endpoint (function name)
url_for('user_profile')
# WRONG - missing required URL variables
url_for('show_post') # BuildError: missing 'post_id'
# CORRECT
url_for('show_post', post_id=42)
# WRONG - blueprint endpoint without blueprint prefix
url_for('login') # BuildError if login is in auth blueprint
# CORRECT
url_for('auth.login')
# This is a common problem with Flask blueprints.
# WRONG:
# app.py imports blueprints/auth.py
# blueprints/auth.py imports app from app.py -> circular import
# CORRECT - use Flask application factory pattern:
# app/__init__.py
def create_app():
    app = Flask(__name__)
    # Import inside the factory so the blueprint module never imports app at load time
    from .blueprints.auth import auth_bp
    app.register_blueprint(auth_bp)
    return app
# WRONG - code after return never executes
@app.route('/example')
def example():
    return 'Hello'
    response.headers['X-Custom'] = 'value'  # unreachable

# CORRECT - use make_response before returning
from flask import make_response

@app.route('/example')
def example():
    resp = make_response('Hello')
    resp.headers['X-Custom'] = 'value'
    return resp
# Use nouns, not verbs. Let HTTP methods express the action.
# WRONG
@app.route('/get-users')
@app.route('/create-user')
@app.route('/delete-user/1')
# CORRECT
@app.route('/users/', methods=['GET']) # list users
@app.route('/users/', methods=['POST']) # create user
@app.route('/users/<int:id>', methods=['GET']) # get user
@app.route('/users/<int:id>', methods=['PUT']) # update user
@app.route('/users/<int:id>', methods=['DELETE']) # delete user
# Use plural nouns for collections
# WRONG: /user/1
# CORRECT: /users/1
# Nest resources logically
# /users/1/posts - posts by user 1
# /users/1/posts/5 - post 5 by user 1
# WRONG - business logic crammed into the view
@app.route('/checkout', methods=['POST'])
def checkout():
    cart = session.get('cart', [])
    total = sum(item['price'] * item['qty'] for item in cart)
    tax = total * 0.08
    shipping = 5.99 if total < 50 else 0
    final_total = total + tax + shipping
    # ... 50 more lines of order processing
    return render_template('receipt.html', total=final_total)

# CORRECT - delegate to service functions
@app.route('/checkout', methods=['POST'])
def checkout():
    cart = get_cart_from_session()
    order = create_order(cart)
    return render_template('receipt.html', order=order)
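The `create_order` helper in the CORRECT version is not defined in this tutorial. As a minimal sketch of what such a service function might look like, the version below reuses the pricing rules from the WRONG example (8% tax, 5.99 shipping on subtotals under 50). Using `Decimal` instead of floats is an assumption made here to avoid rounding drift on money values, and the shape of the returned dict is hypothetical.

```python
from decimal import Decimal, ROUND_HALF_UP

TAX_RATE = Decimal('0.08')
SHIPPING_FEE = Decimal('5.99')
FREE_SHIPPING_THRESHOLD = Decimal('50')
CENT = Decimal('0.01')


def create_order(cart):
    """Compute order totals from a list of {'price': ..., 'qty': ...} dicts."""
    # Convert prices through str() so float inputs do not carry binary noise
    subtotal = sum(
        (Decimal(str(item['price'])) * item['qty'] for item in cart),
        Decimal('0'),
    )
    tax = (subtotal * TAX_RATE).quantize(CENT, rounding=ROUND_HALF_UP)
    # Shipping is charged only when the subtotal is below the threshold
    shipping = SHIPPING_FEE if subtotal < FREE_SHIPPING_THRESHOLD else Decimal('0')
    return {
        'subtotal': subtotal,
        'tax': tax,
        'shipping': shipping,
        'total': subtotal + tax + shipping,
    }
```

Because the function takes plain data and returns plain data, it can be unit tested without a request context, which is the main payoff of keeping this logic out of the view.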
Do not wait until your app.py is 2000 lines long. Start with blueprints as soon as you have more than one logical section (auth, API, admin, public pages). The refactoring cost grows with time.
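As a rough illustration of that split, the sketch below puts two logical sections into their own blueprints and registers both in an application factory. The blueprint names, URL prefixes, and placeholder responses are hypothetical; in a real project each blueprint would normally live in its own module.

```python
from flask import Blueprint, Flask

# Hypothetical blueprint for the authentication section
auth_bp = Blueprint('auth', __name__, url_prefix='/auth')


@auth_bp.route('/login')
def login():
    return 'Login page'


# Hypothetical blueprint for the task pages
tasks_bp = Blueprint('tasks', __name__, url_prefix='/tasks')


@tasks_bp.route('/')
def task_list():
    return 'Task list'


def create_app():
    """Application factory: build the app and attach each section."""
    app = Flask(__name__)
    app.register_blueprint(auth_bp)
    app.register_blueprint(tasks_bp)
    return app
```

With this layout, endpoints are namespaced by blueprint, so links are built with `url_for('auth.login')` and `url_for('tasks.task_list')`.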
# Organize templates to mirror your application structure
templates/
├── base.html # Master layout
├── macros/
│ ├── forms.html # Reusable form components
│ └── pagination.html # Pagination macro
├── partials/
│ ├── navbar.html # Navigation
│ ├── footer.html # Footer
│ └── flash_messages.html
├── auth/
│ ├── login.html
│ └── register.html
├── tasks/
│ ├── list.html
│ ├── detail.html
│ └── form.html
└── errors/
├── 404.html
└── 500.html
Set app.secret_key to a long random value in production.
Use secure_filename() from Werkzeug for file uploads.
Set MAX_CONTENT_LENGTH to prevent large file upload attacks.
Only use | safe on content you trust.
Define routes with @app.route() and explicit methods parameters. Understand trailing slash behavior.
Use URL converters (int, float, path, uuid) to validate input at the routing layer.
Read query parameters (request.args), form data (request.form), JSON (request.get_json()), and files (request.files) from the request object.
Return make_response() objects, jsonify() for APIs, or redirect() for navigation.
Template inheritance (extends, block) eliminates HTML duplication. Use macros for reusable components and includes for partials.
With routes and templates mastered, you have the foundation for building any Flask web application. The next steps are adding a database with Flask-SQLAlchemy, user authentication with Flask-Login, and deploying to production. Each of these builds directly on the routing and templating patterns covered here.