Every non-trivial web application needs persistent storage. Whether you are building a REST API, an admin dashboard, or a SaaS product, the database layer is the backbone that holds your application state, user data, and business logic together. Flask, being a micro-framework, does not ship with a built-in ORM or database abstraction. This is intentional — it lets you choose the right tool for your domain instead of forcing a one-size-fits-all solution.
In practice, the Python ecosystem has converged on SQLAlchemy as the de facto ORM for Flask applications, and Flask-SQLAlchemy as the integration layer that wires SQLAlchemy into the Flask application lifecycle. This tutorial covers the full spectrum of database integration — from defining your first model to managing migrations in production, handling transactions safely, and optimizing query performance under real load.
We will also discuss when an ORM is the wrong choice and you should drop down to raw SQL. The goal is not to make you a SQLAlchemy expert overnight, but to give you the mental model and working patterns you need to build production-grade data layers in Flask.
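Before we dive in, it is worth understanding the trade-off between an ORM and raw SQL: the ORM gives you productivity and readable Python objects, while raw SQL gives you full control over what the database executes.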
The pragmatic approach is to use the ORM for 90% of your operations (CRUD, relationships, standard queries) and drop to raw SQL for the remaining 10% (complex reporting, bulk inserts, database-specific features). SQLAlchemy supports both seamlessly.
Flask-SQLAlchemy is a Flask extension that adds SQLAlchemy support with sensible defaults and useful helpers. Install it along with the database driver you need:
# Core package
pip install Flask-SQLAlchemy

# Database drivers (install the one you need)
pip install psycopg2-binary   # PostgreSQL
pip install PyMySQL           # MySQL
pip install mysqlclient       # MySQL (C extension, faster)
# SQLite uses Python's built-in sqlite3 module, so no extra install is needed
The most important configuration key is SQLALCHEMY_DATABASE_URI, which tells SQLAlchemy how to connect to your database. Here is a minimal setup:
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

def create_app():
    app = Flask(__name__)
    # SQLite (file-based, good for development)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
    # Disable modification tracking (saves memory)
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    db.init_app(app)
    return app
The URI format follows the pattern: dialect+driver://username:password@host:port/database
# SQLite: relative path (three slashes = relative to instance folder)
SQLALCHEMY_DATABASE_URI = 'sqlite:///app.db'
# SQLite: absolute path (four slashes)
SQLALCHEMY_DATABASE_URI = 'sqlite:////var/data/app.db'
# PostgreSQL
SQLALCHEMY_DATABASE_URI = 'postgresql://user:password@localhost:5432/mydb'
# PostgreSQL with psycopg2 driver explicitly
SQLALCHEMY_DATABASE_URI = 'postgresql+psycopg2://user:password@localhost:5432/mydb'
# MySQL with PyMySQL driver
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://user:password@localhost:3306/mydb'
# MySQL with mysqlclient driver
SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://user:password@localhost:3306/mydb'
# MySQL with charset specified
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://user:password@localhost/mydb?charset=utf8mb4'
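To make the URI pattern concrete, the sketch below splits one of the example URIs into its named parts with Python's standard urllib.parse. It only illustrates the anatomy of the string; it is not how SQLAlchemy parses URLs internally, and the helper name describe_uri is hypothetical.

```python
from urllib.parse import urlsplit

def describe_uri(uri):
    """Split a database URI into the parts named in the pattern above."""
    parts = urlsplit(uri)
    # The scheme carries either "dialect" or "dialect+driver"
    dialect, _, driver = parts.scheme.partition('+')
    return {
        'dialect': dialect,
        'driver': driver or None,  # None means no driver was named in the URI
        'username': parts.username,
        'password': parts.password,
        'host': parts.hostname,
        'port': parts.port,
        'database': parts.path.lstrip('/'),
    }

info = describe_uri('postgresql+psycopg2://user:password@localhost:5432/mydb')
print(info['dialect'], info['driver'], info['host'], info['port'], info['database'])
```

Anything after a question mark, such as charset=utf8mb4 in the last MySQL example, is a query string of driver options and is not part of the database name.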
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://user:pass@localhost/mydb'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True  # Log all SQL statements (dev only)
# Pool settings go through SQLALCHEMY_ENGINE_OPTIONS. The older per-setting keys
# (SQLALCHEMY_POOL_SIZE and friends) were removed in Flask-SQLAlchemy 3.0.
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_size': 10,       # Connection pool size
    'pool_recycle': 3600,  # Recycle connections after 1 hour
    'max_overflow': 20,    # Extra connections beyond pool_size
    'pool_timeout': 30,    # Seconds to wait for a connection
}
A model is a Python class that maps to a database table. Each class attribute maps to a column. Flask-SQLAlchemy provides db.Model as the base class for all your models.
from datetime import datetime, timezone
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

class User(db.Model):
    # Explicit table name (optional, defaults to the snake_case form of the class name)
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(256), nullable=False)
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    role = db.Column(db.String(20), default='user', nullable=False)
    bio = db.Column(db.Text, nullable=True)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc), nullable=False)
    updated_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
        nullable=False
    )

    def __repr__(self):
        return f'<User {self.username}>'
| SQLAlchemy Type | Python Type | SQL Equivalent |
|---|---|---|
| db.Integer | int | INTEGER |
| db.BigInteger | int | BIGINT |
| db.String(n) | str | VARCHAR(n) |
| db.Text | str | TEXT |
| db.Float | float | FLOAT |
| db.Numeric(10, 2) | Decimal | NUMERIC(10, 2) |
| db.Boolean | bool | BOOLEAN |
| db.DateTime | datetime | DATETIME |
| db.Date | date | DATE |
| db.Time | time | TIME |
| db.LargeBinary | bytes | BLOB |
| db.JSON | dict/list | JSON |
| db.Enum | str/enum | ENUM |
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Unique constraint
email = db.Column(db.String(120), unique=True)
# Not nullable (required field)
name = db.Column(db.String(80), nullable=False)
# Default value (Python-side)
role = db.Column(db.String(20), default='user')
# Server-side default
created_at = db.Column(db.DateTime, server_default=db.func.now())
# Index for faster queries
username = db.Column(db.String(80), index=True)
# Composite index
__table_args__ = (
    db.Index('idx_user_email_role', 'email', 'role'),
    db.UniqueConstraint('first_name', 'last_name', name='uq_full_name'),
)
Relationships define how models connect to each other. SQLAlchemy supports one-to-many, many-to-one, one-to-one, and many-to-many relationships. Getting these right is critical — bad relationship design leads to N+1 queries and painful refactors later.
The most common relationship. One user has many posts. The foreign key lives on the “many” side.
class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)

    # One user has many posts
    posts = db.relationship('Post', back_populates='author', lazy='select')

    def __repr__(self):
        return f'<User {self.username}>'

class Post(db.Model):
    __tablename__ = 'posts'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    body = db.Column(db.Text, nullable=False)

    # Foreign key to users table
    author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

    # Many posts belong to one user
    author = db.relationship('User', back_populates='posts')

    def __repr__(self):
        return f'<Post {self.title}>'
Many-to-many relationships require an association table. For example, posts can have many tags, and tags can belong to many posts.
# Association table (no model class needed for simple many-to-many)
post_tags = db.Table('post_tags',
    db.Column('post_id', db.Integer, db.ForeignKey('posts.id'), primary_key=True),
    db.Column('tag_id', db.Integer, db.ForeignKey('tags.id'), primary_key=True)
)

class Post(db.Model):
    __tablename__ = 'posts'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)

    # Many-to-many with tags
    tags = db.relationship('Tag', secondary=post_tags, back_populates='posts', lazy='select')

class Tag(db.Model):
    __tablename__ = 'tags'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)

    # Many-to-many with posts
    posts = db.relationship('Post', secondary=post_tags, back_populates='tags', lazy='select')
class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)

    # One-to-one: uselist=False means this returns a single object, not a list
    profile = db.relationship('Profile', back_populates='user', uselist=False)

class Profile(db.Model):
    __tablename__ = 'profiles'

    id = db.Column(db.Integer, primary_key=True)
    bio = db.Column(db.Text)
    avatar_url = db.Column(db.String(500))
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), unique=True, nullable=False)

    user = db.relationship('User', back_populates='profile')
The lazy parameter controls when related objects are loaded from the database:
| Value | Behavior | Use When |
|---|---|---|
| 'select' (default) | Loads related objects on first access via a separate SELECT | You access the relationship occasionally |
| 'joined' | Loads via JOIN in the same query | You always need the related data |
| 'subquery' | Loads via a subquery after the initial query | One-to-many where JOIN would duplicate rows |
| 'dynamic' | Returns a query object instead of loading results | Large collections you want to filter further |
| 'selectin' | Loads via SELECT … WHERE id IN (…) | Best default for most one-to-many relationships |
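The difference between 'select' and 'selectin' is easiest to see by counting statements. The sketch below does not use SQLAlchemy at all: it imitates the two loading strategies by hand with the standard-library sqlite3 module, on a made-up users/posts schema, and counts the SELECT statements each one issues. The function names are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT, author_id INTEGER);
    INSERT INTO users (id, username) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol');
    INSERT INTO posts (title, author_id) VALUES ('a1', 1), ('a2', 1), ('b1', 2);
""")

statements = []
conn.set_trace_callback(statements.append)  # record every SQL statement executed

def load_like_select():
    """Imitate lazy='select': one query for users, then one more per user."""
    users = conn.execute('SELECT id, username FROM users').fetchall()
    return {
        name: [row[0] for row in conn.execute(
            'SELECT title FROM posts WHERE author_id = ?', (uid,)
        )]
        for uid, name in users
    }

def load_like_selectin():
    """Imitate lazy='selectin': one query for users, one IN query for all posts."""
    users = conn.execute('SELECT id, username FROM users').fetchall()
    names = dict(users)  # id -> username
    placeholders = ', '.join('?' for _ in names)
    rows = conn.execute(
        f'SELECT author_id, title FROM posts WHERE author_id IN ({placeholders})',
        list(names),
    ).fetchall()
    by_user = {name: [] for name in names.values()}
    for author_id, title in rows:
        by_user[names[author_id]].append(title)
    return by_user

def count_selects(loader):
    """Run a loader and return how many SELECT statements it executed."""
    statements.clear()
    loader()
    return sum(1 for sql in statements if sql.lstrip().upper().startswith('SELECT'))

select_count = count_selects(load_like_select)
selectin_count = count_selects(load_like_selectin)
print(select_count, selectin_count)
```

With three users, the first strategy issues one statement for the users plus one per user, while the second always issues two. The gap grows with the number of parent rows, which is the N+1 problem mentioned earlier.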
Once your models are defined, you need to create the actual database tables. The simplest approach uses db.create_all().
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    db.init_app(app)

    with app.app_context():
        # Import models so SQLAlchemy knows about them
        from . import models
        db.create_all()

    return app
For production applications, use the application factory pattern. This separates the creation of the db object from the app, allowing you to create multiple app instances (for testing, different configs, etc.).
# extensions.py
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

# models.py
from datetime import datetime, timezone
from extensions import db

class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))

# app.py
import os  # needed for os.environ in the production branch
from flask import Flask
from extensions import db

def create_app(config_name='development'):
    app = Flask(__name__)

    # Load config based on environment
    if config_name == 'development':
        app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///dev.db'
        app.config['SQLALCHEMY_ECHO'] = True
    elif config_name == 'testing':
        app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
    elif config_name == 'production':
        app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL')

    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

    # Initialize extensions
    db.init_app(app)

    # Register blueprints, error handlers, etc.
    # ...

    return app
Important: db.create_all() only creates tables that do not already exist. It will not modify existing tables (add columns, change types, etc.). For schema changes on existing tables, you need migrations — covered in section 8.
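The behavior is easy to reproduce outside Flask. The sketch below uses the standard-library sqlite3 module and CREATE TABLE IF NOT EXISTS as a stand-in for what db.create_all() does (create only what is missing); the table and column names are illustrative.

```python
import sqlite3

conn = sqlite3.connect(':memory:')

def column_names(table):
    """Return the column names SQLite currently has for a table."""
    return [row[1] for row in conn.execute(f'PRAGMA table_info({table})')]

# First deployment: the table is created from the original model
conn.execute('CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, username TEXT)')

# Later the model gains a phone_number column and the same create step runs again.
# The table already exists, so nothing happens and the column never appears.
conn.execute(
    'CREATE TABLE IF NOT EXISTS users '
    '(id INTEGER PRIMARY KEY, username TEXT, phone_number TEXT)'
)
before = column_names('users')

# What a migration does instead: alter the existing table in place
conn.execute('ALTER TABLE users ADD COLUMN phone_number TEXT')
after = column_names('users')

print(before, after)
```

The second create step succeeds silently, which is what makes this failure mode easy to miss: the application starts normally and only fails later, when a query touches the missing column.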
CRUD stands for Create, Read, Update, Delete — the four fundamental operations on any data store. Here is how each works with Flask-SQLAlchemy.
# Create a single record
user = User(username='john_doe', email='john@example.com', password_hash='hashed_pw')
db.session.add(user)
db.session.commit()
# The user now has an id assigned by the database
print(user.id) # e.g., 1
# Create multiple records at once
users = [
    User(username='alice', email='alice@example.com', password_hash='hash1'),
    User(username='bob', email='bob@example.com', password_hash='hash2'),
    User(username='charlie', email='charlie@example.com', password_hash='hash3'),
]
db.session.add_all(users)
db.session.commit()
# Get by primary key
user = db.session.get(User, 1) # Returns None if not found
# Get all records
all_users = User.query.all()
# Get first matching record
user = User.query.filter_by(username='john_doe').first()
# Get first or return 404 (useful in route handlers)
user = User.query.filter_by(username='john_doe').first_or_404(
    description='User not found'
)
# Get by primary key or 404 (Flask-SQLAlchemy 3.x helper, no abort import needed)
user = db.get_or_404(User, 1)
# Method 1: Modify the object and commit
user = User.query.filter_by(username='john_doe').first()
if user:
    user.email = 'newemail@example.com'
    user.role = 'admin'
    db.session.commit()
# Method 2: Bulk update (more efficient for many records)
User.query.filter(User.role == 'user').update({'is_active': False})
db.session.commit()
# Method 3: Update with returning the count of affected rows
count = User.query.filter(User.last_login < cutoff_date).update(
    {'is_active': False},
    synchronize_session='fetch'
)
db.session.commit()
print(f'Deactivated {count} users')
# Delete a single record
user = User.query.filter_by(username='john_doe').first()
if user:
    db.session.delete(user)
    db.session.commit()
# Bulk delete
deleted_count = User.query.filter(User.is_active == False).delete()
db.session.commit()
print(f'Deleted {deleted_count} inactive users')
SQLAlchemy's query interface is expressive and composable. You can chain methods to build complex queries without writing raw SQL.
# filter_by — simple equality checks using keyword arguments
users = User.query.filter_by(role='admin', is_active=True).all()
# filter — more powerful, supports operators
users = User.query.filter(User.age >= 18).all()
users = User.query.filter(User.username.like('%john%')).all()
users = User.query.filter(User.email.endswith('@example.com')).all()
users = User.query.filter(User.role.in_(['admin', 'moderator'])).all()
users = User.query.filter(User.bio.isnot(None)).all()
# Combine multiple filters (AND)
users = User.query.filter(
    User.role == 'admin',
    User.is_active == True,
    User.created_at >= start_date
).all()
# OR conditions
from sqlalchemy import or_
users = User.query.filter(
    or_(User.role == 'admin', User.role == 'moderator')
).all()
# NOT conditions
from sqlalchemy import not_
users = User.query.filter(not_(User.is_active)).all()
# Order by
users = User.query.order_by(User.created_at.desc()).all()
users = User.query.order_by(User.last_name.asc(), User.first_name.asc()).all()
# Limit and offset
users = User.query.order_by(User.id).limit(10).offset(20).all()
# First result
user = User.query.order_by(User.created_at.desc()).first()
# Count
active_count = User.query.filter_by(is_active=True).count()
# Pagination (Flask-SQLAlchemy built-in)
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
pagination = User.query.order_by(User.created_at.desc()).paginate(
    page=page,
    per_page=per_page,
    error_out=False  # Return empty page instead of 404
)
# Pagination object properties
items = pagination.items # List of items on current page
total = pagination.total # Total number of items
pages = pagination.pages # Total number of pages
has_next = pagination.has_next
has_prev = pagination.has_prev
next_num = pagination.next_num
prev_num = pagination.prev_num
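The pagination properties above follow from simple arithmetic on the total row count, the page number, and the page size. The sketch below works through that arithmetic in plain Python; it is an illustration under those assumptions, not Flask-SQLAlchemy's own implementation, and page_window is a hypothetical helper name.

```python
from math import ceil

def page_window(total, page, per_page):
    """Derive the offset and navigation flags for one page of results."""
    pages = ceil(total / per_page) if total else 0
    return {
        'offset': (page - 1) * per_page,  # rows skipped before this page starts
        'limit': per_page,                # maximum rows on this page
        'pages': pages,                   # total number of pages
        'has_prev': page > 1,
        'has_next': page < pages,
    }

window = page_window(total=95, page=3, per_page=20)
print(window)
```

The offset grows linearly with the page number, so very deep pages make the database skip many rows before it returns any. That cost is worth keeping in mind for large tables.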
from sqlalchemy import func
# Count
total = db.session.query(func.count(User.id)).scalar()
# Sum
total_revenue = db.session.query(func.sum(Order.total_amount)).scalar()
# Average
avg_age = db.session.query(func.avg(User.age)).scalar()
# Min / Max
oldest = db.session.query(func.min(User.created_at)).scalar()
newest = db.session.query(func.max(User.created_at)).scalar()
# Group by
role_counts = db.session.query(
    User.role,
    func.count(User.id).label('count')
).group_by(User.role).all()

for role, count in role_counts:
    print(f'{role}: {count}')
# Group by with having
popular_roles = db.session.query(
    User.role,
    func.count(User.id).label('count')
).group_by(User.role).having(func.count(User.id) > 5).all()
In production, you cannot use db.create_all() to evolve your schema. It does not alter existing tables — it only creates missing ones. Database migrations track every schema change as a versioned script that can be applied (upgrade) or reversed (downgrade). Flask-Migrate wraps Alembic, the migration tool for SQLAlchemy.
pip install Flask-Migrate
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

db = SQLAlchemy()
migrate = Migrate()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    db.init_app(app)
    migrate.init_app(app, db)
    return app
# Initialize the migrations directory (run once)
flask db init
# Generate a migration script after changing models
flask db migrate -m "Add users table"
# Apply the migration to the database
flask db upgrade
# Revert the last migration
flask db downgrade
# Show current migration version
flask db current
# Show migration history
flask db history
# Upgrade to a specific version
flask db upgrade ae1027a6acf
# Downgrade to a specific version
flask db downgrade ae1027a6acf
# 1. Make changes to your models (add column, new table, etc.)
# 2. Generate migration
flask db migrate -m "Add phone_number column to users"
# 3. Review the generated migration file in migrations/versions/
# 4. Apply
flask db upgrade
# 5. Commit migration file to version control
git add migrations/
git commit -m "Add phone_number column migration"
Critical rule: Always review auto-generated migration files before applying them. Alembic does its best to detect changes, but it can miss things like column renames (it sees a drop + add instead), data migrations, or index changes. Edit the generated file if needed.
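To see why a rename shows up as a drop plus an add, consider what a comparison based purely on column names can know. The sketch below is a deliberately simplified stand-in, not Alembic's actual autogenerate logic; diff_columns and the column names are hypothetical.

```python
def diff_columns(old_columns, new_columns):
    """Compare two sets of column names the way a name-based diff would."""
    old, new = set(old_columns), set(new_columns)
    return {
        'drop': sorted(old - new),  # in the database, missing from the model
        'add': sorted(new - old),   # in the model, missing from the database
    }

# The model renamed "name" to "full_name"; nothing in the names links the two
plan = diff_columns(
    old_columns=['id', 'name', 'email'],
    new_columns=['id', 'full_name', 'email'],
)
print(plan)
```

Applied literally, that plan would discard every value stored in name. The usual hand edit is to replace the generated drop/add pair with a single rename, for example op.alter_column('users', 'name', new_column_name='full_name'), so the data survives.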
"""Add phone_number column to users
Revision ID: a1b2c3d4e5f6
Revises: 9z8y7x6w5v4u
Create Date: 2026-02-26 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers
revision = 'a1b2c3d4e5f6'
down_revision = '9z8y7x6w5v4u'
branch_labels = None
depends_on = None
def upgrade():
    op.add_column('users', sa.Column('phone_number', sa.String(20), nullable=True))
    op.create_index('idx_users_phone', 'users', ['phone_number'])

def downgrade():
    op.drop_index('idx_users_phone', table_name='users')
    op.drop_column('users', 'phone_number')
Seeding populates your database with initial or test data. Flask's CLI makes it easy to create custom commands for this.
import click
from flask.cli import with_appcontext
from extensions import db
from models import User, Product, Category

@click.command('seed-db')
@with_appcontext
def seed_db_command():
    """Seed the database with sample data."""
    # Clear existing data
    db.session.execute(db.text('DELETE FROM users'))
    db.session.execute(db.text('DELETE FROM products'))
    db.session.execute(db.text('DELETE FROM categories'))

    # Seed categories
    categories = [
        Category(name='Electronics', description='Gadgets and devices'),
        Category(name='Clothing', description='Apparel and accessories'),
        Category(name='Books', description='Physical and digital books'),
    ]
    db.session.add_all(categories)
    db.session.flush()  # Flush to get IDs without committing

    # Seed users
    users = [
        User(username='admin', email='admin@example.com', password_hash='hashed_admin', role='admin'),
        User(username='alice', email='alice@example.com', password_hash='hashed_alice'),
        User(username='bob', email='bob@example.com', password_hash='hashed_bob'),
    ]
    db.session.add_all(users)

    # Seed products
    products = [
        Product(name='Laptop', price=999.99, category_id=categories[0].id, stock=50),
        Product(name='T-Shirt', price=19.99, category_id=categories[1].id, stock=200),
        Product(name='Python Cookbook', price=39.99, category_id=categories[2].id, stock=100),
    ]
    db.session.add_all(products)

    db.session.commit()
    click.echo('Database seeded successfully.')

# Register the command in your app factory
def create_app():
    app = Flask(__name__)
    # ... config, extensions ...
    app.cli.add_command(seed_db_command)
    return app
# Run the seed command
flask seed-db
from faker import Faker

fake = Faker()

@click.command('seed-fake')
@click.argument('count', default=50)
@with_appcontext
def seed_fake_command(count):
    """Generate fake users for development."""
    users = []
    for _ in range(count):
        users.append(User(
            username=fake.unique.user_name(),
            email=fake.unique.email(),
            password_hash=fake.sha256(),
            bio=fake.paragraph(nb_sentences=3),
            is_active=fake.boolean(chance_of_getting_true=85),
            created_at=fake.date_time_between(start_date='-1y', end_date='now')
        ))
    db.session.add_all(users)
    db.session.commit()
    click.echo(f'Created {count} fake users.')
Database connections are expensive to create. Connection pooling keeps a set of connections open and reuses them across requests. SQLAlchemy handles this automatically, but you should tune the settings for your workload.
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_size': 10,        # Number of permanent connections to keep
    'max_overflow': 20,     # Extra connections allowed beyond pool_size
    'pool_timeout': 30,     # Seconds to wait for a connection from the pool
    'pool_recycle': 1800,   # Recycle connections after 30 minutes
    'pool_pre_ping': True,  # Test connections before using them (handles stale connections)
}
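pool_size: The number of permanent connections kept open in the pool. SQLAlchemy's default is 5.
max_overflow: When all pool_size connections are in use, SQLAlchemy can create up to max_overflow additional temporary connections. These are closed when returned to the pool. Default is 10.
pool_timeout: How many seconds a request waits for a connection from the pool before an error is raised. Default is 30.
pool_recycle: Replaces connections older than this many seconds. This matters for MySQL, which closes idle connections after wait_timeout (default 8 hours). Set this lower than your database's idle timeout.
pool_pre_ping: Issues a lightweight check such as SELECT 1 before using a connection. Catches dead connections without the application seeing an error. Small overhead but highly recommended for production.
# For a typical web app handling ~50 concurrent requests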
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_size': 20,
    'max_overflow': 30,
    'pool_timeout': 30,
    'pool_recycle': 1800,
    'pool_pre_ping': True,
}

# For a lightweight app or development
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_size': 5,
    'max_overflow': 10,
    'pool_recycle': 3600,
    'pool_pre_ping': True,
}

# To disable pooling entirely (useful for debugging)
from sqlalchemy.pool import NullPool

app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'poolclass': NullPool,
}
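When tuning these numbers, it helps to work out the worst case the database server might see. The sketch below assumes a multi-process deployment in which each worker process holds its own pool; the worker count and the server-side limit are made-up values, and max_connections_needed is a hypothetical helper.

```python
def max_connections_needed(pool_size, max_overflow, worker_processes):
    """Upper bound on connections the app can open against the database.

    Assumes each worker process has its own engine and therefore its own
    pool, and that each pool can grow to pool_size + max_overflow.
    """
    per_process = pool_size + max_overflow
    return per_process * worker_processes

# Hypothetical deployment: 4 worker processes with pool_size=20, max_overflow=30
needed = max_connections_needed(pool_size=20, max_overflow=30, worker_processes=4)
db_limit = 100  # assumed server-side connection limit; check your database's setting

print(needed, needed <= db_limit)
```

If the worst case exceeds the server's limit, as it does with these example numbers, either lower pool_size and max_overflow or put a connection pooler in front of the database.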
A transaction groups multiple database operations into a single atomic unit — either all of them succeed, or none of them do. SQLAlchemy uses transactions implicitly (every operation between commit() calls is a transaction), but understanding explicit transaction control is essential for correctness.
# Implicit transaction: the most common pattern
try:
    user = User(username='alice', email='alice@example.com', password_hash='hash')
    db.session.add(user)
    profile = Profile(user=user, bio='Software engineer')
    db.session.add(profile)
    db.session.commit()  # Both user and profile are saved atomically
except Exception:
    db.session.rollback()  # Undo everything if any operation fails
    raise  # Re-raise with the original traceback
from contextlib import contextmanager

@contextmanager
def transaction():
    """Context manager for database transactions."""
    try:
        yield db.session
        db.session.commit()
    except Exception:
        db.session.rollback()
        raise

# Usage
with transaction() as session:
    user = User(username='bob', email='bob@example.com', password_hash='hash')
    session.add(user)
    profile = Profile(user=user, bio='Data scientist')
    session.add(profile)
# Commits automatically on exit, rolls back on exception
def place_order(user_id, items):
    """Place an order with savepoints for partial rollback."""
    try:
        order = Order(user_id=user_id, status='pending')
        db.session.add(order)
        db.session.flush()  # Get the order ID

        for item in items:
            # Savepoint for each item: if one fails, we can skip it
            savepoint = db.session.begin_nested()
            try:
                product = db.session.get(Product, item['product_id'])
                if product.stock < item['quantity']:
                    raise ValueError(f'Insufficient stock for {product.name}')
                product.stock -= item['quantity']
                order_item = OrderItem(
                    order_id=order.id,
                    product_id=product.id,
                    quantity=item['quantity'],
                    unit_price=product.price
                )
                db.session.add(order_item)
                savepoint.commit()
            except Exception as e:
                savepoint.rollback()
                print(f'Skipping item: {e}')

        order.total_amount = sum(
            oi.quantity * oi.unit_price for oi in order.items
        )
        db.session.commit()
        return order
    except Exception:
        db.session.rollback()
        raise
from sqlalchemy.exc import IntegrityError, OperationalError

def create_user(username, email, password_hash):
    """Create a user with proper error handling."""
    try:
        user = User(
            username=username,
            email=email,
            password_hash=password_hash
        )
        db.session.add(user)
        db.session.commit()
        return user, None
    except IntegrityError as e:
        db.session.rollback()
        if 'username' in str(e.orig):
            return None, 'Username already exists'
        if 'email' in str(e.orig):
            return None, 'Email already exists'
        return None, 'Duplicate entry'
    except OperationalError as e:
        db.session.rollback()
        return None, 'Database connection error'
    except Exception as e:
        db.session.rollback()
        return None, f'Unexpected error: {str(e)}'
Let us build a complete e-commerce data layer that ties together everything covered so far. This example includes four interconnected models, full CRUD operations, complex queries, and transactional order placement.
from datetime import datetime, timezone
from extensions import db

class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(256), nullable=False)
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))

    # Relationships
    orders = db.relationship('Order', back_populates='user', lazy='selectin')

    def __repr__(self):
        return f'<User {self.username}>'

    def to_dict(self):
        return {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'is_active': self.is_active,
            'created_at': self.created_at.isoformat(),
            'order_count': len(self.orders)
        }

class Product(db.Model):
    __tablename__ = 'products'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(200), nullable=False, index=True)
    description = db.Column(db.Text)
    price = db.Column(db.Numeric(10, 2), nullable=False)
    stock = db.Column(db.Integer, default=0, nullable=False)
    category = db.Column(db.String(50), nullable=False, index=True)
    is_available = db.Column(db.Boolean, default=True, nullable=False)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))

    # Relationships
    order_items = db.relationship('OrderItem', back_populates='product', lazy='select')

    __table_args__ = (
        db.Index('idx_product_category_price', 'category', 'price'),
        db.CheckConstraint('price > 0', name='ck_positive_price'),
        db.CheckConstraint('stock >= 0', name='ck_non_negative_stock'),
    )

    def __repr__(self):
        return f'<Product {self.name} ${self.price}>'

    def to_dict(self):
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'price': float(self.price),
            'stock': self.stock,
            'category': self.category,
            'is_available': self.is_available
        }

class Order(db.Model):
    __tablename__ = 'orders'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False, index=True)
    status = db.Column(db.String(20), default='pending', nullable=False)
    total_amount = db.Column(db.Numeric(12, 2), default=0)
    shipping_address = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc)
    )

    # Relationships
    user = db.relationship('User', back_populates='orders')
    items = db.relationship('OrderItem', back_populates='order', lazy='selectin',
                            cascade='all, delete-orphan')

    __table_args__ = (
        db.Index('idx_order_user_status', 'user_id', 'status'),
    )

    def __repr__(self):
        return f'<Order #{self.id} - {self.status}>'

    def to_dict(self):
        return {
            'id': self.id,
            'user_id': self.user_id,
            'status': self.status,
            'total_amount': float(self.total_amount),
            'items': [item.to_dict() for item in self.items],
            'created_at': self.created_at.isoformat()
        }

class OrderItem(db.Model):
    __tablename__ = 'order_items'

    id = db.Column(db.Integer, primary_key=True)
    order_id = db.Column(db.Integer, db.ForeignKey('orders.id'), nullable=False)
    product_id = db.Column(db.Integer, db.ForeignKey('products.id'), nullable=False)
    quantity = db.Column(db.Integer, nullable=False)
    unit_price = db.Column(db.Numeric(10, 2), nullable=False)

    # Relationships
    order = db.relationship('Order', back_populates='items')
    product = db.relationship('Product', back_populates='order_items')

    __table_args__ = (
        db.CheckConstraint('quantity > 0', name='ck_positive_quantity'),
    )

    @property
    def subtotal(self):
        return float(self.quantity * self.unit_price)

    def __repr__(self):
        return f'<OrderItem {self.product.name} x{self.quantity}>'

    def to_dict(self):
        return {
            'id': self.id,
            'product_id': self.product_id,
            'product_name': self.product.name,
            'quantity': self.quantity,
            'unit_price': float(self.unit_price),
            'subtotal': self.subtotal
        }
# ---- USER CRUD ----
def create_user(username, email, password_hash):
    """Create a new user."""
    user = User(username=username, email=email, password_hash=password_hash)
    db.session.add(user)
    db.session.commit()
    return user

def get_user(user_id):
    """Get a user by ID."""
    return db.session.get(User, user_id)

def update_user(user_id, **kwargs):
    """Update user fields."""
    user = db.session.get(User, user_id)
    if not user:
        return None
    for key, value in kwargs.items():
        if hasattr(user, key):
            setattr(user, key, value)
    db.session.commit()
    return user

def delete_user(user_id):
    """Soft-delete a user by deactivating."""
    user = db.session.get(User, user_id)
    if user:
        user.is_active = False
        db.session.commit()
    return user

# ---- PRODUCT CRUD ----
def create_product(name, price, category, stock=0, description=None):
    """Create a new product."""
    product = Product(
        name=name, price=price, category=category,
        stock=stock, description=description
    )
    db.session.add(product)
    db.session.commit()
    return product

def get_products_by_category(category, min_price=None, max_price=None):
    """Get products filtered by category and optional price range."""
    query = Product.query.filter_by(category=category, is_available=True)
    if min_price is not None:
        query = query.filter(Product.price >= min_price)
    if max_price is not None:
        query = query.filter(Product.price <= max_price)
    return query.order_by(Product.price.asc()).all()

def update_stock(product_id, quantity_change):
    """Adjust product stock. Use negative values for decrements."""
    product = db.session.get(Product, product_id)
    if product:
        product.stock += quantity_change
        if product.stock <= 0:
            product.is_available = False
        db.session.commit()
    return product
from sqlalchemy import func, desc
def get_top_customers(limit=10):
"""Get customers with the highest total spend."""
results = db.session.query(
User.username,
User.email,
func.count(Order.id).label('order_count'),
func.sum(Order.total_amount).label('total_spent')
).join(Order, User.id == Order.user_id)\
.filter(Order.status == 'completed')\
.group_by(User.id)\
.order_by(desc('total_spent'))\
.limit(limit)\
.all()
return [
{
'username': r.username,
'email': r.email,
'order_count': r.order_count,
'total_spent': float(r.total_spent)
}
for r in results
]
def get_revenue_by_category():
"""Get total revenue grouped by product category."""
results = db.session.query(
Product.category,
func.sum(OrderItem.quantity * OrderItem.unit_price).label('revenue'),
func.sum(OrderItem.quantity).label('units_sold')
).join(OrderItem, Product.id == OrderItem.product_id)\
.join(Order, OrderItem.order_id == Order.id)\
.filter(Order.status == 'completed')\
.group_by(Product.category)\
.order_by(desc('revenue'))\
.all()
return [
{
'category': r.category,
'revenue': float(r.revenue),
'units_sold': r.units_sold
}
for r in results
]
def get_user_order_history(user_id, page=1, per_page=10):
"""Get paginated order history for a user with item details."""
return Order.query\
.filter_by(user_id=user_id)\
.order_by(Order.created_at.desc())\
.paginate(page=page, per_page=per_page, error_out=False)
def search_products(query_text, category=None, in_stock_only=True):
    """Case-insensitive substring search on product name, with optional filters."""
    # Note: '%' and '_' inside query_text act as LIKE wildcards
    q = Product.query.filter(
        Product.name.ilike(f'%{query_text}%')
    )
    if category:
        q = q.filter_by(category=category)
    if in_stock_only:
        q = q.filter(Product.stock > 0, Product.is_available.is_(True))
    return q.order_by(Product.name).all()
from decimal import Decimal
from sqlalchemy.exc import IntegrityError

def place_order(user_id, cart_items, shipping_address):
    """
    Place an order atomically.

    Args:
        user_id: ID of the user placing the order
        cart_items: List of dicts with 'product_id' and 'quantity'
        shipping_address: Shipping address string

    Returns:
        (Order, None) on success, (None, error_message) on failure
    """
    try:
        # Verify user exists and is active
        user = db.session.get(User, user_id)
        if not user or not user.is_active:
            return None, 'Invalid or inactive user'

        # Create the order
        order = Order(
            user_id=user_id,
            status='pending',
            shipping_address=shipping_address
        )
        db.session.add(order)
        db.session.flush()  # Get order.id without committing

        total = Decimal('0.00')
        for item in cart_items:
            # Reject zero or negative quantities, which would otherwise add stock
            if item['quantity'] <= 0:
                db.session.rollback()
                return None, f'Invalid quantity for product {item["product_id"]}'

            # Lock the product row to prevent race conditions
            product = db.session.query(Product).filter_by(
                id=item['product_id']
            ).with_for_update().first()
            if not product:
                db.session.rollback()
                return None, f'Product {item["product_id"]} not found'
            if not product.is_available:
                db.session.rollback()
                return None, f'{product.name} is no longer available'
            if product.stock < item['quantity']:
                db.session.rollback()
                return None, f'Insufficient stock for {product.name} (available: {product.stock})'

            # Deduct stock
            product.stock -= item['quantity']
            if product.stock == 0:
                product.is_available = False

            # Create order item
            order_item = OrderItem(
                order_id=order.id,
                product_id=product.id,
                quantity=item['quantity'],
                unit_price=product.price
            )
            db.session.add(order_item)
            total += product.price * item['quantity']

        order.total_amount = total
        db.session.commit()
        return order, None
    except IntegrityError as e:
        db.session.rollback()
        return None, f'Data integrity error: {str(e.orig)}'
    except Exception as e:
        db.session.rollback()
        return None, f'Order failed: {str(e)}'
@app.route('/api/orders', methods=['POST'])
def create_order():
    data = request.get_json()
    order, error = place_order(
        user_id=data['user_id'],
        cart_items=data['items'],  # [{'product_id': 1, 'quantity': 2}, ...]
        shipping_address=data['shipping_address']
    )
    if error:
        return jsonify({'error': error}), 400
    return jsonify(order.to_dict()), 201
Sometimes the ORM gets in the way. Complex reporting queries, database-specific features (window functions, CTEs, recursive queries), or bulk operations are often cleaner and faster as raw SQL. SQLAlchemy makes this straightforward.
from datetime import datetime
from sqlalchemy import text

# Simple query
result = db.session.execute(text('SELECT * FROM users WHERE is_active = :active'), {'active': True})
users = result.fetchall()
for user in users:
    print(user.username, user.email)  # Access by column name

# Insert
db.session.execute(
    text('INSERT INTO users (username, email, password_hash) VALUES (:username, :email, :password)'),
    {'username': 'dave', 'email': 'dave@example.com', 'password': 'hashed_pw'}
)
db.session.commit()

# Update
db.session.execute(
    text('UPDATE products SET price = price * :multiplier WHERE category = :category'),
    {'multiplier': 1.10, 'category': 'Electronics'}
)
db.session.commit()

# Delete
db.session.execute(
    text('DELETE FROM sessions WHERE last_active < :cutoff'),
    {'cutoff': datetime(2026, 1, 1)}
)
db.session.commit()
def get_monthly_revenue_report(year):
    """Get monthly revenue breakdown with running totals."""
    sql = text("""
        SELECT
            EXTRACT(MONTH FROM o.created_at) AS month,
            COUNT(DISTINCT o.id) AS order_count,
            COUNT(DISTINCT o.user_id) AS unique_customers,
            SUM(o.total_amount) AS monthly_revenue,
            SUM(SUM(o.total_amount)) OVER (ORDER BY EXTRACT(MONTH FROM o.created_at)) AS running_total
        FROM orders o
        WHERE EXTRACT(YEAR FROM o.created_at) = :year
          AND o.status = 'completed'
        GROUP BY EXTRACT(MONTH FROM o.created_at)
        ORDER BY month
    """)
    result = db.session.execute(sql, {'year': year})
    return [
        {
            'month': int(row.month),
            'order_count': row.order_count,
            'unique_customers': row.unique_customers,
            'revenue': float(row.monthly_revenue),
            'running_total': float(row.running_total)
        }
        for row in result
    ]
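The nested SUM(SUM(...)) OVER (...) in the report can look odd at first: the inner SUM is the ordinary per-month aggregate, and the outer SUM is a window function that accumulates those monthly totals in order. The sketch below reproduces just that part against a made-up orders table using Python's built-in sqlite3 module rather than SQLAlchemy. It assumes SQLite 3.25 or newer (for window functions) and substitutes strftime for EXTRACT, which SQLite does not support. The table contents are illustrative only.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE orders ("
    "id INTEGER PRIMARY KEY, created_at TEXT, total_amount REAL, status TEXT)"
)
conn.executemany(
    "INSERT INTO orders (created_at, total_amount, status) "
    "VALUES (:created_at, :total, :status)",
    [
        {"created_at": "2025-01-10", "total": 100.0, "status": "completed"},
        {"created_at": "2025-01-20", "total": 50.0, "status": "completed"},
        {"created_at": "2025-02-05", "total": 25.0, "status": "completed"},
        {"created_at": "2025-02-07", "total": 999.0, "status": "cancelled"},
        {"created_at": "2025-03-01", "total": 75.0, "status": "completed"},
    ],
)

rows = conn.execute(
    """
    SELECT
        CAST(strftime('%m', created_at) AS INTEGER) AS month,
        SUM(total_amount) AS monthly_revenue,
        -- inner SUM: per-month total; outer SUM ... OVER: running total
        SUM(SUM(total_amount)) OVER (ORDER BY strftime('%m', created_at)) AS running_total
    FROM orders
    WHERE strftime('%Y', created_at) = :year
      AND status = 'completed'
    GROUP BY strftime('%m', created_at)
    ORDER BY month
    """,
    {"year": "2025"},
).fetchall()

report = [tuple(row) for row in rows]
for month, revenue, running_total in report:
    print(month, revenue, running_total)
```

The cancelled order is filtered out before grouping, so it contributes to neither the monthly figure nor the running total.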
# Bulk insert: much faster than the ORM for large datasets
def bulk_import_products(products_data):
    """Import thousands of products efficiently."""
    sql = text("""
        INSERT INTO products (name, price, category, stock, is_available)
        VALUES (:name, :price, :category, :stock, :is_available)
    """)
    # Execute with a list of parameter dicts
    db.session.execute(sql, products_data)
    db.session.commit()

# Usage
products = [
    {'name': f'Product {i}', 'price': 9.99, 'category': 'Bulk', 'stock': 100, 'is_available': True}
    for i in range(10000)
]
bulk_import_products(products)
Security note: Always use parameterized queries with :param_name placeholders. Never use f-strings or string concatenation to build SQL — that is how SQL injection happens.
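To make the risk concrete, here is a small sketch contrasting the two approaches. It uses Python's built-in sqlite3 module, which accepts the same :param_name placeholder style, so it runs without a Flask app. The users table and the malicious input are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, is_admin INTEGER)")
conn.executemany(
    "INSERT INTO users (username, is_admin) VALUES (:username, :is_admin)",
    [{"username": "alice", "is_admin": 1}, {"username": "bob", "is_admin": 0}],
)

# Input an attacker might submit in a search or login form
malicious = "nobody' OR '1'='1"

# UNSAFE: the input is spliced into the SQL text and rewrites the WHERE clause
unsafe_sql = f"SELECT username FROM users WHERE username = '{malicious}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# SAFE: the input travels as a bound value and is only ever compared as a string
safe_rows = conn.execute(
    "SELECT username FROM users WHERE username = :username",
    {"username": malicious},
).fetchall()

print(len(unsafe_rows))  # every row leaks
print(len(safe_rows))    # no user has that literal name
```

The same input returns every row through the concatenated query and nothing through the parameterized one, because in the second case the database never interprets the value as SQL.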
Database performance problems are among the most common causes of slow web applications. These are the patterns and techniques that matter most with Flask-SQLAlchemy.
The N+1 problem occurs when you load a list of N objects and then access a relationship on each, causing N additional queries.
# BAD: N+1 queries (1 query for orders + N queries for the user on each order)
orders = Order.query.all()
for order in orders:
    print(order.user.username)  # Each access triggers a separate SELECT

# GOOD: Eager loading with joinedload (1 query total)
from sqlalchemy.orm import joinedload
orders = Order.query.options(joinedload(Order.user)).all()
for order in orders:
    print(order.user.username)  # Already loaded, no extra query

# GOOD: Eager loading with selectinload (2 queries total, better for one-to-many)
from sqlalchemy.orm import selectinload
users = User.query.options(selectinload(User.orders)).all()
for user in users:
    print(f'{user.username}: {len(user.orders)} orders')  # Already loaded

# Nested eager loading
orders = Order.query.options(
    joinedload(Order.user),
    selectinload(Order.items).joinedload(OrderItem.product)
).all()
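If you want to see the difference in query counts rather than take it on faith, the following sketch counts the statements each pattern sends to the database. It is not SQLAlchemy code: it uses the built-in sqlite3 module and its trace callback, with hand-written SQL standing in for what lazy loading and a joined eager load roughly do. The tables and rows are made up.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER REFERENCES users(id));
    INSERT INTO users (id, username) VALUES (1, 'alice'), (2, 'bob');
    INSERT INTO orders (user_id) VALUES (1), (1), (2), (2), (1);
""")

statements = []
conn.set_trace_callback(statements.append)  # record every statement from here on

# N+1 pattern: one query for the orders, then one query per order for its user
orders = conn.execute("SELECT id, user_id FROM orders ORDER BY id").fetchall()
lazy_names = [
    conn.execute("SELECT username FROM users WHERE id = ?", (user_id,)).fetchone()[0]
    for _, user_id in orders
]
n_plus_one_count = len(statements)

statements.clear()

# Joined pattern: a single query fetches each order together with its user
joined_names = [
    row[0]
    for row in conn.execute(
        "SELECT u.username FROM orders o JOIN users u ON u.id = o.user_id ORDER BY o.id"
    )
]
joined_count = len(statements)

print(n_plus_one_count, joined_count)
```

With five orders the first pattern issues six statements and the second issues one, and both produce the same list of usernames. The gap grows linearly with the number of rows.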
# Load only specific columns
usernames = db.session.query(User.username, User.email).filter_by(is_active=True).all()

# Defer heavy columns (load them on access)
from sqlalchemy.orm import defer, undefer
users = User.query.options(defer(User.bio), defer(User.password_hash)).all()
# Later, accessing user.bio will trigger a lazy load for that specific column

# Undefer when you need them: load a column up front even if the model marks it as deferred
users = User.query.options(undefer(User.bio)).all()
# Index columns you filter, sort, or join on frequently
class Product(db.Model):
    __tablename__ = 'products'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(200), index=True)      # Searched frequently
    category = db.Column(db.String(50), index=True)   # Filtered frequently
    price = db.Column(db.Numeric(10, 2))
    created_at = db.Column(db.DateTime, index=True)   # Sorted frequently

    # Composite indexes for queries that filter on both columns
    __table_args__ = (
        db.Index('idx_category_price', 'category', 'price'),
        db.Index('idx_category_created', 'category', 'created_at'),
    )
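Declaring an index only helps if the query planner actually uses it, so it is worth checking. The sketch below does that with SQLite's EXPLAIN QUERY PLAN through the built-in sqlite3 module; other databases have their own EXPLAIN variants with different output. The table is a cut-down stand-in for the products table above, and query_plan is a hypothetical helper written for this example.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE products (
        id INTEGER PRIMARY KEY,
        name TEXT,
        category TEXT,
        price NUMERIC
    );
    CREATE INDEX idx_category_price ON products (category, price);
""")


def query_plan(sql, params=()):
    """Return SQLite's description of how it would execute the query."""
    # sql is a trusted literal written in this file, never user input
    rows = conn.execute("EXPLAIN QUERY PLAN " + sql, params).fetchall()
    return " | ".join(row[-1] for row in rows)  # last column holds the detail text


# Filters on both indexed columns: the composite index can be used
indexed_plan = query_plan(
    "SELECT name FROM products WHERE category = ? AND price <= ?",
    ("Electronics", 100),
)

# Leading-wildcard LIKE on an unindexed column: falls back to a full scan
unindexed_plan = query_plan(
    "SELECT name FROM products WHERE name LIKE ?",
    ("%phone%",),
)

print(indexed_plan)
print(unindexed_plan)
```

The exact wording of the plan varies between SQLite versions, but the first plan names idx_category_price while the second reports a scan of the whole table.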
# Enable SQL logging in development
app.config['SQLALCHEMY_ECHO'] = True

# Or use events for more control
from sqlalchemy import event
from sqlalchemy.engine import Engine
import time

# Listening on the Engine class covers every engine and avoids touching
# db.engine, which in recent Flask-SQLAlchemy versions needs an active app context.
@event.listens_for(Engine, 'before_cursor_execute')
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info['query_start_time'] = time.time()

@event.listens_for(Engine, 'after_cursor_execute')
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total = time.time() - conn.info['query_start_time']
    if total > 0.5:  # Log slow queries (over 500ms)
        app.logger.warning(f'Slow query ({total:.2f}s): {statement}')
These are the mistakes that cost real hours in debugging. Know them, avoid them.
# BUG: Changes are never persisted
user = User(username='alice', email='alice@example.com', password_hash='hash')
db.session.add(user)
# Missing: db.session.commit()
# The user exists in the session but NOT in the database

# FIX: Always commit after making changes
db.session.add(user)
db.session.commit()
# BUG: Accessing attributes after the session is closed
def get_user_data():
    user = User.query.first()
    return user

# Later, outside the request context:
user = get_user_data()
print(user.orders)  # DetachedInstanceError! Session is gone.

# FIX 1: Eager load what you need
def get_user_data():
    return User.query.options(selectinload(User.orders)).first()

# FIX 2: Convert to dict while session is active
def get_user_data():
    user = User.query.first()
    return user.to_dict()  # Serialize within the session context

# FIX 3: Keep the object attached by using it within the request
@app.route('/users/<int:id>')
def get_user(id):
    user = User.query.get_or_404(id)
    return jsonify(user.to_dict())  # Serialized within request context
# BUG: Template triggers N+1 queries
@app.route('/orders')
def list_orders():
    orders = Order.query.all()
    return render_template('orders.html', orders=orders)

# Template: {% for order in orders %} {{ order.user.username }} {% endfor %}
# This fires a SELECT for each order's user!

# FIX: Eager load in the view
@app.route('/orders')
def list_orders():
    orders = Order.query.options(joinedload(Order.user)).all()
    return render_template('orders.html', orders=orders)
# BUG: Failed operation poisons the session for subsequent requests
try:
    db.session.add(user)
    db.session.commit()
except IntegrityError:
    pass  # Session is now in a broken state!

# FIX: Always rollback on error
try:
    db.session.add(user)
    db.session.commit()
except IntegrityError:
    db.session.rollback()
    # Now the session is clean for the next operation
# BAD: This will NOT update existing tables
# If you add a column to a model, create_all() ignores it
db.create_all()  # Only creates tables that don't exist

# GOOD: Use migrations for all schema changes
# flask db migrate -m "Add new column"
# flask db upgrade
Set up Flask-Migrate from day one, even for small projects. db.create_all() is only acceptable for throwaway prototypes and test fixtures. Every schema change should be a migration file committed to version control.
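The reason create_all() cannot evolve a schema is that it only issues CREATE TABLE for tables that are missing, which behaves much like CREATE TABLE IF NOT EXISTS. The sketch below shows that behavior with plain SQL through the built-in sqlite3 module, as an analogy rather than a trace of what SQLAlchemy emits. The ALTER TABLE statement stands in for what a generated migration would run.

```python
import sqlite3

conn = sqlite3.connect(":memory:")


def user_columns():
    """Return the column names currently present on the users table."""
    return [row[1] for row in conn.execute("PRAGMA table_info(users)")]


# First deployment: the table does not exist yet, so it is created
conn.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, username TEXT)")

# The model later gains a bio column, but the table already exists,
# so the statement is a no-op and the new column never appears
conn.execute(
    "CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, username TEXT, bio TEXT)"
)
before = user_columns()

# A migration changes the existing table in place instead
conn.execute("ALTER TABLE users ADD COLUMN bio TEXT")
after = user_columns()

print(before)
print(after)
```

Only the explicit ALTER TABLE adds the column, which is exactly the gap that migration files fill.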
from sqlalchemy.orm import validates  # validates lives in sqlalchemy.orm

class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    age = db.Column(db.Integer)

    @validates('email')
    def validate_email(self, key, email):
        if '@' not in email:
            raise ValueError('Invalid email address')
        return email.lower().strip()

    @validates('username')
    def validate_username(self, key, username):
        if len(username) < 3:
            raise ValueError('Username must be at least 3 characters')
        if not username.isalnum():
            raise ValueError('Username must be alphanumeric')
        return username.lower().strip()

    @validates('age')
    def validate_age(self, key, age):
        if age is not None and (age < 0 or age > 150):
            raise ValueError('Age must be between 0 and 150')
        return age
Encapsulate database access in repository classes to keep your route handlers clean and your data access testable.
class UserRepository:
    """Encapsulates all database operations for User."""

    @staticmethod
    def create(username, email, password_hash):
        user = User(username=username, email=email, password_hash=password_hash)
        db.session.add(user)
        db.session.commit()
        return user

    @staticmethod
    def get_by_id(user_id):
        return db.session.get(User, user_id)

    @staticmethod
    def get_by_username(username):
        return User.query.filter_by(username=username).first()

    @staticmethod
    def get_active_users(page=1, per_page=20):
        return User.query.filter_by(is_active=True)\
            .order_by(User.created_at.desc())\
            .paginate(page=page, per_page=per_page, error_out=False)

    @staticmethod
    def update(user_id, **kwargs):
        user = db.session.get(User, user_id)
        if not user:
            return None
        for key, value in kwargs.items():
            if hasattr(user, key):
                setattr(user, key, value)
        db.session.commit()
        return user

    @staticmethod
    def deactivate(user_id):
        user = db.session.get(User, user_id)
        if user:
            user.is_active = False
            db.session.commit()
        return user
# Usage in routes: clean and testable
@app.route('/api/users', methods=['POST'])
def create_user():
    data = request.get_json()
    try:
        user = UserRepository.create(
            username=data['username'],
            email=data['email'],
            password_hash=generate_password_hash(data['password'])
        )
        return jsonify(user.to_dict()), 201
    except IntegrityError:
        db.session.rollback()
        return jsonify({'error': 'Username or email already exists'}), 409
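One way to cash in on the "testable" claim is to swap the repository for an in-memory stand-in during unit tests, so business logic can be exercised without a database. The sketch below is an illustration, not part of the code above: FakeUser, InMemoryUserRepository, and register_user are hypothetical names, and the fake implements only a few of the repository's methods with the same names and arguments.

```python
from dataclasses import dataclass
from itertools import count


@dataclass
class FakeUser:
    """Hypothetical stand-in for the User model, holding only what the test needs."""
    id: int
    username: str
    email: str
    is_active: bool = True


class InMemoryUserRepository:
    """Hypothetical test double that mirrors UserRepository's method names."""

    def __init__(self):
        self._users = {}
        self._ids = count(1)

    def create(self, username, email, password_hash=None):
        user = FakeUser(id=next(self._ids), username=username, email=email)
        self._users[user.id] = user
        return user

    def get_by_id(self, user_id):
        return self._users.get(user_id)

    def get_by_username(self, username):
        return next((u for u in self._users.values() if u.username == username), None)

    def deactivate(self, user_id):
        user = self._users.get(user_id)
        if user:
            user.is_active = False
        return user


def register_user(repo, username, email, password_hash):
    """Hypothetical service function that depends on a repository, not on db.session."""
    if repo.get_by_username(username) is not None:
        return None, "Username already exists"
    return repo.create(username, email, password_hash), None


repo = InMemoryUserRepository()
first, first_error = register_user(repo, "alice", "alice@example.com", "hash")
duplicate, duplicate_error = register_user(repo, "alice", "other@example.com", "hash")
```

Because register_user only calls methods on whatever repo it receives, the same function works with the database-backed UserRepository in production and with the fake in tests.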
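# Ensure sessions are cleaned up after each request.
# Flask-SQLAlchemy already removes db.session when the app context ends, so an
# explicit handler like this is only needed if you manage your own scoped_session.
@app.teardown_appcontext
def shutdown_session(exception=None):
    db.session.remove()

# Use pool_pre_ping to handle stale connections
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_pre_ping': True,   # Test each connection before handing it out
    'pool_recycle': 1800,    # Replace connections older than 30 minutes
}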
import os

class Config:
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_ENGINE_OPTIONS = {
        'pool_pre_ping': True,
    }

class DevelopmentConfig(Config):
    SQLALCHEMY_DATABASE_URI = 'sqlite:///dev.db'
    SQLALCHEMY_ECHO = True

class TestingConfig(Config):
    SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
    TESTING = True

class ProductionConfig(Config):
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')
    SQLALCHEMY_ENGINE_OPTIONS = {
        'pool_size': 20,
        'max_overflow': 30,
        'pool_recycle': 1800,
        'pool_pre_ping': True,
    }

config = {
    'development': DevelopmentConfig,
    'testing': TestingConfig,
    'production': ProductionConfig,
}
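The config dictionary is typically consumed by looking up a class by name at startup. Below is one possible sketch of that lookup with two safety checks: an unknown environment name fails loudly, and so does a production config whose DATABASE_URL was never set. The select_config helper and the APP_ENV variable name are assumptions made for this example, and the config classes are trimmed copies so the snippet runs on its own.

```python
import os


class Config:
    SQLALCHEMY_TRACK_MODIFICATIONS = False


class DevelopmentConfig(Config):
    SQLALCHEMY_DATABASE_URI = "sqlite:///dev.db"


class ProductionConfig(Config):
    # Evaluated at import time; None when DATABASE_URL is not set
    SQLALCHEMY_DATABASE_URI = os.environ.get("DATABASE_URL")


config = {
    "development": DevelopmentConfig,
    "production": ProductionConfig,
}


def select_config(env_name=None):
    """Pick a config class by name, falling back to the APP_ENV variable (hypothetical)."""
    name = env_name or os.environ.get("APP_ENV", "development")
    try:
        config_class = config[name]
    except KeyError:
        raise ValueError(
            f"Unknown environment {name!r}; expected one of {sorted(config)}"
        ) from None
    if not config_class.SQLALCHEMY_DATABASE_URI:
        raise RuntimeError(f"{name} config has no database URI; is DATABASE_URL set?")
    return config_class


selected = select_config("development")
print(selected.__name__, selected.SQLALCHEMY_DATABASE_URI)
```

In an application factory the result would usually be passed to app.config.from_object(...), so a misconfigured deployment stops at startup instead of failing on the first query.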
Define models as classes that inherit from db.Model. Use column types, constraints, and indexes to enforce data integrity at the database level.
Define relationships with db.relationship() and back_populates for bidirectional access. Choose the right lazy loading strategy to avoid N+1 queries.
Manage transactions with explicit commit() calls. Always rollback() on error. Use begin_nested() for savepoints when you need partial rollback.
Tune pool_size and pool_recycle, and enable pool_pre_ping for production stability.
Eager loading (joinedload, selectinload) is your primary weapon against N+1 performance problems. Profile your queries in development with SQLALCHEMY_ECHO = True.
For raw SQL, use text() with parameterized queries, never string concatenation.
Validate data with @validates decorators and database constraints (CheckConstraint, unique, nullable).
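Since begin_nested() for savepoints is mentioned without an example, here is a sketch of what partial rollback means at the SQL level. In SQLAlchemy, begin_nested() issues a SAVEPOINT. The code below writes the equivalent statements by hand through the built-in sqlite3 module, so it shows the underlying mechanism rather than the ORM API. The audit_log table and savepoint name are made up.

```python
import sqlite3

# isolation_level=None puts the connection in autocommit mode, so the
# BEGIN / SAVEPOINT / COMMIT statements below run exactly as written
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE audit_log (id INTEGER PRIMARY KEY, message TEXT UNIQUE)")

conn.execute("BEGIN")
conn.execute("INSERT INTO audit_log (message) VALUES ('order created')")

conn.execute("SAVEPOINT optional_step")
try:
    conn.execute("INSERT INTO audit_log (message) VALUES ('coupon applied')")
    # Duplicate message violates the UNIQUE constraint and raises IntegrityError
    conn.execute("INSERT INTO audit_log (message) VALUES ('order created')")
    conn.execute("RELEASE SAVEPOINT optional_step")
except sqlite3.IntegrityError:
    # Undo only the work done since the savepoint; the outer transaction survives
    conn.execute("ROLLBACK TO SAVEPOINT optional_step")
    conn.execute("RELEASE SAVEPOINT optional_step")

conn.execute("COMMIT")

messages = [row[0] for row in conn.execute("SELECT message FROM audit_log ORDER BY id")]
print(messages)
```

The first insert survives the commit while both statements inside the savepoint are undone, which is the behavior you get from wrapping an optional step in begin_nested() and letting the enclosing transaction continue.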