Flask – Database Integration

Introduction

Every non-trivial web application needs persistent storage. Whether you are building a REST API, an admin dashboard, or a SaaS product, the database layer is the backbone that holds your application state, user data, and business logic together. Flask, being a micro-framework, does not ship with a built-in ORM or database abstraction. This is intentional — it lets you choose the right tool for your domain instead of forcing a one-size-fits-all solution.

In practice, the Python ecosystem has converged on SQLAlchemy as the de facto ORM for Flask applications, and Flask-SQLAlchemy as the integration layer that wires SQLAlchemy into the Flask application lifecycle. This tutorial covers the full spectrum of database integration — from defining your first model to managing migrations in production, handling transactions safely, and optimizing query performance under real load.

We will also discuss when an ORM is the wrong choice and you should drop down to raw SQL. The goal is not to make you a SQLAlchemy expert overnight, but to give you the mental model and working patterns you need to build production-grade data layers in Flask.

ORM vs Raw SQL

Before we dive in, it is worth understanding the trade-off:

  • ORM (Object-Relational Mapping): Maps database tables to Python classes. You work with objects instead of writing SQL strings. Benefits include portability across database backends, automatic conversion between column values and Python types, relationship management, and reduced boilerplate. The cost is an abstraction layer that can generate inefficient queries if you do not understand what it is doing under the hood.
  • Raw SQL: You write the queries yourself. You get maximum control and often the best performance for complex analytics or bulk operations. The cost is more code, reduced portability between database backends, and manual result-to-object mapping.

The pragmatic approach is to use the ORM for 90% of your operations (CRUD, relationships, standard queries) and drop to raw SQL for the remaining 10% (complex reporting, bulk inserts, database-specific features). SQLAlchemy supports both seamlessly.
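
To make the "manual result-to-object mapping" cost concrete, here is a minimal sketch of the raw SQL side using only the standard library's sqlite3 module. The UserRow class, the fetch_admins helper, and the table layout are hypothetical and exist only for this illustration.

```python
import sqlite3
from dataclasses import dataclass


@dataclass
class UserRow:
    """Hypothetical plain object standing in for an ORM model."""
    id: int
    username: str
    role: str


def fetch_admins(conn):
    """Raw SQL: we write the query and map each row to an object by hand."""
    rows = conn.execute(
        "SELECT id, username, role FROM users WHERE role = ? ORDER BY id",
        ("admin",),
    ).fetchall()
    return [UserRow(id=r[0], username=r[1], role=r[2]) for r in rows]


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT, role TEXT)")
conn.executemany(
    "INSERT INTO users (username, role) VALUES (?, ?)",
    [("alice", "admin"), ("bob", "user"), ("carol", "admin")],
)

admins = fetch_admins(conn)
print([a.username for a in admins])
```

With an ORM, the query construction and the row-to-object step inside fetch_admins are what the library takes over for you.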


1. Flask-SQLAlchemy Setup

Flask-SQLAlchemy is a Flask extension that adds SQLAlchemy support with sensible defaults and useful helpers. Install it along with the database driver you need:

# Core package
pip install Flask-SQLAlchemy

# Database drivers (install the one you need)
pip install psycopg2-binary   # PostgreSQL
pip install PyMySQL            # MySQL
pip install mysqlclient        # MySQL (C extension, faster)
# SQLite uses Python's built-in sqlite3 module — no extra install needed

Configuration

The most important configuration key is SQLALCHEMY_DATABASE_URI, which tells SQLAlchemy how to connect to your database. Here is a minimal setup:

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

def create_app():
    app = Flask(__name__)

    # SQLite (file-based, good for development)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'

    # Disable modification tracking (saves memory)
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

    db.init_app(app)
    return app

Database URI Formats

The URI format follows the pattern: dialect+driver://username:password@host:port/database

# SQLite — relative path (three slashes = relative to instance folder)
SQLALCHEMY_DATABASE_URI = 'sqlite:///app.db'

# SQLite — absolute path (four slashes)
SQLALCHEMY_DATABASE_URI = 'sqlite:////var/data/app.db'

# PostgreSQL
SQLALCHEMY_DATABASE_URI = 'postgresql://user:password@localhost:5432/mydb'

# PostgreSQL with psycopg2 driver explicitly
SQLALCHEMY_DATABASE_URI = 'postgresql+psycopg2://user:password@localhost:5432/mydb'

# MySQL with PyMySQL driver
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://user:password@localhost:3306/mydb'

# MySQL with mysqlclient driver
SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://user:password@localhost:3306/mydb'

# MySQL with charset specified
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://user:password@localhost/mydb?charset=utf8mb4'
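
Passwords that contain characters such as @, / or : would break the URI pattern above unless they are percent-encoded. The sketch below shows one way to assemble a URI safely with the standard library; build_db_uri and the credentials are hypothetical names used only for illustration.

```python
from urllib.parse import quote_plus


def build_db_uri(dialect, user, password, host, port, database):
    """Assemble a database URI, percent-encoding the user and password."""
    return (
        f"{dialect}://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


# The password contains '@', '/' and ':' which are all URI delimiters
uri = build_db_uri(
    "postgresql+psycopg2", "app_user", "p@ss/w:rd", "localhost", 5432, "mydb"
)
print(uri)
```

The resulting string can be assigned to SQLALCHEMY_DATABASE_URI like any of the literals above.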

Additional Configuration Options

app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://user:pass@localhost/mydb'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True          # Log all SQL statements (dev only)

# Pool settings go in SQLALCHEMY_ENGINE_OPTIONS. The older SQLALCHEMY_POOL_SIZE,
# SQLALCHEMY_POOL_RECYCLE, SQLALCHEMY_MAX_OVERFLOW, and SQLALCHEMY_POOL_TIMEOUT
# keys were removed in Flask-SQLAlchemy 3.0.
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_size': 10,        # Connection pool size
    'pool_recycle': 3600,   # Recycle connections after 1 hour
    'max_overflow': 20,     # Extra connections beyond pool_size
    'pool_timeout': 30,     # Seconds to wait for a connection
}

2. Defining Models

A model is a Python class that maps to a database table. Each class attribute maps to a column. Flask-SQLAlchemy provides db.Model as the base class for all your models.

from datetime import datetime, timezone
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

class User(db.Model):
    __tablename__ = 'users'  # Explicit table name (optional; defaults to the snake_case form of the class name)

    id = db.Column(db.Integer, primary_key=True, autoincrement=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(256), nullable=False)
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    role = db.Column(db.String(20), default='user', nullable=False)
    bio = db.Column(db.Text, nullable=True)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc), nullable=False)
    updated_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc),
        nullable=False
    )

    def __repr__(self):
        return f'<User {self.username}>'

Common Column Types

SQLAlchemy Type   | Python Type | SQL Equivalent
db.Integer        | int         | INTEGER
db.BigInteger     | int         | BIGINT
db.String(n)      | str         | VARCHAR(n)
db.Text           | str         | TEXT
db.Float          | float       | FLOAT
db.Numeric(10, 2) | Decimal     | NUMERIC(10, 2)
db.Boolean        | bool        | BOOLEAN
db.DateTime       | datetime    | DATETIME
db.Date           | date        | DATE
db.Time           | time        | TIME
db.LargeBinary    | bytes       | BLOB
db.JSON           | dict/list   | JSON
db.Enum           | str/enum    | ENUM
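
The table pairs db.Numeric with Python's Decimal rather than float. A short standard-library sketch of why that matters for values such as prices (the variable names are only illustrative):

```python
from decimal import Decimal

# Binary floats cannot represent most decimal fractions exactly
float_total = 0.1 + 0.2

# Decimal arithmetic on string literals stays exact
decimal_total = Decimal("0.10") + Decimal("0.20")

print(float_total == 0.3)                 # False
print(decimal_total == Decimal("0.30"))   # True
```

This is the usual reason to prefer db.Numeric over db.Float for monetary columns.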

Column Options

# Primary key
id = db.Column(db.Integer, primary_key=True)

# Unique constraint
email = db.Column(db.String(120), unique=True)

# Not nullable (required field)
name = db.Column(db.String(80), nullable=False)

# Default value (Python-side)
role = db.Column(db.String(20), default='user')

# Server-side default
created_at = db.Column(db.DateTime, server_default=db.func.now())

# Index for faster queries
username = db.Column(db.String(80), index=True)

# Composite index
__table_args__ = (
    db.Index('idx_user_email_role', 'email', 'role'),
    db.UniqueConstraint('first_name', 'last_name', name='uq_full_name'),
)

3. Relationships

Relationships define how models connect to each other. SQLAlchemy supports one-to-many, many-to-one, one-to-one, and many-to-many relationships. Getting these right is critical — bad relationship design leads to N+1 queries and painful refactors later.

One-to-Many

The most common relationship. One user has many posts. The foreign key lives on the “many” side.

class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)

    # One user has many posts
    posts = db.relationship('Post', back_populates='author', lazy='select')

    def __repr__(self):
        return f'<User {self.username}>'


class Post(db.Model):
    __tablename__ = 'posts'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    body = db.Column(db.Text, nullable=False)

    # Foreign key to users table
    author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

    # Many posts belong to one user
    author = db.relationship('User', back_populates='posts')

    def __repr__(self):
        return f'<Post {self.title}>'

Many-to-Many

Many-to-many relationships require an association table. For example, posts can have many tags, and tags can belong to many posts.

# Association table (no model class needed for simple many-to-many)
post_tags = db.Table('post_tags',
    db.Column('post_id', db.Integer, db.ForeignKey('posts.id'), primary_key=True),
    db.Column('tag_id', db.Integer, db.ForeignKey('tags.id'), primary_key=True)
)


class Post(db.Model):
    __tablename__ = 'posts'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)

    # Many-to-many with tags
    tags = db.relationship('Tag', secondary=post_tags, back_populates='posts', lazy='select')


class Tag(db.Model):
    __tablename__ = 'tags'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)

    # Many-to-many with posts
    posts = db.relationship('Post', secondary=post_tags, back_populates='tags', lazy='select')

One-to-One

class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)

    # One-to-one: uselist=False means this returns a single object, not a list
    profile = db.relationship('Profile', back_populates='user', uselist=False)


class Profile(db.Model):
    __tablename__ = 'profiles'

    id = db.Column(db.Integer, primary_key=True)
    bio = db.Column(db.Text)
    avatar_url = db.Column(db.String(500))

    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), unique=True, nullable=False)
    user = db.relationship('User', back_populates='profile')

Lazy Loading Options

The lazy parameter controls when related objects are loaded from the database:

Value | Behavior | Use When
'select' (default) | Loads related objects on first access via a separate SELECT | You access the relationship occasionally
'joined' | Loads via JOIN in the same query | You always need the related data
'subquery' | Loads via a subquery after the initial query | One-to-many where JOIN would duplicate rows
'dynamic' | Returns a query object instead of loading results | Large collections you want to filter further
'selectin' | Loads via SELECT … WHERE id IN (…) | Best default for most one-to-many relationships
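
The difference between 'select' and 'selectin' is easiest to see by counting round trips. The following sketch imitates both strategies by hand with the standard library's sqlite3 module. It is not SQLAlchemy's implementation, and run, load_lazy_select, and load_selectin are hypothetical helpers.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE posts (id INTEGER PRIMARY KEY, author_id INTEGER, title TEXT);
    INSERT INTO users (username) VALUES ('alice'), ('bob'), ('carol');
    INSERT INTO posts (author_id, title) VALUES
        (1, 'a1'), (1, 'a2'), (2, 'b1'), (3, 'c1');
""")

query_log = []


def run(sql, params=()):
    """Execute a query and record it, so round trips can be counted."""
    query_log.append(sql)
    return conn.execute(sql, params).fetchall()


def load_lazy_select():
    """'select' style: one query for users, then one more per user (N+1)."""
    users = run("SELECT id, username FROM users ORDER BY id")
    result = {}
    for uid, name in users:
        rows = run("SELECT title FROM posts WHERE author_id = ? ORDER BY id", (uid,))
        result[name] = [title for (title,) in rows]
    return result


def load_selectin():
    """'selectin' style: one query for users, one IN query for all posts."""
    users = run("SELECT id, username FROM users ORDER BY id")
    ids = [uid for uid, _ in users]
    placeholders = ", ".join("?" for _ in ids)
    posts = run(
        "SELECT author_id, title FROM posts "
        f"WHERE author_id IN ({placeholders}) ORDER BY id",
        ids,
    )
    by_author = {uid: [] for uid in ids}
    for author_id, title in posts:
        by_author[author_id].append(title)
    return {name: by_author[uid] for uid, name in users}


query_log.clear()
lazy_result = load_lazy_select()
lazy_queries = len(query_log)

query_log.clear()
selectin_result = load_selectin()
selectin_queries = len(query_log)

print(lazy_queries, selectin_queries)
```

Both functions return the same data, but the first issues one query per user while the second stays at two queries regardless of how many users are loaded.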

4. Creating the Database

Once your models are defined, you need to create the actual database tables. The simplest approach uses db.create_all().

Simple Approach

from flask import Flask
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

    db.init_app(app)

    with app.app_context():
        # Import models so SQLAlchemy knows about them
        from . import models
        db.create_all()

    return app

Application Factory Pattern (Recommended)

For production applications, use the application factory pattern. This separates the creation of the db object from the app, allowing you to create multiple app instances (for testing, different configs, etc.).

# extensions.py
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()


# models.py
from extensions import db
from datetime import datetime, timezone

class User(db.Model):
    __tablename__ = 'users'
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))


# app.py
import os

from flask import Flask
from extensions import db

def create_app(config_name='development'):
    app = Flask(__name__)

    # Load config based on environment
    if config_name == 'development':
        app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///dev.db'
        app.config['SQLALCHEMY_ECHO'] = True
    elif config_name == 'testing':
        app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
    elif config_name == 'production':
        app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL')

    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

    # Initialize extensions
    db.init_app(app)

    # Register blueprints, error handlers, etc.
    # ...

    return app

Important: db.create_all() only creates tables that do not already exist. It will not modify existing tables (add columns, change types, etc.). For schema changes on existing tables, you need migrations, covered in section 7.
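
The practical consequence can be shown without any dependencies. db.create_all() skips tables that already exist, and SQLite's CREATE TABLE IF NOT EXISTS behaves the same way in practice, so the sketch below uses it as an analogy. It is not the SQL that Flask-SQLAlchemy emits, and the phone_number column is a hypothetical later addition.

```python
import sqlite3

conn = sqlite3.connect(":memory:")

# First run: the table is created with two columns
conn.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, username TEXT)")

# Later the definition gains a column and the same create step runs again.
# The table already exists, so nothing happens and the column is not added.
conn.execute(
    "CREATE TABLE IF NOT EXISTS users "
    "(id INTEGER PRIMARY KEY, username TEXT, phone_number TEXT)"
)

# PRAGMA table_info returns one row per column; index 1 is the column name
columns = [row[1] for row in conn.execute("PRAGMA table_info(users)")]
print(columns)
```

The table still has only its original columns, which is exactly the gap that migrations fill.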


5. CRUD Operations

CRUD stands for Create, Read, Update, Delete — the four fundamental operations on any data store. Here is how each works with Flask-SQLAlchemy.

Create

# Create a single record
user = User(username='john_doe', email='john@example.com', password_hash='hashed_pw')
db.session.add(user)
db.session.commit()

# The user now has an id assigned by the database
print(user.id)  # e.g., 1

# Create multiple records at once
users = [
    User(username='alice', email='alice@example.com', password_hash='hash1'),
    User(username='bob', email='bob@example.com', password_hash='hash2'),
    User(username='charlie', email='charlie@example.com', password_hash='hash3'),
]
db.session.add_all(users)
db.session.commit()

Read

# Get by primary key
user = db.session.get(User, 1)  # Returns None if not found

# Get all records
all_users = User.query.all()

# Get first matching record
user = User.query.filter_by(username='john_doe').first()

# Get first or return 404 (useful in route handlers)
user = User.query.filter_by(username='john_doe').first_or_404(
    description='User not found'
)

# Get by primary key or 404 (Flask-SQLAlchemy 3.0+)
user = db.get_or_404(User, 1)

Update

# Method 1: Modify the object and commit
user = User.query.filter_by(username='john_doe').first()
if user:
    user.email = 'newemail@example.com'
    user.role = 'admin'
    db.session.commit()

# Method 2: Bulk update (more efficient for many records)
User.query.filter(User.role == 'user').update({'is_active': False})
db.session.commit()

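# Method 3: Bulk update that also captures the number of affected rows
# (assumes the model has a last_login column and cutoff_date is a datetime defined elsewhere)
count = User.query.filter(User.last_login < cutoff_date).update(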
    {'is_active': False},
    synchronize_session='fetch'
)
db.session.commit()
print(f'Deactivated {count} users')

Delete

# Delete a single record
user = User.query.filter_by(username='john_doe').first()
if user:
    db.session.delete(user)
    db.session.commit()

# Bulk delete
deleted_count = User.query.filter(User.is_active.is_(False)).delete()
db.session.commit()
print(f'Deleted {deleted_count} inactive users')

6. Querying

SQLAlchemy's query interface is expressive and composable. You can chain methods to build complex queries without writing raw SQL.

Basic Query Methods

# filter_by — simple equality checks using keyword arguments
users = User.query.filter_by(role='admin', is_active=True).all()

# filter — more powerful, supports operators
users = User.query.filter(User.age >= 18).all()
users = User.query.filter(User.username.like('%john%')).all()
users = User.query.filter(User.email.endswith('@example.com')).all()
users = User.query.filter(User.role.in_(['admin', 'moderator'])).all()
users = User.query.filter(User.bio.isnot(None)).all()

# Combine multiple filters (AND)
users = User.query.filter(
    User.role == 'admin',
    User.is_active == True,
    User.created_at >= start_date
).all()

# OR conditions
from sqlalchemy import or_
users = User.query.filter(
    or_(User.role == 'admin', User.role == 'moderator')
).all()

# NOT conditions
from sqlalchemy import not_
users = User.query.filter(not_(User.is_active)).all()

Ordering, Limiting, and Pagination

# Order by
users = User.query.order_by(User.created_at.desc()).all()
users = User.query.order_by(User.last_name.asc(), User.first_name.asc()).all()

# Limit and offset
users = User.query.order_by(User.id).limit(10).offset(20).all()

# First result
user = User.query.order_by(User.created_at.desc()).first()

# Count
active_count = User.query.filter_by(is_active=True).count()

# Pagination (Flask-SQLAlchemy built-in)
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)

pagination = User.query.order_by(User.created_at.desc()).paginate(
    page=page,
    per_page=per_page,
    error_out=False  # Return empty page instead of 404
)

# Pagination object properties
items = pagination.items       # List of items on current page
total = pagination.total       # Total number of items
pages = pagination.pages       # Total number of pages
has_next = pagination.has_next
has_prev = pagination.has_prev
next_num = pagination.next_num
prev_num = pagination.prev_num
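
The pagination properties follow from simple arithmetic on the total row count, the page number, and the page size. The sketch below reproduces that arithmetic with a hypothetical page_info helper. It is an illustration, not Flask-SQLAlchemy's own code.

```python
import math


def page_info(total, page, per_page):
    """Compute the values a pagination object typically exposes."""
    pages = math.ceil(total / per_page) if total else 0
    return {
        "offset": (page - 1) * per_page,  # rows skipped before this page
        "limit": per_page,                # rows fetched for this page
        "pages": pages,
        "has_prev": page > 1,
        "has_next": page < pages,
        "prev_num": page - 1 if page > 1 else None,
        "next_num": page + 1 if page < pages else None,
    }


info = page_info(total=95, page=3, per_page=20)
print(info)
```

For 95 rows at 20 per page, page 3 skips 40 rows and is the third of five pages.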

Aggregation

from sqlalchemy import func

# Count
total = db.session.query(func.count(User.id)).scalar()

# Sum
total_revenue = db.session.query(func.sum(Order.total_amount)).scalar()

# Average
avg_age = db.session.query(func.avg(User.age)).scalar()

# Min / Max
oldest = db.session.query(func.min(User.created_at)).scalar()
newest = db.session.query(func.max(User.created_at)).scalar()

# Group by
role_counts = db.session.query(
    User.role,
    func.count(User.id).label('count')
).group_by(User.role).all()

for role, count in role_counts:
    print(f'{role}: {count}')

# Group by with having
popular_roles = db.session.query(
    User.role,
    func.count(User.id).label('count')
).group_by(User.role).having(func.count(User.id) > 5).all()

7. Database Migrations

In production, you cannot use db.create_all() to evolve your schema. It does not alter existing tables — it only creates missing ones. Database migrations track every schema change as a versioned script that can be applied (upgrade) or reversed (downgrade). Flask-Migrate wraps Alembic, the migration tool for SQLAlchemy.

Installation and Setup

pip install Flask-Migrate

from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

db = SQLAlchemy()
migrate = Migrate()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False

    db.init_app(app)
    migrate.init_app(app, db)

    return app

Migration Commands

# Initialize the migrations directory (run once)
flask db init

# Generate a migration script after changing models
flask db migrate -m "Add users table"

# Apply the migration to the database
flask db upgrade

# Revert the last migration
flask db downgrade

# Show current migration version
flask db current

# Show migration history
flask db history

# Upgrade to a specific version
flask db upgrade ae1027a6acf

# Downgrade to a specific version
flask db downgrade ae1027a6acf

Typical Workflow

# 1. Make changes to your models (add column, new table, etc.)
# 2. Generate migration
flask db migrate -m "Add phone_number column to users"

# 3. Review the generated migration file in migrations/versions/
# 4. Apply
flask db upgrade

# 5. Commit migration file to version control
git add migrations/
git commit -m "Add phone_number column migration"

Critical rule: Always review auto-generated migration files before applying them. Alembic does its best to detect changes, but it can miss things like column renames (it sees a drop + add instead), data migrations, or index changes. Edit the generated file if needed.

Example Migration File

"""Add phone_number column to users

Revision ID: a1b2c3d4e5f6
Revises: 9z8y7x6w5v4u
Create Date: 2026-02-26 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sa

# revision identifiers
revision = 'a1b2c3d4e5f6'
down_revision = '9z8y7x6w5v4u'
branch_labels = None
depends_on = None


def upgrade():
    op.add_column('users', sa.Column('phone_number', sa.String(20), nullable=True))
    op.create_index('idx_users_phone', 'users', ['phone_number'])


def downgrade():
    op.drop_index('idx_users_phone', table_name='users')
    op.drop_column('users', 'phone_number')

8. Seeding Data

Seeding populates your database with initial or test data. Flask's CLI makes it easy to create custom commands for this.

import click
from flask import Flask
from flask.cli import with_appcontext
from extensions import db
from models import User, Product, Category

@click.command('seed-db')
@with_appcontext
def seed_db_command():
    """Seed the database with sample data."""
    # Clear existing data
    db.session.execute(db.text('DELETE FROM users'))
    db.session.execute(db.text('DELETE FROM products'))
    db.session.execute(db.text('DELETE FROM categories'))

    # Seed categories
    categories = [
        Category(name='Electronics', description='Gadgets and devices'),
        Category(name='Clothing', description='Apparel and accessories'),
        Category(name='Books', description='Physical and digital books'),
    ]
    db.session.add_all(categories)
    db.session.flush()  # Flush to get IDs without committing

    # Seed users
    users = [
        User(username='admin', email='admin@example.com', password_hash='hashed_admin', role='admin'),
        User(username='alice', email='alice@example.com', password_hash='hashed_alice'),
        User(username='bob', email='bob@example.com', password_hash='hashed_bob'),
    ]
    db.session.add_all(users)

    # Seed products
    products = [
        Product(name='Laptop', price=999.99, category_id=categories[0].id, stock=50),
        Product(name='T-Shirt', price=19.99, category_id=categories[1].id, stock=200),
        Product(name='Python Cookbook', price=39.99, category_id=categories[2].id, stock=100),
    ]
    db.session.add_all(products)

    db.session.commit()
    click.echo('Database seeded successfully.')


# Register the command in your app factory
def create_app():
    app = Flask(__name__)
    # ... config, extensions ...
    app.cli.add_command(seed_db_command)
    return app

# Run the seed command
flask seed-db

Using Faker for Realistic Test Data

from faker import Faker

fake = Faker()

@click.command('seed-fake')
@click.argument('count', default=50)
@with_appcontext
def seed_fake_command(count):
    """Generate fake users for development."""
    users = []
    for _ in range(count):
        users.append(User(
            username=fake.unique.user_name(),
            email=fake.unique.email(),
            password_hash=fake.sha256(),
            bio=fake.paragraph(nb_sentences=3),
            is_active=fake.boolean(chance_of_getting_true=85),
            created_at=fake.date_time_between(start_date='-1y', end_date='now')
        ))

    db.session.add_all(users)
    db.session.commit()
    click.echo(f'Created {count} fake users.')

9. Connection Pooling

Database connections are expensive to create. Connection pooling keeps a set of connections open and reuses them across requests. SQLAlchemy handles this automatically, but you should tune the settings for your workload.

app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_size': 10,          # Number of permanent connections to keep
    'max_overflow': 20,       # Extra connections allowed beyond pool_size
    'pool_timeout': 30,       # Seconds to wait for a connection from the pool
    'pool_recycle': 1800,     # Recycle connections after 30 minutes
    'pool_pre_ping': True,    # Test connections before using them (handles stale connections)
}

Understanding Pool Parameters

  • pool_size — The number of connections kept open persistently. Set this to match your typical concurrent request count. Default is 5.
  • max_overflow — When all pool_size connections are in use, SQLAlchemy can create up to max_overflow additional temporary connections. These are closed when returned to the pool. Default is 10.
  • pool_timeout — How long a request will wait for a connection from the pool before raising an error. Default is 30 seconds.
  • pool_recycle — Connections older than this many seconds are recycled (closed and reopened). Critical for MySQL, which closes idle connections after wait_timeout (default 8 hours). Set this lower than your database's idle timeout.
  • pool_pre_ping — Issues a lightweight SELECT 1 before using a connection. Catches dead connections without the application seeing an error. Small overhead but highly recommended for production.
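
When sizing pool_size and max_overflow, it helps to check the worst case against the database server's connection limit. The sketch below assumes each worker process owns its own engine and pool, which is the usual situation with multi-process servers. The helper name, worker count, and limit are hypothetical.

```python
def max_connections_needed(workers, pool_size, max_overflow):
    """Upper bound on open connections if every worker fills its own pool."""
    return workers * (pool_size + max_overflow)


# Hypothetical deployment: 4 worker processes, each with its own pool
needed = max_connections_needed(workers=4, pool_size=10, max_overflow=20)

# Assumed server-side limit (100 is PostgreSQL's default max_connections)
db_limit = 100

print(needed, needed <= db_limit)
```

Here the worst case is 120 connections, which exceeds the assumed limit, so either the pool settings or the worker count would need to come down.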

Production Guidelines

# For a typical web app handling ~50 concurrent requests
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_size': 20,
    'max_overflow': 30,
    'pool_timeout': 30,
    'pool_recycle': 1800,
    'pool_pre_ping': True,
}

# For a lightweight app or development
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_size': 5,
    'max_overflow': 10,
    'pool_recycle': 3600,
    'pool_pre_ping': True,
}

# To disable pooling entirely (useful for debugging)
from sqlalchemy.pool import NullPool
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'poolclass': NullPool,
}

10. Transactions

A transaction groups multiple database operations into a single atomic unit — either all of them succeed, or none of them do. SQLAlchemy uses transactions implicitly (every operation between commit() calls is a transaction), but understanding explicit transaction control is essential for correctness.

Basic Transaction Pattern

# Implicit transaction — the most common pattern
try:
    user = User(username='alice', email='alice@example.com', password_hash='hash')
    db.session.add(user)

    profile = Profile(user=user, bio='Software engineer')
    db.session.add(profile)

    db.session.commit()  # Both user and profile are saved atomically
except Exception:
    db.session.rollback()  # Undo everything if any operation fails
    raise  # Re-raise with the original traceback intact

Context Manager Pattern (Cleaner)

from contextlib import contextmanager

@contextmanager
def transaction():
    """Context manager for database transactions."""
    try:
        yield db.session
        db.session.commit()
    except Exception:
        db.session.rollback()
        raise

# Usage
with transaction() as session:
    user = User(username='bob', email='bob@example.com', password_hash='hash')
    session.add(user)
    profile = Profile(user=user, bio='Data scientist')
    session.add(profile)
# Commits automatically on exit, rolls back on exception

Nested Transactions (Savepoints)

def place_order(user_id, items):
    """Place an order with savepoints for partial rollback."""
    try:
        order = Order(user_id=user_id, status='pending')
        db.session.add(order)
        db.session.flush()  # Get the order ID

        for item in items:
            # Savepoint for each item — if one fails, we can skip it
            savepoint = db.session.begin_nested()
            try:
                product = db.session.get(Product, item['product_id'])
                if product is None:
                    raise ValueError(f"Unknown product {item['product_id']}")
                if product.stock < item['quantity']:
                    raise ValueError(f'Insufficient stock for {product.name}')

                product.stock -= item['quantity']
                order_item = OrderItem(
                    order_id=order.id,
                    product_id=product.id,
                    quantity=item['quantity'],
                    unit_price=product.price
                )
                db.session.add(order_item)
                savepoint.commit()
            except Exception as e:
                savepoint.rollback()
                print(f'Skipping item: {e}')

        order.total_amount = sum(
            oi.quantity * oi.unit_price for oi in order.items
        )
        db.session.commit()
        return order

    except Exception:
        db.session.rollback()
        raise
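
To see the savepoint mechanics in isolation, here is a small sketch that issues the SQL directly through the standard library's sqlite3 module instead of going through SQLAlchemy. The table and item data are hypothetical. The point is that rolling back to a savepoint undoes one item while the outer transaction continues.

```python
import sqlite3

# isolation_level=None puts the connection in autocommit mode so that
# BEGIN / SAVEPOINT / COMMIT can be issued explicitly
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE order_items (name TEXT, quantity INTEGER CHECK (quantity > 0))"
)

# The middle row violates the CHECK constraint
items = [("laptop", 1), ("broken", 0), ("book", 2)]

conn.execute("BEGIN")
skipped = []
for name, quantity in items:
    conn.execute("SAVEPOINT item")
    try:
        conn.execute("INSERT INTO order_items VALUES (?, ?)", (name, quantity))
        conn.execute("RELEASE SAVEPOINT item")      # keep this item's changes
    except sqlite3.IntegrityError:
        conn.execute("ROLLBACK TO SAVEPOINT item")  # undo only this item
        conn.execute("RELEASE SAVEPOINT item")
        skipped.append(name)
conn.execute("COMMIT")

saved = [row[0] for row in conn.execute("SELECT name FROM order_items ORDER BY rowid")]
print(saved, skipped)
```

The failing item is skipped, and the two valid items are committed together in the same outer transaction.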

Error Handling Best Practices

from sqlalchemy.exc import IntegrityError, OperationalError

def create_user(username, email, password_hash):
    """Create a user with proper error handling."""
    try:
        user = User(
            username=username,
            email=email,
            password_hash=password_hash
        )
        db.session.add(user)
        db.session.commit()
        return user, None

    except IntegrityError as e:
        db.session.rollback()
        if 'username' in str(e.orig):
            return None, 'Username already exists'
        if 'email' in str(e.orig):
            return None, 'Email already exists'
        return None, 'Duplicate entry'

    except OperationalError as e:
        db.session.rollback()
        return None, 'Database connection error'

    except Exception as e:
        db.session.rollback()
        return None, f'Unexpected error: {str(e)}'
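
The string checks on e.orig above depend on how the underlying driver words its error message, and that wording differs between databases. As an illustration with the standard library's sqlite3 driver only, the sketch below shows what the message looks like there. The duplicate_field helper and the table are hypothetical.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT UNIQUE, email TEXT UNIQUE)")
conn.execute("INSERT INTO users VALUES ('alice', 'alice@example.com')")
conn.commit()  # persist the seed row so later rollbacks do not remove it


def duplicate_field(username, email):
    """Return the column that caused a UNIQUE violation, or None on success."""
    try:
        with conn:  # commits on success, rolls back on exception
            conn.execute("INSERT INTO users VALUES (?, ?)", (username, email))
        return None
    except sqlite3.IntegrityError as exc:
        message = str(exc)  # SQLite wording: "UNIQUE constraint failed: users.email"
        for column in ("username", "email"):
            if f"users.{column}" in message:
                return column
        return "unknown"


print(duplicate_field("bob", "alice@example.com"))
print(duplicate_field("alice", "other@example.com"))
print(duplicate_field("carol", "carol@example.com"))
```

Other drivers typically report the constraint name rather than table.column, so giving unique constraints explicit names makes this kind of matching more predictable.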

11. Practical Example: E-Commerce Data Layer

Let us build a complete e-commerce data layer that ties together everything covered so far. This example includes four interconnected models, full CRUD operations, complex queries, and transactional order placement.

Models

from datetime import datetime, timezone
from extensions import db


class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(256), nullable=False)
    is_active = db.Column(db.Boolean, default=True, nullable=False)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))

    # Relationships
    orders = db.relationship('Order', back_populates='user', lazy='selectin')

    def __repr__(self):
        return f'<User {self.username}>'

    def to_dict(self):
        return {
            'id': self.id,
            'username': self.username,
            'email': self.email,
            'is_active': self.is_active,
            'created_at': self.created_at.isoformat(),
            'order_count': len(self.orders)
        }


class Product(db.Model):
    __tablename__ = 'products'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(200), nullable=False, index=True)
    description = db.Column(db.Text)
    price = db.Column(db.Numeric(10, 2), nullable=False)
    stock = db.Column(db.Integer, default=0, nullable=False)
    category = db.Column(db.String(50), nullable=False, index=True)
    is_available = db.Column(db.Boolean, default=True, nullable=False)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))

    # Relationships
    order_items = db.relationship('OrderItem', back_populates='product', lazy='select')

    __table_args__ = (
        db.Index('idx_product_category_price', 'category', 'price'),
        db.CheckConstraint('price > 0', name='ck_positive_price'),
        db.CheckConstraint('stock >= 0', name='ck_non_negative_stock'),
    )

    def __repr__(self):
        return f'<Product {self.name} ${self.price}>'

    def to_dict(self):
        return {
            'id': self.id,
            'name': self.name,
            'description': self.description,
            'price': float(self.price),
            'stock': self.stock,
            'category': self.category,
            'is_available': self.is_available
        }


class Order(db.Model):
    __tablename__ = 'orders'

    id = db.Column(db.Integer, primary_key=True)
    user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False, index=True)
    status = db.Column(db.String(20), default='pending', nullable=False)
    total_amount = db.Column(db.Numeric(12, 2), default=0)
    shipping_address = db.Column(db.Text)
    created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
    updated_at = db.Column(
        db.DateTime,
        default=lambda: datetime.now(timezone.utc),
        onupdate=lambda: datetime.now(timezone.utc)
    )

    # Relationships
    user = db.relationship('User', back_populates='orders')
    items = db.relationship('OrderItem', back_populates='order', lazy='selectin',
                            cascade='all, delete-orphan')

    __table_args__ = (
        db.Index('idx_order_user_status', 'user_id', 'status'),
    )

    def __repr__(self):
        return f'<Order #{self.id} - {self.status}>'

    def to_dict(self):
        return {
            'id': self.id,
            'user_id': self.user_id,
            'status': self.status,
            'total_amount': float(self.total_amount),
            'items': [item.to_dict() for item in self.items],
            'created_at': self.created_at.isoformat()
        }


class OrderItem(db.Model):
    __tablename__ = 'order_items'

    id = db.Column(db.Integer, primary_key=True)
    order_id = db.Column(db.Integer, db.ForeignKey('orders.id'), nullable=False)
    product_id = db.Column(db.Integer, db.ForeignKey('products.id'), nullable=False)
    quantity = db.Column(db.Integer, nullable=False)
    unit_price = db.Column(db.Numeric(10, 2), nullable=False)

    # Relationships
    order = db.relationship('Order', back_populates='items')
    product = db.relationship('Product', back_populates='order_items')

    __table_args__ = (
        db.CheckConstraint('quantity > 0', name='ck_positive_quantity'),
    )

    @property
    def subtotal(self):
        return float(self.quantity * self.unit_price)

    def __repr__(self):
        # Use the foreign key so repr() never triggers a lazy load of product
        return f'<OrderItem product_id={self.product_id} x{self.quantity}>'

    def to_dict(self):
        return {
            'id': self.id,
            'product_id': self.product_id,
            'product_name': self.product.name,
            'quantity': self.quantity,
            'unit_price': float(self.unit_price),
            'subtotal': self.subtotal
        }

CRUD Operations for Each Model

# ---- USER CRUD ----

def create_user(username, email, password_hash):
    """Create a new user."""
    user = User(username=username, email=email, password_hash=password_hash)
    db.session.add(user)
    db.session.commit()
    return user

def get_user(user_id):
    """Get a user by ID."""
    return db.session.get(User, user_id)

def update_user(user_id, **kwargs):
    """Update user fields."""
    user = db.session.get(User, user_id)
    if not user:
        return None
    for key, value in kwargs.items():
        if hasattr(user, key):
            setattr(user, key, value)
    db.session.commit()
    return user

def delete_user(user_id):
    """Soft-delete a user by deactivating."""
    user = db.session.get(User, user_id)
    if user:
        user.is_active = False
        db.session.commit()
    return user


# ---- PRODUCT CRUD ----

def create_product(name, price, category, stock=0, description=None):
    """Create a new product."""
    product = Product(
        name=name, price=price, category=category,
        stock=stock, description=description
    )
    db.session.add(product)
    db.session.commit()
    return product

def get_products_by_category(category, min_price=None, max_price=None):
    """Get products filtered by category and optional price range."""
    query = Product.query.filter_by(category=category, is_available=True)
    if min_price is not None:
        query = query.filter(Product.price >= min_price)
    if max_price is not None:
        query = query.filter(Product.price <= max_price)
    return query.order_by(Product.price.asc()).all()

def update_stock(product_id, quantity_change):
    """Adjust product stock. Use negative values for decrements."""
    product = db.session.get(Product, product_id)
    if product:
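        new_stock = product.stock + quantity_change
        if new_stock < 0:
            # Fail early rather than tripping ck_non_negative_stock at commit time
            raise ValueError(f'Insufficient stock for product {product_id}')
        product.stock = new_stock
        if product.stock == 0:
            product.is_available = False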
        db.session.commit()
    return product
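
The update_user helper above copies any keyword that matches an existing attribute, so a caller passing a raw request body could overwrite id or password_hash. One possible guard is an explicit allowlist. The sketch below is only an illustration: UPDATABLE_USER_FIELDS, apply_updates, and FakeUser are hypothetical names, and a plain stand-in class replaces the User model so the snippet runs without a database.

```python
# Hypothetical allowlist; adjust to the fields your API should expose
UPDATABLE_USER_FIELDS = frozenset({'username', 'email'})


def apply_updates(obj, updates, allowed=UPDATABLE_USER_FIELDS):
    """Copy only allowlisted keys from updates onto obj.

    Returns the sorted list of rejected keys so the caller can
    log them or answer with a 400.
    """
    rejected = []
    for key, value in updates.items():
        if key in allowed:
            setattr(obj, key, value)
        else:
            rejected.append(key)
    return sorted(rejected)


class FakeUser:
    """Stand-in for the User model (assumption: same attribute names)."""
    def __init__(self):
        self.id = 1
        self.username = 'alice'
        self.email = 'alice@example.com'
        self.password_hash = 'hash'


user = FakeUser()
rejected = apply_updates(
    user, {'email': 'new@example.com', 'id': 999, 'password_hash': 'x'}
)
```

Inside update_user, the hasattr loop could be swapped for apply_updates(user, kwargs) just before the commit.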

Complex Queries (Joins and Aggregations)

from sqlalchemy import func, desc

def get_top_customers(limit=10):
    """Get customers with the highest total spend."""
    results = db.session.query(
        User.username,
        User.email,
        func.count(Order.id).label('order_count'),
        func.sum(Order.total_amount).label('total_spent')
    ).join(Order, User.id == Order.user_id)\
     .filter(Order.status == 'completed')\
     .group_by(User.id)\
     .order_by(desc('total_spent'))\
     .limit(limit)\
     .all()

    return [
        {
            'username': r.username,
            'email': r.email,
            'order_count': r.order_count,
            'total_spent': float(r.total_spent)
        }
        for r in results
    ]


def get_revenue_by_category():
    """Get total revenue grouped by product category."""
    results = db.session.query(
        Product.category,
        func.sum(OrderItem.quantity * OrderItem.unit_price).label('revenue'),
        func.sum(OrderItem.quantity).label('units_sold')
    ).join(OrderItem, Product.id == OrderItem.product_id)\
     .join(Order, OrderItem.order_id == Order.id)\
     .filter(Order.status == 'completed')\
     .group_by(Product.category)\
     .order_by(desc('revenue'))\
     .all()

    return [
        {
            'category': r.category,
            'revenue': float(r.revenue),
            'units_sold': r.units_sold
        }
        for r in results
    ]


def get_user_order_history(user_id, page=1, per_page=10):
    """Get paginated order history for a user with item details."""
    return Order.query\
        .filter_by(user_id=user_id)\
        .order_by(Order.created_at.desc())\
        .paginate(page=page, per_page=per_page, error_out=False)


def search_products(query_text, category=None, in_stock_only=True):
    """Search products by case-insensitive name substring, with optional filters."""
    q = Product.query.filter(
        Product.name.ilike(f'%{query_text}%')
    )
    if category:
        q = q.filter_by(category=category)
    if in_stock_only:
        q = q.filter(Product.stock > 0, Product.is_available.is_(True))
    return q.order_by(Product.name).all()
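
Because search_products interpolates user text into an ILIKE pattern, any % or _ typed by the user acts as a wildcard. This is not an injection risk (the value is still bound as a parameter), but it can return surprising matches. A minimal sketch of one way to match input literally follows; escape_like is a hypothetical helper, not part of SQLAlchemy.

```python
def escape_like(value, escape_char='\\'):
    """Escape LIKE wildcards so user input is matched literally."""
    # Escape the escape character first, then the two wildcards
    return (
        value.replace(escape_char, escape_char * 2)
             .replace('%', escape_char + '%')
             .replace('_', escape_char + '_')
    )


pattern = f"%{escape_like('50%_off')}%"
# With SQLAlchemy the pattern would be passed together with the escape character:
#   Product.name.ilike(pattern, escape='\\')
```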

Transactional Order Placement

from decimal import Decimal
from sqlalchemy.exc import IntegrityError

def place_order(user_id, cart_items, shipping_address):
    """
    Place an order atomically.

    Args:
        user_id: ID of the user placing the order
        cart_items: List of dicts with 'product_id' and 'quantity'
        shipping_address: Shipping address string

    Returns:
        (Order, None) on success, (None, error_message) on failure
    """
    try:
        # Verify user exists and is active
        user = db.session.get(User, user_id)
        if not user or not user.is_active:
            return None, 'Invalid or inactive user'

        # Create the order
        order = Order(
            user_id=user_id,
            status='pending',
            shipping_address=shipping_address
        )
        db.session.add(order)
        db.session.flush()  # Get order.id without committing

        total = Decimal('0.00')

        for item in cart_items:
            # Lock the product row to prevent race conditions
            product = db.session.query(Product).filter_by(
                id=item['product_id']
            ).with_for_update().first()

            if not product:
                db.session.rollback()
                return None, f'Product {item["product_id"]} not found'

            if not product.is_available:
                db.session.rollback()
                return None, f'{product.name} is no longer available'

            if product.stock < item['quantity']:
                db.session.rollback()
                return None, f'Insufficient stock for {product.name} (available: {product.stock})'

            # Deduct stock
            product.stock -= item['quantity']
            if product.stock == 0:
                product.is_available = False

            # Create order item
            order_item = OrderItem(
                order_id=order.id,
                product_id=product.id,
                quantity=item['quantity'],
                unit_price=product.price
            )
            db.session.add(order_item)

            total += product.price * item['quantity']

        order.total_amount = total
        db.session.commit()

        return order, None

    except IntegrityError as e:
        db.session.rollback()
        return None, f'Data integrity error: {str(e.orig)}'

    except Exception as e:
        db.session.rollback()
        return None, f'Order failed: {str(e)}'
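
place_order trusts the shape of cart_items and locks product rows in whatever order the cart lists them. Two concurrent orders that contain the same products in opposite order can then wait on each other's locks. A common mitigation is to validate the cart, merge duplicate lines, and sort by product_id before the loop, so every transaction takes its locks in the same order. The helper below is a sketch under that assumption; normalize_cart is a hypothetical name.

```python
def normalize_cart(cart_items):
    """Validate cart items, merge duplicates, and sort by product_id.

    Raises ValueError on malformed input. Sorting gives every
    transaction the same lock order.
    """
    merged = {}
    for item in cart_items:
        product_id = item.get('product_id')
        quantity = item.get('quantity')
        # bool is a subclass of int, so reject it explicitly
        for name, value in (('product_id', product_id), ('quantity', quantity)):
            if not isinstance(value, int) or isinstance(value, bool):
                raise ValueError(f'{name} must be an integer, got {value!r}')
        if quantity <= 0:
            raise ValueError(f'quantity must be positive, got {quantity}')
        merged[product_id] = merged.get(product_id, 0) + quantity
    return [
        {'product_id': pid, 'quantity': qty}
        for pid, qty in sorted(merged.items())
    ]


cart = normalize_cart([
    {'product_id': 7, 'quantity': 1},
    {'product_id': 3, 'quantity': 2},
    {'product_id': 7, 'quantity': 4},
])
```

Calling this at the top of place_order, and returning the ValueError message as the error string, would also reject zero or negative quantities before any row is locked.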

Using the Order Placement

@app.route('/api/orders', methods=['POST'])
def create_order():
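    data = request.get_json(silent=True) or {}

    # Reject malformed payloads with a 400 instead of an unhandled KeyError (500)
    missing = [k for k in ('user_id', 'items', 'shipping_address') if k not in data]
    if missing:
        return jsonify({'error': f'Missing fields: {", ".join(missing)}'}), 400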

    order, error = place_order(
        user_id=data['user_id'],
        cart_items=data['items'],  # [{'product_id': 1, 'quantity': 2}, ...]
        shipping_address=data['shipping_address']
    )

    if error:
        return jsonify({'error': error}), 400

    return jsonify(order.to_dict()), 201

12. Raw SQL

Sometimes the ORM gets in the way. Complex reporting queries, database-specific features (window functions, CTEs, recursive queries), or bulk operations are often cleaner and faster as raw SQL. SQLAlchemy makes this straightforward.

When to Use Raw SQL

  • Complex analytical queries with multiple JOINs, subqueries, or window functions
  • Bulk INSERT/UPDATE/DELETE where ORM overhead is unacceptable
  • Database-specific features (PostgreSQL JSONB operators, MySQL full-text search)
  • Performance-critical paths where you need exact control over the generated SQL
  • Existing SQL queries you are porting from another system

Using db.session.execute()

from datetime import datetime

from sqlalchemy import text

# Simple query
result = db.session.execute(text('SELECT * FROM users WHERE is_active = :active'), {'active': True})
users = result.fetchall()

for user in users:
    print(user.username, user.email)  # Access by column name

# Insert
db.session.execute(
    text('INSERT INTO users (username, email, password_hash) VALUES (:username, :email, :password)'),
    {'username': 'dave', 'email': 'dave@example.com', 'password': 'hashed_pw'}
)
db.session.commit()

# Update
db.session.execute(
    text('UPDATE products SET price = price * :multiplier WHERE category = :category'),
    {'multiplier': 1.10, 'category': 'Electronics'}
)
db.session.commit()

# Delete
db.session.execute(
    text('DELETE FROM sessions WHERE last_active < :cutoff'),
    {'cutoff': datetime(2026, 1, 1)}
)
db.session.commit()

Complex Reporting Query

def get_monthly_revenue_report(year):
    """Get monthly revenue breakdown with running totals."""
    sql = text("""
        SELECT
            EXTRACT(MONTH FROM o.created_at) AS month,
            COUNT(DISTINCT o.id) AS order_count,
            COUNT(DISTINCT o.user_id) AS unique_customers,
            SUM(o.total_amount) AS monthly_revenue,
            SUM(SUM(o.total_amount)) OVER (ORDER BY EXTRACT(MONTH FROM o.created_at)) AS running_total
        FROM orders o
        WHERE EXTRACT(YEAR FROM o.created_at) = :year
          AND o.status = 'completed'
        GROUP BY EXTRACT(MONTH FROM o.created_at)
        ORDER BY month
    """)

    result = db.session.execute(sql, {'year': year})
    return [
        {
            'month': int(row.month),
            'order_count': row.order_count,
            'unique_customers': row.unique_customers,
            'revenue': float(row.monthly_revenue),
            'running_total': float(row.running_total)
        }
        for row in result
    ]
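
The nested SUM(SUM(...)) OVER (...) in the report can look odd at first: the inner SUM is the per-month aggregate produced by GROUP BY, and the outer windowed SUM accumulates those monthly values in month order. The same computation in plain Python, using made-up figures purely for illustration, looks roughly like this:

```python
from decimal import Decimal
from itertools import accumulate

# Hypothetical monthly revenue figures, already ordered by month
monthly_revenue = [
    (1, Decimal('1200.00')),
    (2, Decimal('950.50')),
    (3, Decimal('1430.25')),
]

# accumulate() yields the cumulative sum, like SUM(...) OVER (ORDER BY month)
running_totals = accumulate(revenue for _, revenue in monthly_revenue)
report = [
    {'month': month, 'revenue': revenue, 'running_total': total}
    for (month, revenue), total in zip(monthly_revenue, running_totals)
]
```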

Bulk Operations

# Bulk insert: typically much faster than adding ORM objects one at a time
def bulk_import_products(products_data):
    """Import thousands of products efficiently."""
    sql = text("""
        INSERT INTO products (name, price, category, stock, is_available)
        VALUES (:name, :price, :category, :stock, :is_available)
    """)

    # Execute with a list of parameter dicts
    db.session.execute(sql, products_data)
    db.session.commit()

# Usage
products = [
    {'name': f'Product {i}', 'price': 9.99, 'category': 'Bulk', 'stock': 100, 'is_available': True}
    for i in range(10000)
]
bulk_import_products(products)
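
For very large imports it can help to send rows in batches, which bounds memory use and keeps each statement a manageable size. The generator below is a small sketch of that idea; chunked is a hypothetical helper and the batch size of 1000 is an arbitrary choice, not a recommendation from SQLAlchemy.

```python
from itertools import islice


def chunked(rows, size):
    """Yield successive lists of at most `size` items from rows."""
    if size <= 0:
        raise ValueError('size must be positive')
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


rows = [{'name': f'Product {i}'} for i in range(2500)]
batch_sizes = [len(batch) for batch in chunked(rows, 1000)]
# Each batch could then be passed to db.session.execute(sql, batch)
```

Whether to commit once at the end or after each batch depends on whether a partially completed import is acceptable.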

Security note: Always use parameterized queries with :param_name placeholders. Never use f-strings or string concatenation to build SQL — that is how SQL injection happens.
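
To make the security note concrete, here is a small demonstration of the difference. It uses the standard library's sqlite3 module rather than SQLAlchemy so it runs on its own, and the table and input are invented for the example. The :name placeholder style is the same one text() uses.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)')
conn.executemany('INSERT INTO users (username) VALUES (?)', [('alice',), ('bob',)])

malicious = "nobody' OR '1'='1"

# UNSAFE: the input becomes part of the SQL text and rewrites the WHERE clause
unsafe_sql = f"SELECT username FROM users WHERE username = '{malicious}'"
unsafe_rows = conn.execute(unsafe_sql).fetchall()

# SAFE: the input is bound as a value and compared literally
safe_rows = conn.execute(
    'SELECT username FROM users WHERE username = :username',
    {'username': malicious},
).fetchall()

conn.close()
```

The concatenated query returns every row because the injected OR clause is always true, while the parameterized query finds no user with that literal name.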


13. Performance Tips

Database access is one of the most common sources of slowness in web applications. Here are the patterns and techniques that matter most in Flask-SQLAlchemy.

Avoiding the N+1 Query Problem

The N+1 problem occurs when you load a list of N objects and then access a relationship on each, causing N additional queries.

# BAD: N+1 queries — 1 query for orders + N queries for user on each order
orders = Order.query.all()
for order in orders:
    print(order.user.username)  # Each access triggers a separate SELECT

# GOOD: Eager loading with joinedload — 1 query total
from sqlalchemy.orm import joinedload

orders = Order.query.options(joinedload(Order.user)).all()
for order in orders:
    print(order.user.username)  # Already loaded, no extra query

# GOOD: Eager loading with selectinload — 2 queries total (better for one-to-many)
from sqlalchemy.orm import selectinload

users = User.query.options(selectinload(User.orders)).all()
for user in users:
    print(f'{user.username}: {len(user.orders)} orders')  # Already loaded

# Nested eager loading
orders = Order.query.options(
    joinedload(Order.user),
    selectinload(Order.items).joinedload(OrderItem.product)
).all()
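
To see the difference in statement counts without setting up a Flask app, the sketch below reproduces both access patterns with the standard library's sqlite3 module and counts the SELECT statements via set_trace_callback. It is a stand-in for what the ORM does, not SQLAlchemy code; the tables and rows are invented for the example.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE orders (id INTEGER PRIMARY KEY, user_id INTEGER REFERENCES users(id));
    INSERT INTO users (id, username) VALUES (1, 'alice'), (2, 'bob');
    INSERT INTO orders (id, user_id) VALUES (1, 1), (2, 2), (3, 1);
""")

statements = []
conn.set_trace_callback(statements.append)  # Record every statement SQLite runs

# Lazy pattern: one query for the orders, then one per order for its user
orders = conn.execute('SELECT id, user_id FROM orders').fetchall()
for _, user_id in orders:
    conn.execute('SELECT username FROM users WHERE id = ?', (user_id,)).fetchone()
lazy_count = sum(1 for s in statements if s.startswith('SELECT'))

statements.clear()

# Eager pattern: a single JOIN fetches orders and usernames together
conn.execute(
    'SELECT o.id, u.username FROM orders o JOIN users u ON u.id = o.user_id'
).fetchall()
eager_count = sum(1 for s in statements if s.startswith('SELECT'))

conn.close()
```

With three orders the lazy pattern issues four SELECTs and the join issues one; the gap grows linearly with the number of rows.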

Load Only What You Need

# Load only specific columns
usernames = db.session.query(User.username, User.email).filter_by(is_active=True).all()

# Defer heavy columns (load them on access)
from sqlalchemy.orm import defer

users = User.query.options(defer(User.bio), defer(User.password_hash)).all()

# Deferred columns remain available when you need them
users = User.query.options(defer(User.bio)).all()
# Later, accessing user.bio triggers a separate SELECT for that column on that row

Indexing Strategy

# Index columns you filter, sort, or join on frequently
class Product(db.Model):
    __tablename__ = 'products'

    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(200), index=True)          # Searched frequently
    category = db.Column(db.String(50), index=True)        # Filtered frequently
    price = db.Column(db.Numeric(10, 2))
    created_at = db.Column(db.DateTime, index=True)        # Sorted frequently

    # Composite index for queries that filter on both
    __table_args__ = (
        db.Index('idx_category_price', 'category', 'price'),
        db.Index('idx_category_created', 'category', 'created_at'),
    )

Query Profiling

# Enable SQL logging in development
app.config['SQLALCHEMY_ECHO'] = True

# Or use events for more control
from sqlalchemy import event
from sqlalchemy.engine import Engine
import time

# Listening on the Engine class covers every engine and needs no app context
# (db.engine is only available inside an application context)
@event.listens_for(Engine, 'before_cursor_execute')
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info.setdefault('query_start_time', []).append(time.perf_counter())

@event.listens_for(Engine, 'after_cursor_execute')
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total = time.perf_counter() - conn.info['query_start_time'].pop()
    if total > 0.5:  # Log slow queries (over 500ms)
        app.logger.warning(f'Slow query ({total:.2f}s): {statement}')

14. Common Pitfalls

These are the mistakes that cost real hours in debugging. Know them, avoid them.

Forgetting to Commit

# BUG: Changes are never persisted
user = User(username='alice', email='alice@example.com', password_hash='hash')
db.session.add(user)
# Missing: db.session.commit()
# The user exists in the session but NOT in the database

# FIX: Always commit after making changes
db.session.add(user)
db.session.commit()

Detached Instance Error

# BUG: Accessing attributes after the session is closed
def get_user_data():
    user = User.query.first()
    return user

# Later, outside the request context:
user = get_user_data()
print(user.orders)  # DetachedInstanceError! Session is gone.

# FIX 1: Eager load what you need
def get_user_data():
    return User.query.options(selectinload(User.orders)).first()

# FIX 2: Convert to dict while session is active
def get_user_data():
    user = User.query.first()
    return user.to_dict()  # Serialize within the session context

# FIX 3: Keep the object attached by using it within the request
@app.route('/users/<int:user_id>')
def get_user(user_id):
    user = db.get_or_404(User, user_id)  # Flask-SQLAlchemy 3.x replacement for Query.get_or_404
    return jsonify(user.to_dict())  # Serialized within request context

Lazy Loading in Templates

# BUG: Template triggers N+1 queries
@app.route('/orders')
def list_orders():
    orders = Order.query.all()
    return render_template('orders.html', orders=orders)
    # Template: {% for order in orders %} {{ order.user.username }} {% endfor %}
    # This fires a SELECT for each order's user!

# FIX: Eager load in the view
@app.route('/orders')
def list_orders():
    orders = Order.query.options(joinedload(Order.user)).all()
    return render_template('orders.html', orders=orders)

Not Handling Rollback

# BUG: Failed operation poisons the session for subsequent requests
try:
    db.session.add(user)
    db.session.commit()
except IntegrityError:
    pass  # Session is now in a broken state!

# FIX: Always rollback on error
try:
    db.session.add(user)
    db.session.commit()
except IntegrityError:
    db.session.rollback()
    # Now the session is clean for the next operation

Using db.create_all() in Production

# BAD: This will NOT update existing tables
# If you add a column to a model, create_all() ignores it
db.create_all()  # Only creates tables that don't exist

# GOOD: Use migrations for all schema changes
# flask db migrate -m "Add new column"
# flask db upgrade

15. Best Practices

Always Use Migrations

Set up Flask-Migrate from day one, even for small projects. db.create_all() is only acceptable for throwaway prototypes and test fixtures. Every schema change should be a migration file committed to version control.

Model Validation

from sqlalchemy.orm import validates

class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    age = db.Column(db.Integer)

    @validates('email')
    def validate_email(self, key, email):
        if '@' not in email:
            raise ValueError('Invalid email address')
        return email.lower().strip()

    @validates('username')
    def validate_username(self, key, username):
        username = username.strip().lower()  # Normalize first so padding cannot fail isalnum()
        if len(username) < 3:
            raise ValueError('Username must be at least 3 characters')
        if not username.isalnum():
            raise ValueError('Username must be alphanumeric')
        return username

    @validates('age')
    def validate_age(self, key, age):
        if age is not None and (age < 0 or age > 150):
            raise ValueError('Age must be between 0 and 150')
        return age

Repository Pattern

Encapsulate database access in repository classes to keep your route handlers clean and your data access testable.

class UserRepository:
    """Encapsulates all database operations for User."""

    @staticmethod
    def create(username, email, password_hash):
        user = User(username=username, email=email, password_hash=password_hash)
        db.session.add(user)
        db.session.commit()
        return user

    @staticmethod
    def get_by_id(user_id):
        return db.session.get(User, user_id)

    @staticmethod
    def get_by_username(username):
        return User.query.filter_by(username=username).first()

    @staticmethod
    def get_active_users(page=1, per_page=20):
        return User.query.filter_by(is_active=True)\
            .order_by(User.created_at.desc())\
            .paginate(page=page, per_page=per_page, error_out=False)

    @staticmethod
    def update(user_id, **kwargs):
        user = db.session.get(User, user_id)
        if not user:
            return None
        for key, value in kwargs.items():
            if hasattr(user, key):
                setattr(user, key, value)
        db.session.commit()
        return user

    @staticmethod
    def deactivate(user_id):
        user = db.session.get(User, user_id)
        if user:
            user.is_active = False
            db.session.commit()
        return user


# Usage in routes — clean and testable
@app.route('/api/users', methods=['POST'])
def create_user():
    data = request.get_json()
    try:
        user = UserRepository.create(
            username=data['username'],
            email=data['email'],
            password_hash=generate_password_hash(data['password'])
        )
        return jsonify(user.to_dict()), 201
    except IntegrityError:
        db.session.rollback()
        return jsonify({'error': 'Username or email already exists'}), 409

Connection Management

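# Flask-SQLAlchemy already removes the session when the app context tears down,
# so an explicit handler like this is only needed with a plain SQLAlchemy scoped_session
@app.teardown_appcontext
def shutdown_session(exception=None):
    db.session.remove()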

# Use pool_pre_ping to handle stale connections
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_pre_ping': True,
    'pool_recycle': 1800,
}

Environment-Based Configuration

import os

class Config:
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_ENGINE_OPTIONS = {
        'pool_pre_ping': True,
    }

class DevelopmentConfig(Config):
    SQLALCHEMY_DATABASE_URI = 'sqlite:///dev.db'
    SQLALCHEMY_ECHO = True

class TestingConfig(Config):
    SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
    TESTING = True

class ProductionConfig(Config):
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')
    SQLALCHEMY_ENGINE_OPTIONS = {
        'pool_size': 20,
        'max_overflow': 30,
        'pool_recycle': 1800,
        'pool_pre_ping': True,
    }

config = {
    'development': DevelopmentConfig,
    'testing': TestingConfig,
    'production': ProductionConfig,
}
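
ProductionConfig reads DATABASE_URL with os.environ.get, which silently yields None when the variable is missing. Some hosting platforms also hand out URLs with the legacy postgres:// scheme, which SQLAlchemy 1.4 and later no longer accept. A small helper can cover both cases; normalize_database_url is a hypothetical name and the URL below is made up for illustration.

```python
def normalize_database_url(url):
    """Fail fast on a missing URL and rewrite postgres:// to postgresql://."""
    if url is None:
        raise RuntimeError('DATABASE_URL is not set')
    if url.startswith('postgres://'):
        return 'postgresql://' + url[len('postgres://'):]
    return url


# Hypothetical URL for illustration only
fixed = normalize_database_url('postgres://app:secret@db.internal:5432/shop')
```

One place to call it is the application factory, right after the config object is loaded, so that development and testing are not forced to define DATABASE_URL.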

16. Key Takeaways

  • Flask-SQLAlchemy is the standard integration layer between Flask and SQLAlchemy. It manages the session lifecycle and provides convenient helpers.
  • Models map to tables. Define them as Python classes inheriting from db.Model. Use column types, constraints, and indexes to enforce data integrity at the database level.
  • Relationships connect models. Use db.relationship() with back_populates for bidirectional access. Choose the right lazy loading strategy to avoid N+1 queries.
  • Always use migrations in production. Flask-Migrate wraps Alembic and gives you versioned, reversible schema changes. Review auto-generated migrations before applying.
  • Transactions begin automatically: the session opens one on first use and keeps it open until commit() or rollback(). Always rollback() on error. Use begin_nested() for savepoints when you need partial rollback.
  • Connection pooling is handled by SQLAlchemy. Tune pool_size, pool_recycle, and enable pool_pre_ping for production stability.
  • Eager loading (joinedload, selectinload) is your primary weapon against N+1 performance problems. Profile your queries in development with SQLALCHEMY_ECHO = True.
  • Drop to raw SQL when the ORM generates inefficient queries or you need database-specific features. Use text() with parameterized queries — never string concatenation.
  • Validate at the model layer using @validates decorators and database constraints (CheckConstraint, unique, nullable).
  • Use the repository pattern to keep data access logic out of your route handlers. This makes your code testable and your routes readable.
  • Configuration should be environment-aware. Use different database URIs, pool sizes, and logging levels for development, testing, and production.


