If you are preparing for a Python web developer interview, Flask is one of the most commonly tested frameworks. Whether the role involves building microservices, REST APIs, or full web applications, interviewers expect you to demonstrate not just familiarity with Flask’s API, but a deeper understanding of its design philosophy and the trade-offs it makes compared to heavier frameworks like Django.
This guide covers the questions that actually come up in technical interviews, organized by category and difficulty. Each answer includes the reasoning behind it, production-quality code examples, and notes on what the interviewer is really evaluating. If you can speak confidently to these topics, you will be well-positioned for mid-level to senior Flask roles.
Flask is a micro web framework for Python built on top of Werkzeug (a WSGI toolkit) and Jinja2 (a template engine). The term “micro” does not mean Flask lacks capability — it means Flask does not impose decisions on you. There is no built-in ORM, no form validation library, and no specific project layout required out of the box.
You would choose Flask when you need fine-grained control over your application’s architecture, when you are building a small-to-medium service or API, or when your team prefers to pick and compose libraries rather than accept a monolithic framework’s opinions. Flask is also the go-to choice for microservices because of its minimal footprint and fast startup time.
Why interviewers ask this: They want to see that you understand Flask’s philosophy, not just its syntax. A strong answer shows you can reason about architectural trade-offs.
from flask import Flask

app = Flask(__name__)

@app.route('/')
def index():
    return "Hello, Flask!"

if __name__ == '__main__':
    app.run(debug=True)
The Flask(__name__) call tells Flask where to find templates and static files relative to the module. The @app.route decorator binds a URL path to a Python function. When you run this file directly, app.run(debug=True) starts the development server with the interactive debugger and auto-reloader enabled.
Why interviewers ask this: It is a baseline check. They are watching for whether you include the if __name__ guard and whether you know what the __name__ argument does.
The application factory pattern defers the creation of the Flask app object to a function rather than creating it at module level. This solves several real-world problems: it allows you to create multiple instances with different configurations (critical for testing), it avoids circular import issues, and it gives you a clean place to register extensions, blueprints, and error handlers.
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate

from .config import config  # assumed: dict mapping names to config classes

db = SQLAlchemy()
migrate = Migrate()

def create_app(config_name='default'):
    app = Flask(__name__)
    app.config.from_object(config[config_name])

    # Bind the module-level extensions to this app instance
    db.init_app(app)
    migrate.init_app(app, db)

    # Import blueprints inside the factory to avoid circular imports
    from .main import main_bp
    app.register_blueprint(main_bp)

    from .api import api_bp
    app.register_blueprint(api_bp, url_prefix='/api/v1')

    return app
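To make the "multiple instances with different configurations" point concrete, here is a minimal sketch that needs only Flask. The `test_config` parameter and the `GREETING` setting are assumptions made for illustration and are not part of the factory above.

```python
from flask import Flask


def create_app(test_config=None):
    """Build and return a new, independently configured Flask app."""
    app = Flask(__name__)
    # Defaults for this sketch; a real project would load config classes.
    app.config.update(TESTING=False, GREETING="hello")
    if test_config is not None:
        # Per-instance overrides, for example supplied by a test fixture.
        app.config.update(test_config)

    @app.route("/")
    def index():
        return app.config["GREETING"]

    return app


# Two instances with different settings can coexist in one process.
prod_app = create_app()
test_app = create_app({"TESTING": True, "GREETING": "hello from tests"})

print(prod_app.test_client().get("/").get_data(as_text=True))
print(test_app.test_client().get("/").get_data(as_text=True))
```

Because nothing is created at import time, each test can build a fresh app instead of mutating a shared global.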
Why interviewers ask this: This separates junior from senior candidates. If you only know the single-file style, you have likely not worked on production Flask applications. Interviewers want to see that you understand testability and modularity.
Blueprints let you organize a Flask application into discrete components, each with its own routes, templates, static files, and error handlers. They are registered on the application object, not instantiated as apps themselves. This is Flask’s answer to modularization.
from flask import Blueprint, redirect, render_template, url_for

auth_bp = Blueprint('auth', __name__, template_folder='templates')

@auth_bp.route('/login', methods=['GET', 'POST'])
def login():
    # login logic here
    return render_template('auth/login.html')

@auth_bp.route('/logout')
def logout():
    # logout logic here
    return redirect(url_for('main.index'))
In your factory function, you register it:
app.register_blueprint(auth_bp, url_prefix='/auth')
Use blueprints when your application has logically distinct areas (authentication, admin panel, public API, etc.) or when multiple developers are working on different features simultaneously. Blueprints also enable you to package reusable functionality that can be shared across projects.
Why interviewers ask this: They are testing whether you have experience structuring non-trivial applications. Knowing blueprints signals production experience.
Flask uses two contexts to manage state without passing objects explicitly through every function call:

- Application context (current_app, g): Active when the app is handling a request or when you manually push a context. The current_app proxy gives you access to the app’s configuration and extensions. The g object is a per-request namespace for storing data (like a database connection) that should be available throughout the request lifecycle.
- Request context (request, session): Created when Flask receives an HTTP request and torn down after the response is sent. The request proxy gives you access to headers, form data, query parameters, and JSON payloads. The session proxy provides access to the user’s session data.

from flask import current_app, g, request

@app.route('/example')
def example():
    # request context is active here
    user_agent = request.headers.get('User-Agent')
    # application context is also active
    debug_mode = current_app.config['DEBUG']
    # g is per-request storage
    g.db = get_db_connection()
    return f"Debug: {debug_mode}, UA: {user_agent}"
A common pitfall is trying to access current_app or request outside of a request. In CLI commands or background tasks, you need to manually push an application context using with app.app_context():.
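The pitfall can be reproduced in a few lines. This is a sketch only: `build_report` and the `REPORT_NAME` setting are hypothetical names standing in for a background job and its configuration.

```python
from flask import Flask, current_app

app = Flask(__name__)
app.config["REPORT_NAME"] = "nightly"  # hypothetical setting


def build_report():
    """Read config through the current_app proxy, as a background job might."""
    return f"report:{current_app.config['REPORT_NAME']}"


# Outside any context the proxy is unbound and raises RuntimeError.
try:
    build_report()
    worked_outside_context = True
except RuntimeError:
    worked_outside_context = False

# Pushing an application context makes current_app resolve to `app`.
with app.app_context():
    inside_result = build_report()

print(worked_outside_context, inside_result)
```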
Why interviewers ask this: Understanding contexts is one of the most reliable indicators of Flask depth. Developers who have only built toy apps will struggle with this question.
Flask’s power comes from its extension ecosystem. Extensions follow a convention: they provide an init_app() method for deferred initialization (compatible with the factory pattern) and are typically prefixed with Flask-. The most important ones to know are:
When evaluating an extension, check that it supports init_app(), is actively maintained, and does not tightly couple you to a specific implementation.
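The init_app() convention is easiest to understand by writing a tiny extension yourself. The `Greeter` class below is entirely hypothetical and only illustrates the shape that real extensions tend to follow.

```python
from flask import Flask


class Greeter:
    """Hypothetical extension that follows the init_app() convention."""

    def __init__(self, app=None):
        # Support both Greeter(app) and deferred Greeter().init_app(app).
        if app is not None:
            self.init_app(app)

    def init_app(self, app):
        # Keep per-app state on the app, not on the extension instance,
        # so one extension object can serve several apps.
        app.config.setdefault("GREETER_PREFIX", "Hello")
        app.extensions["greeter"] = self

    def greet(self, app, name):
        return f"{app.config['GREETER_PREFIX']}, {name}!"


greeter = Greeter()  # created at import time, bound to no app yet


def create_app(prefix=None):
    app = Flask(__name__)
    if prefix:
        app.config["GREETER_PREFIX"] = prefix
    greeter.init_app(app)
    return app


app_a = create_app()
app_b = create_app("Hi")
print(greeter.greet(app_a, "Ada"), greeter.greet(app_b, "Ada"))
```

The same module-level `greeter` serves both apps because all state lives in each app's own config.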
Why interviewers ask this: They want to know if you can build a production application by selecting the right tools, and whether you understand deferred initialization.
A route maps a URL pattern to a Python function (called a view function). Flask uses Werkzeug’s routing system under the hood, which supports static paths, variable rules with type converters, and HTTP method filtering.
@app.route('/users/<int:user_id>', methods=['GET'])
def get_user(user_id):
    user = User.query.get_or_404(user_id)
    return jsonify(user.to_dict())

@app.route('/users', methods=['POST'])
def create_user():
    data = request.get_json()
    user = User(username=data['username'], email=data['email'])
    db.session.add(user)
    db.session.commit()
    return jsonify(user.to_dict()), 201
Flask’s built-in URL converters include string (default), int, float, path (like string but accepts slashes), and uuid. You can also create custom converters for specialized URL patterns.
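As a sketch of a custom converter, the example below registers a hypothetical `list` converter that turns a comma-separated path segment into a Python list. The converter name and the `/tags/` route are assumptions chosen for illustration.

```python
from flask import Flask
from werkzeug.routing import BaseConverter


class ListConverter(BaseConverter):
    """Hypothetical converter: '/tags/a,b' <-> ['a', 'b']."""

    def to_python(self, value):
        # Called when a URL is matched: path segment -> Python value.
        return value.split(",")

    def to_url(self, values):
        # Called by url_for(): Python value -> path segment.
        return ",".join(super(ListConverter, self).to_url(v) for v in values)


app = Flask(__name__)
# Register the converter before defining routes that use it.
app.url_map.converters["list"] = ListConverter


@app.route("/tags/<list:tags>")
def show_tags(tags):
    return "|".join(tags)


body = app.test_client().get("/tags/flask,python").get_data(as_text=True)
print(body)
```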
Why interviewers ask this: Routing is foundational. They want to see that you know about type converters, HTTP method restrictions, and the 404 shortcut methods.
The primary HTTP methods are:
@app.route('/articles/<int:article_id>', methods=['GET', 'PUT', 'PATCH', 'DELETE'])
def article(article_id):
    article = Article.query.get_or_404(article_id)

    if request.method == 'GET':
        return jsonify(article.to_dict())

    elif request.method == 'PUT':
        # PUT replaces the full resource
        data = request.get_json()
        article.title = data['title']
        article.body = data['body']
        db.session.commit()
        return jsonify(article.to_dict())

    elif request.method == 'PATCH':
        # PATCH updates only the fields that were sent
        data = request.get_json()
        if 'title' in data:
            article.title = data['title']
        if 'body' in data:
            article.body = data['body']
        db.session.commit()
        return jsonify(article.to_dict())

    elif request.method == 'DELETE':
        db.session.delete(article)
        db.session.commit()
        return '', 204
Why interviewers ask this: They are checking whether you understand REST semantics (idempotency, safety) beyond just knowing the method names.
from flask import redirect, url_for

@app.route('/old-dashboard')
def old_dashboard():
    return redirect(url_for('new_dashboard'), code=301)

@app.route('/dashboard')
def new_dashboard():
    return "Welcome to the new dashboard"
Always use url_for() instead of hardcoding URLs. It builds URLs from the endpoint name, which means your redirects will not break if you rename a route’s URL pattern. Use 301 for permanent redirects (SEO-friendly) and 302 (the default) for temporary redirects.
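A quick way to see why endpoint-based URLs are robust is to change the URL pattern and watch the redirect follow it. In this sketch the `/account/dashboard` path is an assumed new location; the endpoint name stays `new_dashboard`.

```python
from flask import Flask, redirect, url_for

app = Flask(__name__)


@app.route("/account/dashboard")  # the URL pattern can change freely
def new_dashboard():              # the endpoint name stays the same
    return "Welcome to the new dashboard"


@app.route("/old-dashboard")
def old_dashboard():
    # No hardcoded path: the target is resolved from the endpoint name.
    return redirect(url_for("new_dashboard"), code=301)


response = app.test_client().get("/old-dashboard")
print(response.status_code, response.headers["Location"])
```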
Why interviewers ask this: Using url_for() instead of a string literal is a signal that you understand maintainable Flask development.
Jinja2 is Flask’s default template engine. It lets you embed Python-like expressions in HTML while enforcing a separation between logic and presentation. Templates are stored in a templates/ directory by default.
# In your view
from flask import render_template

@app.route('/profile/<username>')
def profile(username):
    user = User.query.filter_by(username=username).first_or_404()
    posts = user.posts.order_by(Post.created_at.desc()).all()
    return render_template('profile.html', user=user, posts=posts)
<!-- templates/base.html -->
<!DOCTYPE html>
<html>
<head><title>{% block title %}My App{% endblock %}</title></head>
<body>
<nav>{% include 'nav.html' %}</nav>
{% block content %}{% endblock %}
</body>
</html>
<!-- templates/profile.html -->
{% extends 'base.html' %}
{% block title %}{{ user.username }}'s Profile{% endblock %}
{% block content %}
<h1>{{ user.username }}</h1>
{% for post in posts %}
<article>
<h2>{{ post.title }}</h2>
<p>{{ post.body | truncate(200) }}</p>
<small>{{ post.created_at.strftime('%B %d, %Y') }}</small>
</article>
{% else %}
<p>No posts yet.</p>
{% endfor %}
{% endblock %}
Key Jinja2 features include template inheritance (extends/block), includes, macros (reusable template functions), filters (like truncate, safe, escape), and automatic HTML escaping to prevent XSS attacks.
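Auto-escaping and macros can be observed without any template files by using `render_template_string`. This is a small sketch; the `badge` macro and the sample input are made up for illustration.

```python
from flask import Flask, render_template_string

app = Flask(__name__)

TEMPLATE = """
{%- macro badge(label) -%}<span class="badge">{{ label }}</span>{%- endmacro -%}
{{ badge(name) }}|{{ name | safe }}
"""

# A request context is pushed so the template has everything it expects.
with app.test_request_context():
    html = render_template_string(TEMPLATE, name="<b>x</b>")

print(html)
```

The value rendered inside the macro is escaped, while the same value passed through the `safe` filter is emitted verbatim, which is why `safe` should only be applied to trusted content.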
Why interviewers ask this: Template inheritance and auto-escaping are essential to building secure, maintainable web apps. Knowing about {% else %} on for-loops shows deeper Jinja2 knowledge.
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://user:pass@localhost/mydb'
    # Already the default in Flask-SQLAlchemy 3.0+; explicit for older versions
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
        'pool_size': 10,
        'pool_recycle': 3600,
    }
    db.init_app(app)
    return app
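Keep SQLALCHEMY_TRACK_MODIFICATIONS set to False unless you specifically need the event system, because tracking consumes extra memory. Flask-SQLAlchemy 3.0 and later already default to False, so the explicit setting mainly matters on older versions. In production, configure connection pooling parameters to prevent database connection exhaustion.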
Why interviewers ask this: Setting SQLALCHEMY_TRACK_MODIFICATIONS = False and knowing about connection pooling separates production experience from tutorial-level knowledge.
from datetime import datetime

class User(db.Model):
    __tablename__ = 'users'

    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False, index=True)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(256), nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow)

    posts = db.relationship('Post', backref='author', lazy='dynamic')

    def __repr__(self):
        return f'<User {self.username}>'

class Post(db.Model):
    __tablename__ = 'posts'

    id = db.Column(db.Integer, primary_key=True)
    title = db.Column(db.String(200), nullable=False)
    body = db.Column(db.Text, nullable=False)
    created_at = db.Column(db.DateTime, default=datetime.utcnow, index=True)
    author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)

    def __repr__(self):
        return f'<Post {self.title}>'
Notice the use of lazy='dynamic' on the relationship, which returns a query object instead of loading all posts eagerly. This is important when a user could have thousands of posts. Also note the explicit __tablename__, indexes on frequently queried columns, and nullable=False constraints to enforce data integrity at the database level.
Why interviewers ask this: Modeling relationships correctly, understanding lazy loading strategies, and adding proper indexes demonstrates database design competence.
Flask-Migrate wraps Alembic, the database migration tool for SQLAlchemy. It tracks changes to your models and generates migration scripts that can be applied (or rolled back) to keep the database schema in sync with your code.
# Terminal commands

# Initialize the migration repository (run once)
flask db init

# Generate a migration after changing models
flask db migrate -m "add posts table"

# Apply the migration to the database
flask db upgrade

# Roll back the last migration
flask db downgrade
Always review the auto-generated migration script before running upgrade. Alembic cannot detect all changes (for example, renaming a column will be detected as a drop and create, which destroys data). In those cases, you need to manually edit the migration.
Why interviewers ask this: Database migrations are critical in production. Knowing that you need to review auto-generated scripts and understanding rollback shows operational maturity.
There are two main approaches depending on your application type:
Session-based authentication (for traditional web apps): Use Flask-Login, which manages user sessions and provides decorators like @login_required.
from flask_login import LoginManager, login_user, logout_user, login_required, current_user
from werkzeug.security import generate_password_hash, check_password_hash

login_manager = LoginManager()
login_manager.login_view = 'auth.login'

@login_manager.user_loader
def load_user(user_id):
    return User.query.get(int(user_id))

@auth_bp.route('/login', methods=['POST'])
def login():
    data = request.get_json()
    user = User.query.filter_by(email=data['email']).first()
    if user and check_password_hash(user.password_hash, data['password']):
        login_user(user, remember=data.get('remember', False))
        return jsonify({'message': 'Logged in successfully'})
    return jsonify({'error': 'Invalid credentials'}), 401

@auth_bp.route('/logout')
@login_required
def logout():
    logout_user()
    return jsonify({'message': 'Logged out successfully'})
Token-based authentication (for APIs): Use Flask-JWT-Extended for stateless JWT authentication.
from flask_jwt_extended import JWTManager, create_access_token, jwt_required, get_jwt_identity

jwt = JWTManager(app)

@app.route('/api/login', methods=['POST'])
def api_login():
    data = request.get_json()
    user = User.query.filter_by(username=data['username']).first()
    if user and check_password_hash(user.password_hash, data['password']):
        # The JWT "sub" claim is defined as a string, so pass the id as str
        access_token = create_access_token(identity=str(user.id))
        return jsonify(access_token=access_token)
    return jsonify({'error': 'Invalid credentials'}), 401

@app.route('/api/protected')
@jwt_required()
def protected():
    # Convert the string identity back to the integer primary key
    current_user_id = int(get_jwt_identity())
    user = User.query.get(current_user_id)
    return jsonify(logged_in_as=user.username)
Why interviewers ask this: They are testing whether you know when to use sessions vs. tokens, and whether you handle passwords securely (hashing, not storing plaintext).
- Hash passwords with werkzeug.security.generate_password_hash(). Never store plaintext passwords.
- Load SECRET_KEY from environment variables, never hardcode it.
- Parse JSON input with request.get_json(force=False) and validate schemas with libraries like Marshmallow or Pydantic.
- Set security headers such as X-Content-Type-Options, X-Frame-Options, and Content-Security-Policy.
- Never run with debug=True in production: it exposes an interactive debugger that allows arbitrary code execution.

import os

app.config['SECRET_KEY'] = os.environ.get('SECRET_KEY')
app.config['SESSION_COOKIE_SECURE'] = True      # Only send cookies over HTTPS
app.config['SESSION_COOKIE_HTTPONLY'] = True    # Prevent JavaScript access to session cookie
app.config['SESSION_COOKIE_SAMESITE'] = 'Lax'   # CSRF mitigation
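For the password-hashing point, a short sketch with Werkzeug's helpers shows the round trip. The sample password is arbitrary, and the default hashing method depends on the installed Werkzeug version.

```python
from werkzeug.security import check_password_hash, generate_password_hash

password = "correct horse battery staple"  # arbitrary sample value

# Each call salts the hash, so two hashes of one password differ.
stored_hash = generate_password_hash(password)
second_hash = generate_password_hash(password)

# Verification re-derives the hash using the salt embedded in stored_hash.
good_login = check_password_hash(stored_hash, password)
bad_login = check_password_hash(stored_hash, "wrong password")

print(good_login, bad_login, stored_hash != second_hash)
```

Only `stored_hash` goes in the database; the plaintext is discarded as soon as the hash has been computed.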
Why interviewers ask this: Security awareness is non-negotiable for senior developers. They want to know you will not ship an application with the debug console open to the internet.
You can build APIs with plain Flask or use Flask-RESTful/Flask-RESTX for more structure. Here is a clean pattern using plain Flask with class-based views:
from flask import Flask, jsonify, request, abort
from flask.views import MethodView

app = Flask(__name__)

class UserAPI(MethodView):

    def get(self, user_id=None):
        if user_id is None:
            # List all users
            users = User.query.all()
            return jsonify([u.to_dict() for u in users])
        # Get single user
        user = User.query.get_or_404(user_id)
        return jsonify(user.to_dict())

    def post(self):
        data = request.get_json()
        # Validate every field that is read below
        if not data or 'username' not in data or 'email' not in data:
            abort(400, description='Username and email are required')
        user = User(username=data['username'], email=data['email'])
        db.session.add(user)
        db.session.commit()
        return jsonify(user.to_dict()), 201

    def put(self, user_id):
        user = User.query.get_or_404(user_id)
        data = request.get_json()
        user.username = data['username']
        user.email = data['email']
        db.session.commit()
        return jsonify(user.to_dict())

    def delete(self, user_id):
        user = User.query.get_or_404(user_id)
        db.session.delete(user)
        db.session.commit()
        return '', 204

# Register the view
user_view = UserAPI.as_view('user_api')
app.add_url_rule('/api/users', defaults={'user_id': None}, view_func=user_view, methods=['GET'])
app.add_url_rule('/api/users', view_func=user_view, methods=['POST'])
app.add_url_rule('/api/users/<int:user_id>', view_func=user_view, methods=['GET', 'PUT', 'DELETE'])
Why interviewers ask this: They want to see if you can design clean API endpoints, return proper HTTP status codes, and handle missing resources gracefully.
Cross-Origin Resource Sharing (CORS) is required when the browser loads your frontend from a different origin (scheme, host, or port) than your backend. Use Flask-CORS:
from flask_cors import CORS

app = Flask(__name__)

# Allow all origins (development only)
CORS(app)

# Production: restrict to specific origins
CORS(app, resources={
    r"/api/*": {
        "origins": ["https://yourfrontend.com", "https://admin.yourfrontend.com"],
        "methods": ["GET", "POST", "PUT", "DELETE"],
        "allow_headers": ["Content-Type", "Authorization"]
    }
})
Never use a wildcard (*) for origins in production. Be explicit about which domains, methods, and headers are allowed.
Why interviewers ask this: CORS issues are one of the most common problems when building SPAs that talk to Flask APIs. Knowing how to configure it properly (and securely) is practical knowledge.
Flask provides the @app.errorhandler decorator for registering handlers for specific HTTP status codes or exception types. In production, you should handle at least 404, 500, and any custom application exceptions.
from flask import Flask, render_template, jsonify, request

app = Flask(__name__)

class APIError(Exception):
    """Custom exception for API errors."""
    def __init__(self, message, status_code=400):
        super().__init__(message)  # keep str(error) and tracebacks informative
        self.message = message
        self.status_code = status_code

@app.errorhandler(APIError)
def handle_api_error(error):
    return jsonify({'error': error.message}), error.status_code

@app.errorhandler(404)
def not_found(error):
    if request.path.startswith('/api/'):
        return jsonify({'error': 'Resource not found'}), 404
    return render_template('errors/404.html'), 404

@app.errorhandler(500)
def internal_error(error):
    db.session.rollback()  # Roll back any failed transactions
    if request.path.startswith('/api/'):
        return jsonify({'error': 'Internal server error'}), 500
    return render_template('errors/500.html'), 500

# Usage in a view
@app.route('/api/items/<int:item_id>')
def get_item(item_id):
    item = Item.query.get(item_id)
    if not item:
        raise APIError('Item not found', 404)
    return jsonify(item.to_dict())
Notice how the 500 handler calls db.session.rollback() to clean up any failed database transaction. Also note the pattern of returning JSON for API routes and HTML for browser routes — this is essential for applications that serve both.
Why interviewers ask this: Error handling reveals production mindset. Rolling back database sessions on 500 errors and distinguishing API vs. browser responses shows real-world experience.
Flask has historically provided four hooks that execute at different points in the request lifecycle; three remain in current releases:

- before_first_request: Ran once before the very first request. It was removed in Flask 2.3; do one-time setup in the application factory instead.
- before_request: Runs before each request. If it returns a response, the view function is skipped.
- after_request: Runs after each request. Receives and must return the response object.
- teardown_request: Runs when the request context is torn down, even if an exception occurred. Used for cleanup.

import time
from flask import g, jsonify, request

@app.before_request
def before_request_func():
    g.start_time = time.time()
    # Example: require API key for all /api routes
    if request.path.startswith('/api/') and request.endpoint != 'api.login':
        api_key = request.headers.get('X-API-Key')
        if not api_key or not is_valid_api_key(api_key):
            return jsonify({'error': 'Invalid or missing API key'}), 401

@app.after_request
def after_request_func(response):
    # Log request duration
    duration = time.time() - g.start_time
    app.logger.info(f'{request.method} {request.path} - {response.status_code} - {duration:.3f}s')
    # Add security headers
    response.headers['X-Content-Type-Options'] = 'nosniff'
    response.headers['X-Frame-Options'] = 'DENY'
    return response

@app.teardown_request
def teardown_request_func(exception):
    # Close database connection if stored on g
    db_conn = g.pop('db_conn', None)
    if db_conn is not None:
        db_conn.close()
For true WSGI middleware (which wraps the entire app), you can use Werkzeug’s ProxyFix or write your own class that implements the WSGI interface.
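Writing your own middleware takes only a small class. The sketch below is illustrative: `RequestIdMiddleware`, the `X-Request-ID` header, and its fixed value are assumptions, and a real implementation would generate a unique ID per request.

```python
from flask import Flask


class RequestIdMiddleware:
    """Hypothetical WSGI middleware that tags every response with a header."""

    def __init__(self, wsgi_app, header_value="demo-request-id"):
        self.wsgi_app = wsgi_app
        self.header_value = header_value

    def __call__(self, environ, start_response):
        def custom_start_response(status, headers, exc_info=None):
            # Append our header, then delegate to the server's callable.
            headers.append(("X-Request-ID", self.header_value))
            return start_response(status, headers, exc_info)

        return self.wsgi_app(environ, custom_start_response)


app = Flask(__name__)
# Wrap app.wsgi_app rather than app, so `app` is still the Flask object.
app.wsgi_app = RequestIdMiddleware(app.wsgi_app)


@app.route("/")
def index():
    return "ok"


response = app.test_client().get("/")
print(response.headers["X-Request-ID"])
```

Unlike request hooks, this code runs outside Flask entirely, so it sees the raw WSGI environ and has no access to `request` or `g`.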
Why interviewers ask this: Request hooks are how you implement cross-cutting concerns like logging, authentication gates, and performance monitoring. This question tests architectural thinking.
The standard approach is to define configuration classes and select one based on an environment variable:
import os

class Config:
    """Base configuration."""
    SECRET_KEY = os.environ.get('SECRET_KEY', 'fallback-dev-key')
    SQLALCHEMY_TRACK_MODIFICATIONS = False

class DevelopmentConfig(Config):
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///dev.db'

class TestingConfig(Config):
    TESTING = True
    SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
    WTF_CSRF_ENABLED = False  # Disable CSRF for testing

class ProductionConfig(Config):
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')
    SESSION_COOKIE_SECURE = True

config = {
    'development': DevelopmentConfig,
    'testing': TestingConfig,
    'production': ProductionConfig,
    'default': DevelopmentConfig
}

# In your factory
def create_app(config_name=None):
    if config_name is None:
        # Flask itself stopped reading FLASK_ENV in 2.3; here the variable
        # is only this application's own selector for a config class
        config_name = os.environ.get('FLASK_ENV', 'default')

    app = Flask(__name__)
    app.config.from_object(config[config_name])

    # Override with instance-specific config (not in version control)
    app.config.from_pyfile('instance/config.py', silent=True)

    # Override with environment variables
    app.config.from_prefixed_env()  # Flask 2.1+: reads FLASK_* env vars

    return app
For sensitive values (database passwords, API keys), always use environment variables. Never commit secrets to version control. Use tools like python-dotenv to load .env files in development.
Why interviewers ask this: Every production application needs environment-specific configuration. This question tests whether you can structure an app for real deployment workflows.
Flask provides a test client that simulates HTTP requests without running a server. Combined with pytest and the application factory pattern, you get clean, isolated tests:
import pytest
from myapp import create_app, db

@pytest.fixture
def app():
    app = create_app('testing')
    with app.app_context():
        db.create_all()
        yield app
        db.session.remove()
        db.drop_all()

@pytest.fixture
def client(app):
    return app.test_client()

@pytest.fixture
def runner(app):
    return app.test_cli_runner()

def test_home_page(client):
    response = client.get('/')
    assert response.status_code == 200
    assert b'Welcome' in response.data

def test_create_user(client):
    response = client.post('/api/users', json={
        'username': 'testuser',
        'email': 'test@example.com',
        'password': 'securepassword'
    })
    assert response.status_code == 201
    data = response.get_json()
    assert data['username'] == 'testuser'

def test_get_nonexistent_user(client):
    response = client.get('/api/users/9999')
    assert response.status_code == 404

def test_login_required(client):
    response = client.get('/api/protected')
    assert response.status_code == 401
Key testing practices: use an in-memory SQLite database for speed, create and drop tables for each test (or use transactions with rollback), and test both success and failure cases. Use client.get(), client.post(json=...), client.put(), etc. to simulate requests.
Why interviewers ask this: Writing tests is a hard requirement for senior roles. They want to see that you use fixtures, test isolation, and cover edge cases — not just the happy path.
Flask’s built-in server is for development only. For production, you need a proper WSGI server behind a reverse proxy:
# wsgi.py
from myapp import create_app
app = create_app('production')
# Run with Gunicorn
gunicorn -w 4 -b 0.0.0.0:8000 wsgi:app

# Or with uWSGI
uwsgi --http :8000 --wsgi-file wsgi.py --callable app --processes 4 --threads 2
A typical production stack looks like:
# Dockerfile
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["gunicorn", "-w", "4", "-b", "0.0.0.0:8000", "wsgi:app"]
Important production considerations: set DEBUG = False, use environment variables for secrets, configure proper logging, set up health check endpoints, and use a process manager like systemd or Docker to restart crashed workers.
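A health check endpoint can be as small as the sketch below. The `/healthz` path and the `database_is_reachable` probe are assumptions; a real probe would run a cheap query such as `SELECT 1` against the database.

```python
from flask import Flask, jsonify

app = Flask(__name__)


def database_is_reachable():
    """Hypothetical dependency probe; replace with a real connectivity check."""
    return True


@app.route("/healthz")
def healthz():
    ok = database_is_reachable()
    # 200 tells the load balancer to keep routing traffic here; 503 tells it
    # to take this instance out of rotation.
    status_code = 200 if ok else 503
    return jsonify(status="ok" if ok else "degraded"), status_code


health_response = app.test_client().get("/healthz")
print(health_response.status_code, health_response.get_json())
```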
Why interviewers ask this: Deployment is where many developers stumble. Knowing the Nginx + Gunicorn stack and being able to write a Dockerfile shows you can ship code, not just write it.
| Criteria | Flask | Django |
|---|---|---|
| Philosophy | Micro-framework, pick your own tools | Batteries-included, convention over configuration |
| ORM | None built-in (typically SQLAlchemy) | Built-in Django ORM |
| Admin Panel | None (use Flask-Admin if needed) | Built-in, production-ready |
| Best For | APIs, microservices, small-to-medium apps | Content sites, e-commerce, rapid prototyping |
| Learning Curve | Lower initially, higher for large apps | Higher initially, lower for large apps |
| Flexibility | Maximum flexibility in architecture | Opinionated structure |
Choose Flask when you want full control over your stack, you are building a microservice or API, your team is experienced and wants to pick best-of-breed libraries, or you need to keep the dependency footprint small.
Choose Django when you need an admin interface out of the box, you are building a content-heavy site, you want built-in authentication/authorization/ORM without assembling pieces, or your team benefits from Django’s strong conventions.
Why interviewers ask this: This is not about which framework is “better.” They want to see that you can evaluate tools based on project requirements rather than personal preference.
Flask-WTF integrates WTForms with Flask and provides CSRF protection, file upload handling, and reCAPTCHA support.
from flask import flash, redirect, render_template, url_for
from flask_wtf import FlaskForm
from werkzeug.security import generate_password_hash
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import DataRequired, Email, Length, EqualTo

class RegistrationForm(FlaskForm):
    username = StringField('Username', validators=[
        DataRequired(),
        Length(min=3, max=80)
    ])
    email = StringField('Email', validators=[
        DataRequired(),
        Email()
    ])
    password = PasswordField('Password', validators=[
        DataRequired(),
        Length(min=8)
    ])
    confirm_password = PasswordField('Confirm Password', validators=[
        DataRequired(),
        EqualTo('password', message='Passwords must match')
    ])
    accept_tos = BooleanField('I accept the Terms of Service', validators=[DataRequired()])
    submit = SubmitField('Register')

@app.route('/register', methods=['GET', 'POST'])
def register():
    form = RegistrationForm()
    if form.validate_on_submit():
        user = User(
            username=form.username.data,
            email=form.email.data,
            password_hash=generate_password_hash(form.password.data)
        )
        db.session.add(user)
        db.session.commit()
        flash('Registration successful!', 'success')
        return redirect(url_for('auth.login'))
    return render_template('register.html', form=form)
validate_on_submit() checks both that the request is a submission (POST, PUT, PATCH, or DELETE) and that all validators pass. CSRF protection is enabled automatically as long as you set SECRET_KEY and include {{ form.hidden_tag() }} in your template.
Why interviewers ask this: Form handling and validation are fundamental to web applications. Knowing about CSRF protection and proper validation chains shows security awareness.
from flask_caching import Cache

cache = Cache()

def create_app():
    app = Flask(__name__)
    app.config['CACHE_TYPE'] = 'RedisCache'
    app.config['CACHE_REDIS_URL'] = 'redis://localhost:6379/0'
    app.config['CACHE_DEFAULT_TIMEOUT'] = 300
    cache.init_app(app)
    return app

# Cache an entire view response
@app.route('/api/stats')
@cache.cached(timeout=120, key_prefix='api_stats')
def get_stats():
    # Expensive database aggregation
    stats = compute_expensive_stats()
    return jsonify(stats)

# Cache a function result with dynamic keys
@cache.memoize(timeout=60)
def get_user_profile(user_id):
    return User.query.get(user_id)

# Invalidate cache when data changes
@app.route('/api/users/<int:user_id>', methods=['PUT'])
def update_user(user_id):
    # ... update logic ...
    cache.delete_memoized(get_user_profile, user_id)
    return jsonify(user.to_dict())
Use @cache.cached() for view-level caching and @cache.memoize() for function-level caching with argument-based keys. In production, use Redis or Memcached as the cache backend, not the simple in-memory cache.
Why interviewers ask this: Caching is critical for performance at scale. They want to see that you understand cache invalidation strategies and the difference between view-level and function-level caching.
from flask import Flask, render_template
from flask_socketio import SocketIO, emit, join_room, leave_room

app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret'  # placeholder; load from the environment in production
socketio = SocketIO(app, cors_allowed_origins="*")  # wildcard is for development only

@socketio.on('connect')
def handle_connect():
    emit('server_message', {'data': 'Connected successfully'})

@socketio.on('join')
def handle_join(data):
    room = data['room']
    join_room(room)
    emit('server_message', {'data': f'Joined room: {room}'}, room=room)

@socketio.on('chat_message')
def handle_message(data):
    room = data.get('room', 'general')
    emit('new_message', {
        'user': data['user'],
        'message': data['message']
    }, room=room, include_self=False)

if __name__ == '__main__':
    socketio.run(app, debug=True)
Flask-SocketIO supports rooms for group messaging, namespaces for logical separation, and can use Redis as a message queue for multi-process deployments. For production, you need an async-capable worker like eventlet or gevent.
Why interviewers ask this: Real-time features are increasingly common. Understanding rooms, namespaces, and the production requirements for WebSocket deployments shows full-stack capability.
- … 201 for creation and 204 for deletion.
- … app object, the interviewer will wonder whether you have built anything beyond a tutorial.
- … app.run(). Gunicorn, Nginx, Docker, environment variables, and proper logging are all part of shipping a Flask application.

You have built a Flask application. It handles routes, talks to a database, renders templates, and works perfectly on your laptop. Now comes the part that separates hobby projects from production software: deployment.
Deployment is the process of taking your application from a development environment — where you run flask run and hit localhost:5000 — to a production environment where real users access it over the internet, 24 hours a day, under unpredictable load, with zero tolerance for data loss or security breaches.
The gap between development and production is enormous. In development, you have one user (yourself), debug mode is on, the database is local, and if the server crashes you just restart it. In production, you might have thousands of concurrent users, secrets must be locked down, the database needs connection pooling and backups, the server must survive crashes and restart automatically, and every request must be served over HTTPS.
This tutorial covers every aspect of deploying a Flask application to production. We will work through the deployment stack from the inside out: preparing your application code, configuring a production WSGI server, setting up a reverse proxy, containerizing with Docker, deploying to cloud platforms, and building CI/CD pipelines. By the end, you will have a complete, repeatable deployment workflow that you can use for any Flask project.
Before we dive into specifics, here is the high-level checklist every Flask deployment must address:
Let us work through each of these systematically.
Production readiness starts in your application code. Before you think about servers, containers, or cloud platforms, your Flask app itself must be configured correctly.
This is the single most critical deployment rule. Flask’s debug mode enables the interactive debugger, which allows anyone who can trigger an error to execute arbitrary Python code on your server. It also enables the reloader, which watches your files for changes and restarts the process — unnecessary overhead in production.
# NEVER do this in production
app.run(debug=True)  # Interactive debugger exposed to the internet

# Correct: debug off, or better yet, don't use app.run() at all
app.run(debug=False)
In production, you will not call app.run() at all. A WSGI server like Gunicorn imports your application object directly. But if your code has debug=True anywhere, make sure it is controlled by an environment variable.
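One way to keep the debug flag under environment control is to parse it explicitly and default to off. The sketch below is only an illustration: the helper name `env_flag` and the variable name `APP_DEBUG` are assumptions made for this example, not Flask conventions.

```python
import os

# Values treated as "on"; anything else, including a typo, leaves the flag off.
TRUTHY = {"1", "true", "yes", "on"}


def env_flag(name, default=False, environ=None):
    """Return True only when the variable holds an explicitly truthy value."""
    environ = os.environ if environ is None else environ
    raw = environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in TRUTHY


if __name__ == "__main__":
    # APP_DEBUG is a hypothetical variable name chosen for this sketch.
    print("debug enabled:", env_flag("APP_DEBUG"))
```

A local entry point could then call `app.run(debug=env_flag("APP_DEBUG"))`, so a missing or misspelled variable leaves debug mode off.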
Professional Flask applications use configuration classes to separate development, testing, and production settings. This pattern keeps sensitive production values out of your code and makes it easy to switch environments.
# config.py
import os

class Config:
    """Base configuration shared across all environments."""
    SECRET_KEY = os.environ.get("SECRET_KEY", "fallback-dev-key-change-me")
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    MAX_CONTENT_LENGTH = 16 * 1024 * 1024  # 16 MB upload limit

class DevelopmentConfig(Config):
    """Local development settings."""
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = os.environ.get(
        "DATABASE_URL",
        "sqlite:///dev.db"
    )

class TestingConfig(Config):
    """Test suite settings."""
    TESTING = True
    SQLALCHEMY_DATABASE_URI = "sqlite:///:memory:"
    WTF_CSRF_ENABLED = False

class ProductionConfig(Config):
    """Production settings - all secrets from environment variables."""
    DEBUG = False
    TESTING = False
    SQLALCHEMY_DATABASE_URI = os.environ["DATABASE_URL"]  # No fallback; crash if missing
    SECRET_KEY = os.environ["SECRET_KEY"]  # No fallback; crash if missing
    SESSION_COOKIE_SECURE = True
    SESSION_COOKIE_HTTPONLY = True
    SESSION_COOKIE_SAMESITE = "Lax"
    PREFERRED_URL_SCHEME = "https"

config_by_name = {
    "development": DevelopmentConfig,
    "testing": TestingConfig,
    "production": ProductionConfig,
}
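Notice that ProductionConfig uses os.environ["DATABASE_URL"] without a fallback. This is intentional. If the environment variable is not set, the application crashes immediately at startup with a clear KeyError. This is far better than silently connecting to a wrong database or running with a default secret key. One caveat: a class body runs when config.py is imported, so as written these lookups execute in every environment, including development and testing. DATABASE_URL and SECRET_KEY must therefore be set there too, unless the check is deferred until the production configuration is actually selected.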
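Because a class body executes at import time, one alternative is to validate required variables only when the production configuration is chosen. This is a minimal sketch under that assumption; the helper names `missing_settings` and `require_settings` are hypothetical and not part of Flask.

```python
import os

# Variables the production configuration cannot run without.
REQUIRED_IN_PRODUCTION = ("DATABASE_URL", "SECRET_KEY")


def missing_settings(names, environ=None):
    """Return the names that are unset or empty in the environment."""
    environ = os.environ if environ is None else environ
    return [name for name in names if not environ.get(name)]


def require_settings(names, environ=None):
    """Fail fast with one error that lists every missing variable."""
    missing = missing_settings(names, environ)
    if missing:
        raise RuntimeError(
            "Missing required environment variables: " + ", ".join(missing)
        )
```

An application factory could call `require_settings(REQUIRED_IN_PRODUCTION)` only when the selected configuration name is "production", which keeps the fail-fast behavior without forcing development machines to define production secrets.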
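Load the correct configuration in your application factory. Flask itself stopped reading FLASK_ENV in version 2.3, so here it serves only as an application-defined variable that selects a configuration class: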
# app/__init__.py
import os
from flask import Flask
from config import config_by_name

def create_app(config_name=None):
    if config_name is None:
        config_name = os.environ.get("FLASK_ENV", "development")
    app = Flask(__name__)
    app.config.from_object(config_by_name[config_name])

    # Initialize extensions
    from app.extensions import db, migrate, ma
    db.init_app(app)
    migrate.init_app(app, db)
    ma.init_app(app)

    # Register blueprints
    from app.routes import api_bp
    app.register_blueprint(api_bp, url_prefix="/api")
    return app
Your requirements.txt must pin every dependency to an exact version. Without pinning, a new install might pull a different version of a library that introduces breaking changes or security vulnerabilities.
# Generate pinned requirements from your current environment
pip freeze > requirements.txt
A pinned requirements.txt looks like this:
# requirements.txt
Flask==3.1.0
Flask-SQLAlchemy==3.1.1
Flask-Migrate==4.0.7
gunicorn==23.0.0
psycopg2-binary==2.9.10
python-dotenv==1.0.1
marshmallow==3.23.1
redis==5.2.1
For more robust dependency management, consider using pip-tools. You write a requirements.in with your direct dependencies, and pip-compile generates a fully pinned requirements.txt that includes all transitive dependencies. Passing --generate-hashes additionally records a hash for each package so that pip can verify what it downloads.
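A pinning rule is easy to enforce mechanically. The following is a rough sketch of a check that could run in CI; the function name `unpinned_requirements` and the regular expression are assumptions for illustration, and the pattern covers only the simple `name==version` form rather than the full requirements syntax.

```python
import re

# Matches "name==version", optionally with extras such as "name[extra]==1.2.3".
PINNED = re.compile(r"^[A-Za-z0-9._-]+(\[[A-Za-z0-9._,-]+\])?==[^=\s]+")


def unpinned_requirements(lines):
    """Return the requirement entries that are not pinned with '=='."""
    problems = []
    for line in lines:
        entry = line.split("#", 1)[0].strip()  # drop comments and whitespace
        if not entry or entry.startswith("-"):  # skip blanks and pip options
            continue
        if not PINNED.match(entry):
            problems.append(entry)
    return problems


if __name__ == "__main__":
    with open("requirements.txt", encoding="utf-8") as handle:
        for entry in unpinned_requirements(handle):
            print("not pinned:", entry)
```

Run against the pinned file shown earlier, the check should report nothing; an entry such as `requests>=2.0` or a bare `gunicorn` would be listed.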
In development, Flask serves static files from the static/ directory. In production, this is inefficient — Flask is a Python application server, not a file server. Nginx (or a CDN) should serve static files directly, bypassing your Python process entirely. We will configure this in the Nginx section.
For now, make sure your static files are organized:
myapp/
├── app/
│   ├── static/
│   │   ├── css/
│   │   ├── js/
│   │   └── images/
│   ├── templates/
│   └── ...
Flask’s built-in development server is not designed to be efficient, stable, or secure, and it has no process management. Although it has handled requests in threads by default since Flask 1.0, it is designed for one thing: local development convenience. Running it in production is like driving a go-kart on the highway: it technically moves forward, but it is not built for the conditions.
A production WSGI server handles multiple concurrent requests using worker processes or threads, manages worker lifecycle (restarting crashed workers), and is tuned for throughput and reliability.
Gunicorn (Green Unicorn) is the most popular WSGI server for Python applications. It uses a pre-fork worker model: a master process spawns multiple worker processes, each handling requests independently. If a worker crashes, the master spawns a replacement.
# Install Gunicorn
pip install gunicorn

# Run with default settings (1 worker)
gunicorn "app:create_app()"

# Specify host and port
gunicorn --bind 0.0.0.0:8000 "app:create_app()"

# Multiple workers
gunicorn --workers 4 --bind 0.0.0.0:8000 "app:create_app()"
The string "app:create_app()" tells Gunicorn to import the app module and call create_app() to get the WSGI application object. If your app object is a module-level variable, use "app:app" or "wsgi:app".
The number of workers determines how many requests your server can handle at once: with the default sync worker class, each worker processes one request at a time. A common starting point is:
workers = (2 * CPU_CORES) + 1
On a 4-core machine, that is 9 workers. Each worker is a separate OS process with its own memory space, so more workers means more memory usage. Monitor your server’s memory and adjust accordingly.
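The arithmetic can be sketched in a few lines. The memory cap below is an added assumption to illustrate the "adjust for memory" advice; the function names and the example figures (1024 MB available, 150 MB per worker) are hypothetical and should be replaced with measurements from your own application.

```python
import multiprocessing


def recommended_workers(cpu_cores=None):
    """Apply the (2 * cores) + 1 rule of thumb."""
    cores = multiprocessing.cpu_count() if cpu_cores is None else cpu_cores
    return 2 * cores + 1


def workers_within_memory(cpu_cores, available_mb, per_worker_mb):
    """Cap the CPU-based figure by how many workers fit in memory."""
    by_cpu = recommended_workers(cpu_cores)
    by_memory = max(1, available_mb // per_worker_mb)
    return min(by_cpu, by_memory)


if __name__ == "__main__":
    print(recommended_workers(4))
    print(workers_within_memory(4, available_mb=1024, per_worker_mb=150))
```

For the 4-core example the formula gives 9, while the memory cap with the illustrative figures lowers it to 6.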
For production, use a configuration file instead of command-line arguments:
# gunicorn.conf.py
import multiprocessing

# Server socket
bind = "0.0.0.0:8000"

# Worker processes
workers = multiprocessing.cpu_count() * 2 + 1
worker_class = "sync"
worker_connections = 1000
timeout = 30
keepalive = 2

# Logging
accesslog = "-"  # stdout
errorlog = "-"  # stderr
loglevel = "info"

# Process naming
proc_name = "myapp"

# Server mechanics
daemon = False
pidfile = None
umask = 0
tmp_upload_dir = None

# Restart workers after this many requests (prevents memory leaks)
max_requests = 1000
max_requests_jitter = 50

# Preload application code before forking workers
preload_app = True

# Run with config file
gunicorn -c gunicorn.conf.py "app:create_app()"
Gunicorn supports different worker types for different workloads:
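async def views).
# Threaded workers (4 threads per worker)
gunicorn --workers 4 --threads 4 --bind 0.0.0.0:8000 "app:create_app()"

# Gevent workers
pip install gevent
gunicorn --workers 4 --worker-class gevent --bind 0.0.0.0:8000 "app:create_app()"

# Uvicorn workers speak ASGI, while Flask is a WSGI application, so the app must first be
# wrapped in a WSGI-to-ASGI adapter such as asgiref's WsgiToAsgi. "asgi:asgi_app" is a
# placeholder for the module and variable where you expose the wrapped app.
pip install uvicorn asgiref
gunicorn --workers 4 --worker-class uvicorn.workers.UvicornWorker --bind 0.0.0.0:8000 "asgi:asgi_app"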
uWSGI is an alternative WSGI server with more features and more complexity. It supports the same pre-fork model but adds protocol-level optimizations, built-in caching, and its own process management.
# Install uWSGI
pip install uwsgi

# Run Flask app
uwsgi --http 0.0.0.0:8000 --wsgi-file wsgi.py --callable app --processes 4 --threads 2
uWSGI is powerful but has a steeper learning curve. For most Flask deployments, Gunicorn is the simpler and more common choice. Choose uWSGI if you need its specific features (e.g., built-in caching, spooler for background tasks, or the uwsgi protocol for Nginx communication).
In production, you do not expose Gunicorn directly to the internet. Instead, you put Nginx in front of it as a reverse proxy. Nginx handles several responsibilities that Gunicorn should not:
# /etc/nginx/sites-available/myapp
server {
listen 80;
server_name myapp.example.com;
# Redirect all HTTP to HTTPS
return 301 https://$server_name$request_uri;
}
server {
listen 443 ssl http2;
server_name myapp.example.com;
# SSL certificates (managed by Certbot)
ssl_certificate /etc/letsencrypt/live/myapp.example.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/myapp.example.com/privkey.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
# Security headers
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-Content-Type-Options "nosniff" always;
add_header X-XSS-Protection "1; mode=block" always;
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains" always;
# Serve static files directly
location /static/ {
alias /var/www/myapp/app/static/;
expires 30d;
add_header Cache-Control "public, immutable";
}
# Proxy all other requests to Gunicorn
location / {
proxy_pass http://127.0.0.1:8000;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
proxy_redirect off;
# Timeouts
proxy_connect_timeout 60s;
proxy_read_timeout 60s;
proxy_send_timeout 60s;
}
# Client upload size limit
client_max_body_size 16M;
# Logging
access_log /var/log/nginx/myapp_access.log;
error_log /var/log/nginx/myapp_error.log;
}
# Enable the site
sudo ln -s /etc/nginx/sites-available/myapp /etc/nginx/sites-enabled/
sudo nginx -t  # Test configuration
sudo systemctl reload nginx
When Nginx forwards requests to Gunicorn, Flask sees the request as coming from 127.0.0.1 instead of the actual client. The X-Forwarded-For and X-Forwarded-Proto headers carry the original client information. Tell Flask to trust these headers:
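from werkzeug.middleware.proxy_fix import ProxyFix

app = create_app()
# Trust only the headers that the Nginx configuration actually sets
# (X-Forwarded-For and X-Forwarded-Proto). Trusting headers the proxy does not
# set would let clients supply them directly.
app.wsgi_app = ProxyFix(app.wsgi_app, x_for=1, x_proto=1)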
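To see why the trusted-proxy count matters, it helps to look at how a forwarded address is selected. The function below is a simplified illustration written for this tutorial, not Werkzeug's actual implementation, and its name is hypothetical.

```python
def client_ip_from_forwarded_for(header_value, trusted_proxies=1):
    """Pick the client address from an X-Forwarded-For header value.

    Each proxy appends the address it received the request from, so only the
    last `trusted_proxies` entries were written by infrastructure you control.
    Everything to the left of them may have been supplied by the client.
    """
    addresses = [part.strip() for part in header_value.split(",") if part.strip()]
    if trusted_proxies < 1 or len(addresses) < trusted_proxies:
        return None  # not enough entries to trust anything
    return addresses[-trusted_proxies]


if __name__ == "__main__":
    # The first entry is a value the client invented; the proxy appended the real one.
    print(client_ip_from_forwarded_for("10.9.8.7, 203.0.113.7", trusted_proxies=1))
```

With one trusted proxy, a forged leading entry is ignored because only the right-most value, the one Nginx appended, is used.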
Let’s Encrypt provides free TLS certificates. Certbot automates the entire process:
# Install Certbot
sudo apt install certbot python3-certbot-nginx

# Obtain and install certificate (auto-configures Nginx)
sudo certbot --nginx -d myapp.example.com

# Certbot sets up auto-renewal. Verify it:
sudo certbot renew --dry-run
Certbot modifies your Nginx configuration to add SSL directives and sets up a systemd timer for automatic renewal before the certificate expires (every 90 days).
Docker packages your application, its dependencies, and its runtime environment into a single, portable image. This eliminates the “works on my machine” problem — if it runs in the Docker container locally, it runs the same way in production.
# Dockerfile
FROM python:3.12-slim
# Set environment variables
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
# Create a non-root user
RUN groupadd -r appuser && useradd -r -g appuser -d /app -s /sbin/nologin appuser
# Set working directory
WORKDIR /app
# Install system dependencies
RUN apt-get update && \
apt-get install -y --no-install-recommends gcc libpq-dev && \
rm -rf /var/lib/apt/lists/*
# Install Python dependencies
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
# Copy application code
COPY . .
# Switch to non-root user
USER appuser
# Expose port
EXPOSE 8000
# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1
# Run with Gunicorn
CMD ["gunicorn", "-c", "gunicorn.conf.py", "app:create_app()"]
Key decisions in this Dockerfile:
python:3.12-slim: The slim variant is much smaller than the full image (150 MB vs 1 GB) while still including essential system libraries
PYTHONDONTWRITEBYTECODE=1: Prevents Python from creating .pyc files in the container
PYTHONUNBUFFERED=1: Ensures print statements and log messages appear immediately in Docker logs
requirements.txt changes, not on every code change
Multi-stage builds produce smaller production images by separating the build environment from the runtime environment:
# Dockerfile.multistage
# Stage 1: Build
FROM python:3.12-slim AS builder
WORKDIR /app
RUN apt-get update && \
apt-get install -y --no-install-recommends gcc libpq-dev && \
rm -rf /var/lib/apt/lists/*
COPY requirements.txt .
RUN pip install --no-cache-dir --prefix=/install -r requirements.txt
# Stage 2: Production
FROM python:3.12-slim
ENV PYTHONDONTWRITEBYTECODE=1 \
PYTHONUNBUFFERED=1
RUN groupadd -r appuser && useradd -r -g appuser -d /app -s /sbin/nologin appuser
# Install only runtime dependencies (no gcc, no build tools)
RUN apt-get update && \
apt-get install -y --no-install-recommends libpq5 && \
rm -rf /var/lib/apt/lists/*
# Copy installed packages from builder
COPY --from=builder /install /usr/local
WORKDIR /app
COPY . .
USER appuser
EXPOSE 8000
HEALTHCHECK --interval=30s --timeout=10s --start-period=5s --retries=3 \
CMD python -c "import urllib.request; urllib.request.urlopen('http://localhost:8000/health')" || exit 1
CMD ["gunicorn", "-c", "gunicorn.conf.py", "app:create_app()"]
The builder stage installs GCC and builds any C extensions (like psycopg2). The production stage only copies the compiled packages, leaving the build tools behind. This can reduce your image size by 200+ MB.
Always include a .dockerignore to keep unnecessary files out of the image:
# .dockerignore
__pycache__
*.pyc
*.pyo
.git
.gitignore
.env
.env.*
*.md
.pytest_cache
.mypy_cache
.coverage
htmlcov/
venv/
.venv/
docker-compose*.yml
Dockerfile*
.dockerignore
tests/
docs/
*.log
Docker Compose orchestrates multiple containers. A typical Flask production stack includes the application, a database, and a cache:
# docker-compose.yml
version: "3.9"
services:
  web:
    build: .
    ports:
      - "8000:8000"
    environment:
      - FLASK_ENV=production
      - DATABASE_URL=postgresql://myapp:secretpassword@db:5432/myapp
      - SECRET_KEY=${SECRET_KEY}
      - REDIS_URL=redis://redis:6379/0
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
    restart: unless-stopped
    volumes:
      - app-static:/app/app/static
    networks:
      - backend
  db:
    image: postgres:16-alpine
    environment:
      - POSTGRES_DB=myapp
      - POSTGRES_USER=myapp
      - POSTGRES_PASSWORD=secretpassword
    volumes:
      - postgres-data:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U myapp"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped
    networks:
      - backend
  redis:
    image: redis:7-alpine
    command: redis-server --maxmemory 128mb --maxmemory-policy allkeys-lru
    volumes:
      - redis-data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped
    networks:
      - backend
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/conf.d/default.conf:ro
      - app-static:/var/www/static:ro
      - ./certbot/conf:/etc/letsencrypt:ro
      - ./certbot/www:/var/www/certbot:ro
    depends_on:
      - web
    restart: unless-stopped
    networks:
      - backend
volumes:
  postgres-data:
  redis-data:
  app-static:
networks:
  backend:
    driver: bridge

# Build and start all services
docker compose up -d --build

# View logs
docker compose logs -f web

# Run database migrations
docker compose exec web flask db upgrade

# Stop all services
docker compose down

# Stop and remove all data (careful!)
docker compose down -v
Your Flask application needs a health check endpoint that Docker, load balancers, and monitoring tools can hit:
# app/routes/health.py
from flask import Blueprint, jsonify
from app.extensions import db

health_bp = Blueprint("health", __name__)

@health_bp.route("/health")
def health_check():
    """Basic health check - is the app running?"""
    return jsonify({"status": "healthy"}), 200

@health_bp.route("/health/ready")
def readiness_check():
    """Readiness check - can the app handle requests?

    Checks database connectivity and other dependencies.
    """
    checks = {}
    # Check database
    try:
        db.session.execute(db.text("SELECT 1"))
        checks["database"] = "connected"
    except Exception as e:
        checks["database"] = f"error: {str(e)}"
        return jsonify({"status": "unhealthy", "checks": checks}), 503
    return jsonify({"status": "healthy", "checks": checks}), 200
With your application containerized, you have multiple options for where to run it. Each cloud platform offers different tradeoffs between control, simplicity, and cost.
AWS offers several services for Flask deployment, ranging from fully managed platforms to virtual servers you administer yourself:
The simplest AWS option. Elastic Beanstalk handles provisioning, load balancing, auto-scaling, and monitoring. You deploy your code, and AWS manages the infrastructure.
# Install EB CLI
pip install awsebcli

# Initialize Elastic Beanstalk in your project
eb init -p python-3.12 myapp --region us-east-1

# Create an environment and deploy
eb create production

# Deploy updates
eb deploy

# Open in browser
eb open
Elastic Beanstalk looks for an application.py file or a Procfile to know how to run your app:
# Procfile (for Elastic Beanstalk)
web: gunicorn -c gunicorn.conf.py "app:create_app()"
Amazon ECS is the option for Docker-based deployments that need more control. You push your Docker image to ECR (Elastic Container Registry) and define how ECS runs it. ECS handles scaling, networking, and load balancing. It requires more configuration than Elastic Beanstalk but offers more flexibility.
# Build and push to ECR
aws ecr get-login-password --region us-east-1 | docker login --username AWS --password-stdin 123456789.dkr.ecr.us-east-1.amazonaws.com
docker build -t myapp .
docker tag myapp:latest 123456789.dkr.ecr.us-east-1.amazonaws.com/myapp:latest
docker push 123456789.dkr.ecr.us-east-1.amazonaws.com/myapp:latest
Amazon EC2 gives you full control. You provision a virtual server, SSH in, install everything yourself, and manage updates. This is the most work, but the environment is entirely yours to configure. Use this when you have specific requirements that managed services cannot accommodate.
Heroku is the fastest path from code to production. It is a Platform-as-a-Service (PaaS) that abstracts away all infrastructure concerns.
# Procfile (required by Heroku)
web: gunicorn "app:create_app()"

# runtime.txt (specify Python version)
python-3.12.8

# Deploy to Heroku
heroku create myapp-production
heroku addons:create heroku-postgresql:essential-0
heroku config:set SECRET_KEY=your-production-secret-key
heroku config:set FLASK_ENV=production
git push heroku main

# Run migrations
heroku run flask db upgrade

# View logs
heroku logs --tail
Heroku automatically detects Python applications, installs dependencies from requirements.txt, and runs the command specified in Procfile. It handles HTTPS, load balancing, and zero-downtime deploys.
DigitalOcean App Platform sits between Heroku’s simplicity and AWS’s flexibility. It supports both Dockerfile-based and buildpack-based deployments, connects directly to your GitHub repository, and auto-deploys on push.
# .do/app.yaml
name: myapp
services:
  - name: web
    github:
      repo: yourusername/myapp
      branch: main
    build_command: pip install -r requirements.txt
    run_command: gunicorn "app:create_app()"
    environment_slug: python
    instance_count: 2
    instance_size_slug: professional-xs
    envs:
      - key: FLASK_ENV
        value: production
      - key: SECRET_KEY
        type: SECRET
        value: your-secret-key
      - key: DATABASE_URL
        scope: RUN_TIME
        value: ${db.DATABASE_URL}
databases:
  - name: db
    engine: PG
    version: "16"
| Factor | Heroku | AWS EB | AWS ECS | DigitalOcean |
|---|---|---|---|---|
| Setup complexity | Low | Medium | High | Low |
| Control | Limited | Medium | High | Medium |
| Cost (small app) | $5-25/mo | $15-50/mo | $20-60/mo | $5-25/mo |
| Auto-scaling | Yes | Yes | Yes | Yes |
| Docker support | Yes | Yes | Native | Yes |
| Free tier | No | Yes (12 months) | Yes (12 months) | No |
Your development SQLite database is rarely a good fit for production. A production database has to handle concurrent writes from multiple worker processes and needs connection pooling, automated backups, and replication. PostgreSQL is a common choice for Flask applications.
Every database query requires a connection. Opening and closing connections for each request is expensive. Connection pooling maintains a pool of reusable connections.
SQLAlchemy (which Flask-SQLAlchemy wraps) includes a built-in connection pool. Configure it for production:
# config.py - ProductionConfig
class ProductionConfig(Config):
    SQLALCHEMY_DATABASE_URI = os.environ["DATABASE_URL"]
    # Connection pool settings
    SQLALCHEMY_ENGINE_OPTIONS = {
        "pool_size": 20,  # Maximum number of persistent connections
        "max_overflow": 10,  # Extra connections allowed beyond pool_size
        "pool_timeout": 30,  # Seconds to wait for a connection from the pool
        "pool_recycle": 1800,  # Recycle connections after 30 minutes
        "pool_pre_ping": True,  # Test connections before using them
    }
pool_pre_ping=True is especially important. It tests each connection before handing it to your application. If the connection has gone stale (e.g., the database restarted), SQLAlchemy transparently creates a new one instead of giving you a broken connection that causes an error on your user’s request.
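Pool limits apply per process, so they multiply with the number of Gunicorn workers. The sketch below estimates the upper bound and compares it with PostgreSQL's default max_connections of 100. The function names and the `reserved` margin of 5 connections are assumptions chosen for illustration.

```python
POSTGRES_DEFAULT_MAX_CONNECTIONS = 100  # PostgreSQL's default max_connections


def peak_connections(workers, pool_size, max_overflow):
    """Each worker process owns its own pool, so the limits multiply."""
    return workers * (pool_size + max_overflow)


def fits_database(workers, pool_size, max_overflow,
                  max_connections=POSTGRES_DEFAULT_MAX_CONNECTIONS, reserved=5):
    """Check the worst case against the server limit, minus a safety margin.

    `reserved` is a hypothetical margin for admin sessions and migrations.
    """
    return peak_connections(workers, pool_size, max_overflow) <= max_connections - reserved


if __name__ == "__main__":
    # 9 workers (4 cores) with the pool settings shown in the configuration.
    print(peak_connections(9, pool_size=20, max_overflow=10))
    print(fits_database(9, pool_size=20, max_overflow=10))
```

With 9 workers, a pool_size of 20 and a max_overflow of 10 allow up to 270 connections in the worst case, well above the default server limit, so either the pool settings or max_connections would need adjusting.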
Flask-Migrate (powered by Alembic) tracks database schema changes as versioned migration scripts. This is essential in production because you cannot drop and recreate tables — you have real data.
# Generate a migration after changing models
flask db migrate -m "add user email column"

# Review the generated migration in migrations/versions/
# Then apply it
flask db upgrade

# Rollback if something goes wrong
flask db downgrade
Always review generated migrations before applying them. Alembic does its best to detect changes, but it can miss things (especially column renames, which it detects as a drop + create). Treat migrations as code that deserves code review.
Automate PostgreSQL backups with a cron job:
#!/bin/bash
# backup.sh
# Stop on any error, including a failed pg_dump inside the pipeline
set -euo pipefail
TIMESTAMP=$(date +%Y%m%d_%H%M%S)
BACKUP_DIR="/backups/postgres"
DB_NAME="myapp"
mkdir -p "$BACKUP_DIR"
# pg_dump reads the password from ~/.pgpass or the PGPASSWORD environment variable
pg_dump -U myapp -h localhost "$DB_NAME" | gzip > "$BACKUP_DIR/${DB_NAME}_${TIMESTAMP}.sql.gz"
# Keep only last 30 days of backups
find "$BACKUP_DIR" -name "*.sql.gz" -mtime +30 -delete
echo "Backup completed: ${DB_NAME}_${TIMESTAMP}.sql.gz"

# Add to crontab (daily at 2 AM)
0 2 * * * /opt/scripts/backup.sh >> /var/log/backup.log 2>&1
If you are using a managed database (AWS RDS, DigitalOcean Managed Databases), automated backups are built in. Configure the retention period and test your restore procedure regularly.
In production, print() statements are not logging. You need structured, configurable logging that writes to files or external services, includes severity levels, and gives you enough context to debug problems at 3 AM without SSH access to the server.
# app/logging_config.py
import logging
import logging.handlers
import os

def configure_logging(app):
    """Configure application logging for production."""
    # Remove default Flask handler
    app.logger.handlers.clear()

    # Set log level from environment
    log_level = os.environ.get("LOG_LEVEL", "INFO").upper()
    app.logger.setLevel(getattr(logging, log_level))

    # Console handler (for Docker/container logs)
    console_handler = logging.StreamHandler()
    console_handler.setLevel(logging.DEBUG)

    # Format: timestamp - logger name - level - message
    formatter = logging.Formatter(
        "[%(asctime)s] %(name)s %(levelname)s in %(module)s: %(message)s",
        datefmt="%Y-%m-%d %H:%M:%S"
    )
    console_handler.setFormatter(formatter)
    app.logger.addHandler(console_handler)

    # File handler with rotation (for VM deployments)
    if os.environ.get("LOG_TO_FILE"):
        file_handler = logging.handlers.RotatingFileHandler(
            "logs/app.log",
            maxBytes=10_000_000,  # 10 MB
            backupCount=10
        )
        file_handler.setLevel(logging.INFO)
        file_handler.setFormatter(formatter)
        app.logger.addHandler(file_handler)

    # Suppress noisy loggers
    logging.getLogger("werkzeug").setLevel(logging.WARNING)
    logging.getLogger("sqlalchemy.engine").setLevel(logging.WARNING)
    app.logger.info("Logging configured at %s level", log_level)
For production systems that feed logs into aggregation services (ELK stack, Datadog, CloudWatch), JSON-formatted logs are easier to parse and query:
# app/logging_config.py (JSON variant)
import json
import logging
from datetime import datetime, timezone

class JSONFormatter(logging.Formatter):
    """Format log records as JSON for log aggregation services."""

    def format(self, record):
        log_entry = {
            "timestamp": datetime.now(timezone.utc).isoformat(),
            "level": record.levelname,
            "logger": record.name,
            "module": record.module,
            "function": record.funcName,
            "line": record.lineno,
            "message": record.getMessage(),
        }
        if record.exc_info:
            log_entry["exception"] = self.formatException(record.exc_info)
        # Include extra fields if present
        if hasattr(record, "request_id"):
            log_entry["request_id"] = record.request_id
        if hasattr(record, "user_id"):
            log_entry["user_id"] = record.user_id
        return json.dumps(log_entry)
# app/middleware.py
import time
import uuid
from flask import g, request, current_app

def register_request_hooks(app):
    """Register before/after request hooks for logging."""

    @app.before_request
    def before_request():
        g.request_id = str(uuid.uuid4())[:8]
        g.start_time = time.time()

    @app.after_request
    def after_request(response):
        duration = time.time() - g.start_time
        current_app.logger.info(
            "request_completed",
            extra={
                "request_id": g.request_id,
                "method": request.method,
                "path": request.path,
                "status": response.status_code,
                "duration_ms": round(duration * 1000, 2),
                "ip": request.remote_addr,
            }
        )
        response.headers["X-Request-ID"] = g.request_id
        return response
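The request hook passes fields such as method, path, status, and duration_ms through `extra`, but a formatter that copies only request_id and user_id will drop the rest. One possible approach, sketched here with only the standard library, is to emit every attribute that is not part of a plain LogRecord. The class name `ExtraJSONFormatter` is hypothetical.

```python
import json
import logging

# Attribute names present on every LogRecord; anything else arrived via `extra=`.
_STANDARD_ATTRS = set(logging.LogRecord("", 0, "", 0, "", (), None).__dict__)
_STANDARD_ATTRS |= {"message", "asctime"}


class ExtraJSONFormatter(logging.Formatter):
    """Serialize a record as JSON, including all fields passed through `extra`."""

    def format(self, record):
        entry = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for key, value in record.__dict__.items():
            if key not in _STANDARD_ATTRS:
                entry[key] = value
        # default=str keeps the formatter from failing on non-JSON values.
        return json.dumps(entry, default=str)


if __name__ == "__main__":
    handler = logging.StreamHandler()
    handler.setFormatter(ExtraJSONFormatter())
    logger = logging.getLogger("demo")
    logger.addHandler(handler)
    logger.setLevel(logging.INFO)
    logger.info("request_completed", extra={"status": 200, "path": "/health"})
```

The trade-off is that any field passed through `extra` ends up in the log output, so sensitive values should not be attached to log records.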
Sentry captures exceptions in real time, groups them, tracks their frequency, and provides full stack traces with local variable values. It is a widely used tool for production error tracking.
pip install sentry-sdk[flask]
# app/__init__.py
import os
import sentry_sdk
from flask import Flask
from sentry_sdk.integrations.flask import FlaskIntegration

def create_app(config_name=None):
    # Initialize Sentry before creating the app
    if os.environ.get("SENTRY_DSN"):
        sentry_sdk.init(
            dsn=os.environ["SENTRY_DSN"],
            integrations=[FlaskIntegration()],
            traces_sample_rate=0.1,  # 10% of requests for performance monitoring
            environment=os.environ.get("FLASK_ENV", "production"),
        )
    app = Flask(__name__)
    # ... rest of factory
The twelve-factor app methodology (12factor.net) establishes that configuration should be stored in the environment, not in code. This principle is fundamental to modern deployment.
In development, environment variables are managed with .env files. The python-dotenv package loads these into the environment automatically.
pip install python-dotenv
# .env (NEVER commit this file)
FLASK_ENV=development
SECRET_KEY=dev-secret-key-not-for-production
DATABASE_URL=postgresql://localhost:5432/myapp_dev
REDIS_URL=redis://localhost:6379/0
SENTRY_DSN=
LOG_LEVEL=DEBUG

# wsgi.py (entry point)
from dotenv import load_dotenv

load_dotenv()  # Load .env file before anything else

from app import create_app

app = create_app()
Critical rule: Never commit .env files to version control. Add them to .gitignore. Provide a .env.example with placeholder values so developers know which variables are needed.
# .env.example (commit this file)
FLASK_ENV=development
SECRET_KEY=change-me-to-a-random-string
DATABASE_URL=postgresql://localhost:5432/myapp_dev
REDIS_URL=redis://localhost:6379/0
SENTRY_DSN=
LOG_LEVEL=DEBUG
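Since .env.example documents which variables the application expects, it can also drive a startup or CI check. The following is a small sketch using only the standard library; the function names are hypothetical, and the parser handles only simple KEY=value lines, not the full dotenv syntax.

```python
import os


def parse_env_keys(text):
    """Return the variable names declared in a dotenv-style file."""
    keys = []
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blanks, comments, and malformed lines
        keys.append(line.split("=", 1)[0].strip())
    return keys


def missing_variables(example_text, environ=None):
    """List variables named in the example file but absent from the environment."""
    environ = os.environ if environ is None else environ
    return [key for key in parse_env_keys(example_text) if key not in environ]


if __name__ == "__main__":
    with open(".env.example", encoding="utf-8") as handle:
        for name in missing_variables(handle.read()):
            print("missing:", name)
```

Running this in a deployment pipeline gives an early warning when a new variable has been added to .env.example but not to the target environment.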
The twelve factors most relevant to Flask deployment:
requirements.txt, virtual environments)
flask db upgrade, management commands)
Continuous Integration and Continuous Deployment automate testing and deployment. Every push to your repository triggers a pipeline that tests your code, builds a Docker image, and deploys it to production. No manual steps, no “I forgot to run the tests” moments.
# .github/workflows/deploy.yml
name: Test, Build, Deploy
on:
  push:
    branches: [main]
  pull_request:
    branches: [main]
env:
  REGISTRY: ghcr.io
  IMAGE_NAME: ${{ github.repository }}
jobs:
  test:
    runs-on: ubuntu-latest
    services:
      postgres:
        image: postgres:16-alpine
        env:
          POSTGRES_DB: myapp_test
          POSTGRES_USER: myapp
          POSTGRES_PASSWORD: testpassword
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: "3.12"
          cache: "pip"
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run tests
        env:
          DATABASE_URL: postgresql://myapp:testpassword@localhost:5432/myapp_test
          SECRET_KEY: test-secret-key
          FLASK_ENV: testing
        run: |
          pytest --cov=app --cov-report=xml -v
      - name: Upload coverage
        uses: codecov/codecov-action@v4
        with:
          file: ./coverage.xml
  build:
    needs: test
    runs-on: ubuntu-latest
    if: github.event_name == 'push'
    permissions:
      contents: read
      packages: write
    steps:
      - uses: actions/checkout@v4
      - name: Log in to Container Registry
        uses: docker/login-action@v3
        with:
          registry: ${{ env.REGISTRY }}
          username: ${{ github.actor }}
          password: ${{ secrets.GITHUB_TOKEN }}
      - name: Build and push Docker image
        uses: docker/build-push-action@v5
        with:
          context: .
          push: true
          tags: |
            ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
            ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ github.sha }}
  deploy:
    needs: build
    runs-on: ubuntu-latest
    if: github.event_name == 'push'
    steps:
      - name: Deploy to production server
        uses: appleboy/ssh-action@v1
        with:
          host: ${{ secrets.DEPLOY_HOST }}
          username: ${{ secrets.DEPLOY_USER }}
          key: ${{ secrets.DEPLOY_SSH_KEY }}
          script: |
            cd /opt/myapp
            docker compose pull web
            docker compose up -d --no-deps web
            docker compose exec -T web flask db upgrade
            docker image prune -f
This pipeline has three stages:
main (not PRs). Builds the Docker image and pushes it to GitHub Container Registry.
Let us put everything together into a complete, production-ready deployment. This is the full stack you would use for a real Flask application.
myapp/
├── app/
│   ├── __init__.py          # Application factory
│   ├── extensions.py        # SQLAlchemy, Migrate, etc.
│   ├── models/
│   ├── routes/
│   │   ├── api.py
│   │   └── health.py
│   ├── static/
│   └── templates/
├── migrations/              # Flask-Migrate / Alembic
├── tests/
├── nginx/
│   └── nginx.conf
├── .env.example
├── .dockerignore
├── .github/
│   └── workflows/
│       └── deploy.yml
├── config.py
├── docker-compose.yml
├── docker-compose.prod.yml
├── Dockerfile
├── gunicorn.conf.py
├── requirements.txt
└── wsgi.py
# wsgi.py
import os
from dotenv import load_dotenv
load_dotenv()
from app import create_app
app = create_app(os.environ.get("FLASK_ENV", "production"))
# docker-compose.prod.yml
version: "3.9"
services:
  web:
    build:
      context: .
      dockerfile: Dockerfile
    expose:
      - "8000"
    environment:
      - FLASK_ENV=production
      - DATABASE_URL=postgresql://myapp:${DB_PASSWORD}@db:5432/myapp
      - SECRET_KEY=${SECRET_KEY}
      - REDIS_URL=redis://redis:6379/0
      - SENTRY_DSN=${SENTRY_DSN}
      - LOG_LEVEL=INFO
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_healthy
    restart: unless-stopped
    networks:
      - internal
  db:
    image: postgres:16-alpine
    environment:
      - POSTGRES_DB=myapp
      - POSTGRES_USER=myapp
      - POSTGRES_PASSWORD=${DB_PASSWORD}
    volumes:
      - postgres-data:/var/lib/postgresql/data
      - ./backups:/backups
    healthcheck:
      test: ["CMD-SHELL", "pg_isready -U myapp"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped
    networks:
      - internal
  redis:
    image: redis:7-alpine
    command: redis-server --maxmemory 256mb --maxmemory-policy allkeys-lru --requirepass ${REDIS_PASSWORD}
    volumes:
      - redis-data:/data
    healthcheck:
      test: ["CMD", "redis-cli", "-a", "${REDIS_PASSWORD}", "ping"]
      interval: 10s
      timeout: 5s
      retries: 5
    restart: unless-stopped
    networks:
      - internal
  nginx:
    image: nginx:alpine
    ports:
      - "80:80"
      - "443:443"
    volumes:
      - ./nginx/nginx.conf:/etc/nginx/conf.d/default.conf:ro
      - static-files:/var/www/static:ro
      - ./certbot/conf:/etc/letsencrypt:ro
      - ./certbot/www:/var/www/certbot:ro
    depends_on:
      - web
    restart: unless-stopped
    networks:
      - internal
volumes:
  postgres-data:
  redis-data:
  static-files:
networks:
  internal:
    driver: bridge
# nginx/nginx.conf
upstream flask_app {
server web:8000;
}
# Rate limiting zone
limit_req_zone $binary_remote_addr zone=api:10m rate=10r/s;
server {
listen 80;
server_name myapp.example.com;
# Allow Let's Encrypt challenge
location /.well-known/acme-challenge/ {
root /var/www/certbot;
}
# Redirect everything else to HTTPS
location / {
return 301 https://$server_name$request_uri;
}
}
server {
listen 443 ssl http2;
server_name myapp.example.com;
ssl_certificate /etc/letsencrypt/live/myapp.example.com/fullchain.pem;
ssl_certificate_key /etc/letsencrypt/live/myapp.example.com/privkey.pem;
ssl_protocols TLSv1.2 TLSv1.3;
ssl_ciphers HIGH:!aNULL:!MD5;
ssl_prefer_server_ciphers on;
ssl_session_cache shared:SSL:10m;
ssl_session_timeout 10m;
# Security headers
add_header X-Frame-Options "SAMEORIGIN" always;
add_header X-Content-Type-Options "nosniff" always;
add_header Referrer-Policy "strict-origin-when-cross-origin" always;
add_header Content-Security-Policy "default-src 'self'" always;
add_header Strict-Transport-Security "max-age=31536000; includeSubDomains; preload" always;
# Gzip compression
gzip on;
gzip_types text/plain text/css application/json application/javascript text/xml;
gzip_min_length 1000;
# Static files served directly by Nginx
location /static/ {
alias /var/www/static/;
expires 30d;
add_header Cache-Control "public, immutable";
access_log off;
}
# API routes with rate limiting
location /api/ {
limit_req zone=api burst=20 nodelay;
proxy_pass http://flask_app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
# All other routes
location / {
proxy_pass http://flask_app;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
}
client_max_body_size 16M;
}
# gunicorn.conf.py
import multiprocessing
import os
# Server socket
bind = "0.0.0.0:8000"
# Workers
workers = int(os.environ.get("GUNICORN_WORKERS", multiprocessing.cpu_count() * 2 + 1))
worker_class = os.environ.get("GUNICORN_WORKER_CLASS", "sync")
worker_connections = 1000
timeout = 120
keepalive = 5
# Logging
accesslog = "-"
errorlog = "-"
loglevel = os.environ.get("LOG_LEVEL", "info").lower()
access_log_format = '%(h)s %(l)s %(u)s %(t)s "%(r)s" %(s)s %(b)s "%(f)s" "%(a)s" %(D)s'
# Process management
max_requests = 1000
max_requests_jitter = 50
preload_app = True
graceful_timeout = 30
# Hooks: log when the master starts and when workers start and stop
def on_starting(server):
    server.log.info("Gunicorn master starting")

def post_fork(server, worker):
    server.log.info("Worker spawned (pid: %s)", worker.pid)

def worker_exit(server, worker):
    server.log.info("Worker exited (pid: %s)", worker.pid)
# scripts/check_production.py
"""Pre-deployment production readiness checker."""
import os
import sys
def check_production_readiness():
checks = []
errors = []
# 1. Check required environment variables
required_vars = ["SECRET_KEY", "DATABASE_URL", "FLASK_ENV"]
for var in required_vars:
if os.environ.get(var):
checks.append(f"[PASS] {var} is set")
else:
errors.append(f"[FAIL] {var} is not set")
# 2. Check debug mode
flask_env = os.environ.get("FLASK_ENV", "")
if flask_env == "production":
checks.append("[PASS] FLASK_ENV is 'production'")
else:
errors.append(f"[FAIL] FLASK_ENV is '{flask_env}', expected 'production'")
# 3. Check SECRET_KEY is not a default
secret = os.environ.get("SECRET_KEY", "")
weak_secrets = ["dev", "secret", "change-me", "default", "password"]
if any(weak in secret.lower() for weak in weak_secrets):
errors.append("[FAIL] SECRET_KEY appears to be a default/weak value")
elif len(secret) < 32:
errors.append(f"[FAIL] SECRET_KEY is too short ({len(secret)} chars, need 32+)")
else:
checks.append("[PASS] SECRET_KEY looks strong")
# 4. Check database URL is not SQLite
db_url = os.environ.get("DATABASE_URL", "")
if "sqlite" in db_url:
errors.append("[FAIL] DATABASE_URL uses SQLite (not suitable for production)")
else:
checks.append("[PASS] DATABASE_URL is not SQLite")
# Print results
print("\n=== Production Readiness Check ===\n")
for check in checks:
print(f" {check}")
for error in errors:
print(f" {error}")
print(f"\n Passed: {len(checks)}, Failed: {len(errors)}\n")
if errors:
print(" RESULT: NOT READY FOR PRODUCTION\n")
sys.exit(1)
else:
print(" RESULT: READY FOR PRODUCTION\n")
sys.exit(0)
if __name__ == "__main__":
check_production_readiness()
Scaling is the art of handling more traffic without degrading performance. There are two approaches, and you will eventually use both.
Vertical scaling means giving your server more resources — more CPU, more RAM, faster disks. It is the simplest approach: upgrade your VM from 2 cores to 8 cores, and Gunicorn spawns more workers. But vertical scaling has a ceiling. A single machine can only get so big, and it is still a single point of failure.
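To make the effect of adding cores concrete, here is a minimal sketch of the (2 x cores) + 1 heuristic that the gunicorn.conf.py above uses. The function name worker_count and its env parameter are illustrative assumptions, not part of Gunicorn; only the GUNICORN_WORKERS variable comes from the configuration shown earlier.

```python
import multiprocessing
import os


def worker_count(cpu_cores=None, env=None):
    """Return the number of Gunicorn workers for a machine.

    Uses the (2 x cores) + 1 heuristic unless GUNICORN_WORKERS is set.
    """
    env = os.environ if env is None else env
    override = env.get("GUNICORN_WORKERS")
    if override:
        return int(override)
    cores = cpu_cores if cpu_cores is not None else multiprocessing.cpu_count()
    return cores * 2 + 1


if __name__ == "__main__":
    # Compare a small VM with a larger one (empty env means no override).
    for cores in (2, 8):
        print(f"{cores} cores -> {worker_count(cores, env={})} workers")
```

With this heuristic, moving from 2 to 8 cores raises the worker count from 5 to 17, which is why a bigger VM translates directly into more concurrent requests.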
Horizontal scaling means running multiple instances of your application behind a load balancer. This is the standard approach for production systems.
              ┌───────────────┐
              │   Internet    │
              └───────┬───────┘
                      │
              ┌───────▼───────┐
              │ Load Balancer │
              │    (Nginx)    │
              └───┬───┬───┬───┘
                  │   │   │
      ┌───────────┘   │   └───────────┐
      │               │               │
┌─────▼──────┐  ┌─────▼──────┐  ┌─────▼──────┐
│  Flask #1  │  │  Flask #2  │  │  Flask #3  │
│  Gunicorn  │  │  Gunicorn  │  │  Gunicorn  │
└─────┬──────┘  └─────┬──────┘  └─────┬──────┘
      │               │               │
┌─────▼───────────────▼───────────────▼──────┐
│             PostgreSQL + Redis             │
└────────────────────────────────────────────┘
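Horizontal scaling requires your application to be stateless. That means any state that must survive between requests, such as sessions and cached data, has to live in a shared store like Redis or the database rather than in one process's memory or on its local disk, because consecutive requests from the same user may land on different instances.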
# app/cache.py
import hashlib
import json
import os
from functools import wraps

import redis
from flask import current_app

redis_client = redis.from_url(os.environ.get("REDIS_URL", "redis://localhost:6379/0"))

def cache_response(timeout=300, key_prefix="view"):
    """Decorator to cache JSON-serializable Flask view results in Redis."""
    def decorator(f):
        @wraps(f)
        def wrapper(*args, **kwargs):
            # Use a content hash rather than the built-in hash(): string hashes are
            # randomized per process, so hash() would give each worker different keys.
            # Only the view arguments are part of the key, not the query string.
            raw_key = json.dumps([args, kwargs], sort_keys=True, default=str)
            digest = hashlib.sha256(raw_key.encode("utf-8")).hexdigest()
            cache_key = f"{key_prefix}:{f.__name__}:{digest}"
            # Try to get from cache
            cached = redis_client.get(cache_key)
            if cached is not None:
                current_app.logger.debug("Cache hit: %s", cache_key)
                return json.loads(cached)
            # Execute function and cache result
            result = f(*args, **kwargs)
            redis_client.setex(cache_key, timeout, json.dumps(result))
            current_app.logger.debug("Cache miss, stored: %s", cache_key)
            return result
        return wrapper
    return decorator

# Usage in a route
@api_bp.route("/products")
@cache_response(timeout=60)
def get_products():
    products = Product.query.all()
    return [p.to_dict() for p in products]
# Store sessions in Redis instead of signed cookies
pip install Flask-Session
# config.py
import os
import redis

class ProductionConfig(Config):
    SESSION_TYPE = "redis"
    SESSION_REDIS = redis.from_url(os.environ["REDIS_URL"])
    SESSION_PERMANENT = False
    SESSION_USE_SIGNER = True
A Content Delivery Network serves your static files from edge servers around the world, reducing latency for users far from your origin server. Popular options include CloudFront (AWS), Cloudflare, and Fastly.
# config.py
class ProductionConfig(Config):
    CDN_DOMAIN = os.environ.get("CDN_DOMAIN", "")

# In templates, use the CDN domain for static assets
# app/__init__.py
@app.context_processor
def inject_cdn():
    return {"cdn_domain": app.config.get("CDN_DOMAIN", "")}
<!-- In Jinja2 templates -->
{% if cdn_domain %}
<link rel="stylesheet" href="https://{{ cdn_domain }}/static/css/style.css">
{% else %}
<link rel="stylesheet" href="{{ url_for('static', filename='css/style.css') }}">
{% endif %}
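The template branch above can also live in one Python helper, which keeps the fallback logic in a single place. This is a sketch under assumptions: static_url and its static_prefix parameter are hypothetical names, and the /static prefix matches Flask's default static route.

```python
def static_url(filename, cdn_domain="", static_prefix="/static"):
    """Build a URL for a static asset, preferring the CDN when one is configured."""
    path = f"{static_prefix}/{filename.lstrip('/')}"
    if cdn_domain:
        # Serve from the CDN edge when a domain is set.
        return f"https://{cdn_domain}{path}"
    # Fall back to the application's own static route.
    return path


if __name__ == "__main__":
    print(static_url("css/style.css", cdn_domain="cdn.example.com"))
    print(static_url("css/style.css"))
```

A helper like this could be registered as a template global so that templates call it instead of repeating the if/else block.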
These are the mistakes I see most often in Flask deployments. Every one of them has caused production outages.
Running with debug=True exposes the Werkzeug interactive debugger. Anyone who can trigger an exception can execute arbitrary Python code on your server. This is not a theoretical risk — it is a trivially exploitable remote code execution vulnerability.
# NEVER in production
app.run(debug=True)
# Always check
assert not app.debug, "Debug mode must be off in production"
# BAD: Secret in source code, visible in Git history forever
app.config["SECRET_KEY"] = "my-super-secret-key-2024"
# GOOD: Secret from environment
app.config["SECRET_KEY"] = os.environ["SECRET_KEY"]
Even if you delete the hardcoded secret in a later commit, it remains in your Git history. Anyone with repository access can find it. If this has already happened, rotate the secret immediately.
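When rotating a leaked key, generate the replacement from a cryptographically secure source rather than typing one by hand. A minimal sketch using the standard library's secrets module; the function name generate_secret_key is an assumption for illustration.

```python
import secrets


def generate_secret_key(num_bytes=32):
    """Return a hex-encoded random key; 32 bytes yields 64 hex characters."""
    return secrets.token_hex(num_bytes)


if __name__ == "__main__":
    # Print a fresh key to paste into your secret store or .env file.
    print(generate_secret_key())
```

A key produced this way comfortably clears the 32-character minimum enforced by the readiness checker shown earlier.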
Without health checks, your load balancer and container orchestrator have no way to know if your application is actually working. A process can be running but unable to handle requests (e.g., database connection lost). Health checks let the infrastructure detect and replace unhealthy instances automatically.
All traffic must be encrypted. No exceptions. Credentials, session tokens, and user data are all visible in plain HTTP. Let’s Encrypt makes this free. There is no excuse.
SQLite allows only one writer at a time. When two Gunicorn workers try to write simultaneously, one of them has to wait, and if the lock is not released in time it fails with a “database is locked” error. Use PostgreSQL or MySQL.
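The locking behavior is easy to reproduce with the standard library alone. The sketch below is an illustration, not a benchmark: the table name and the helper demonstrate_write_lock are made up, and timeout=0 is set so the second writer fails immediately instead of waiting.

```python
import os
import sqlite3
import tempfile


def demonstrate_write_lock():
    """Return the error a second writer sees while the first holds the write lock."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "demo.db")

        # isolation_level=None gives explicit control over BEGIN/COMMIT.
        writer_a = sqlite3.connect(path, isolation_level=None)
        writer_a.execute("CREATE TABLE hits (id INTEGER PRIMARY KEY, note TEXT)")

        # timeout=0: do not wait for the lock, fail straight away.
        writer_b = sqlite3.connect(path, timeout=0, isolation_level=None)

        writer_a.execute("BEGIN IMMEDIATE")  # take the write lock
        writer_a.execute("INSERT INTO hits (note) VALUES ('from A')")
        try:
            writer_b.execute("INSERT INTO hits (note) VALUES ('from B')")
            message = "no error"
        except sqlite3.OperationalError as exc:
            message = str(exc)
        finally:
            writer_a.execute("COMMIT")
            writer_a.close()
            writer_b.close()
        return message


if __name__ == "__main__":
    print(demonstrate_write_lock())
```

In a real deployment the two connections would belong to two different Gunicorn workers, which is exactly the situation a client-server database is built to handle.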
Without connection pooling, every request opens a new database connection and closes it when done. Under load, you exhaust the database’s connection limit. SQLAlchemy’s pool is configured by default, but you should tune pool_size and max_overflow for your workload.
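As a rough sketch of that tuning, Flask-SQLAlchemy passes the SQLALCHEMY_ENGINE_OPTIONS dictionary to SQLAlchemy's create_engine(). The numbers, the DB_POOL_SIZE and DB_MAX_OVERFLOW variable names, and the max_connections helper below are assumptions for illustration; pick values from your own load tests.

```python
import os


class PooledProductionConfig:
    """Hypothetical config class showing only the pool-related settings."""

    SQLALCHEMY_DATABASE_URI = os.environ.get("DATABASE_URL", "")
    # Forwarded to sqlalchemy.create_engine() by Flask-SQLAlchemy.
    SQLALCHEMY_ENGINE_OPTIONS = {
        "pool_size": int(os.environ.get("DB_POOL_SIZE", "10")),       # persistent connections
        "max_overflow": int(os.environ.get("DB_MAX_OVERFLOW", "20")),  # extra connections under burst
        "pool_pre_ping": True,   # test a connection before handing it out
        "pool_recycle": 1800,    # replace connections older than 30 minutes
    }


def max_connections(workers, options):
    """Worst-case connections opened by `workers` processes, each with its own pool."""
    return workers * (options["pool_size"] + options["max_overflow"])


if __name__ == "__main__":
    print(max_connections(5, {"pool_size": 10, "max_overflow": 20}))
```

Because each Gunicorn worker process holds its own pool, 5 workers with a pool of 10 and an overflow of 20 can open up to 150 connections, a total that should stay below the database's connection limit.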
If you log to a file without rotation, the file grows until it fills the disk. Use RotatingFileHandler or, better yet, log to stdout and let Docker/systemd handle it.
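Both options can be sketched with the standard library's logging module. The helper names build_handler and configure_logging, the 10 MB size cap, and the backup count of 5 are assumptions chosen for the example, not recommended values.

```python
import logging
import sys
from logging.handlers import RotatingFileHandler


def build_handler(log_file=None, max_bytes=10 * 1024 * 1024, backup_count=5):
    """Return a stdout handler, or a size-capped rotating file handler if a path is given."""
    if log_file:
        # Rotates to log_file.1, log_file.2, ... once max_bytes is reached.
        handler = RotatingFileHandler(log_file, maxBytes=max_bytes, backupCount=backup_count)
    else:
        # Preferred in containers: let Docker or systemd collect the stream.
        handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(
        logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
    )
    return handler


def configure_logging(level="INFO", log_file=None):
    """Attach a single handler to the root logger and return it."""
    root = logging.getLogger()
    root.setLevel(level)
    root.addHandler(build_handler(log_file))
    return root
```

Calling configure_logging() with no arguments logs to stdout; passing log_file switches to a rotating file whose total size is bounded by max_bytes times (backup_count + 1).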
When deploying a new version, the old process must finish handling in-flight requests before shutting down. Gunicorn handles this correctly with SIGTERM by default, but make sure your deployment process sends the right signal and waits for the graceful timeout.
Follow the twelve-factor methodology. It was written by engineers at Heroku who deployed millions of applications. The principles are battle-tested and apply to every Flask deployment.
Every aspect of your infrastructure should be defined in version-controlled files:
Dockerfile: Application container
docker-compose.yml: Service orchestration
nginx.conf: Reverse proxy configuration
gunicorn.conf.py: WSGI server configuration
.github/workflows/deploy.yml: CI/CD pipeline
If your production server dies, you should be able to recreate the entire environment from these files. No manual server configuration. No tribal knowledge. Everything is documented in code.
Users should never see an error page during a deployment. Strategies:
pip-audit for vulnerability scanning)
docker scout, trivy)
# Scan for known vulnerabilities in your dependencies
pip install pip-audit
pip-audit
# Scan Docker image
docker scout cves myapp:latest
Deployment is not a one-time event. It is an ongoing practice. Your deployment infrastructure evolves with your application. Start with the basics — Gunicorn, Nginx, Docker, CI/CD — and add sophistication as your needs grow. The patterns in this tutorial will serve you from your first production deployment to your thousandth.
Code that is not tested is broken code. You might not know it yet, but it is broken. It works today because you just wrote it and manually clicked through every page. It will break tomorrow when your colleague changes a utility function, when a dependency updates, or when a customer submits a form field you never considered. Testing is the only reliable way to prove that your application does what you think it does, and the only way to change code with confidence.
Flask applications are particularly well-suited for testing. Flask was designed with testability in mind — the framework provides a built-in test client, application context management, and hooks for overriding configuration. Unlike monolithic frameworks where testing often feels like fighting the framework, Flask testing is straightforward and fast.
This tutorial covers every aspect of testing Flask applications, from your first unit test to running a full test suite in CI/CD. We will cover:
calculate_discount() function return the correct value? Does your User model validate email addresses properly?
/register endpoint actually create a user in the database and send a welcome email?
The testing pyramid suggests that you should have many unit tests, fewer integration tests, and even fewer end-to-end tests. Unit tests are fast and pinpoint failures precisely. Integration tests catch issues at boundaries between components. End-to-end tests verify that the whole system works, but they are slower and harder to debug when they fail.
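A unit test at the base of the pyramid can be as small as the following sketch. calculate_discount is a hypothetical implementation written only to have something to test; this tutorial does not show the real one.

```python
def calculate_discount(price, percent):
    """Hypothetical function under test: apply a percentage discount to a price."""
    if not 0 <= percent <= 100:
        raise ValueError("percent must be between 0 and 100")
    return round(price * (1 - percent / 100), 2)


def test_calculate_discount_applies_percentage():
    assert calculate_discount(200.0, 25) == 150.0


def test_calculate_discount_zero_percent_keeps_price():
    assert calculate_discount(80.0, 0) == 80.0


def test_calculate_discount_rejects_out_of_range_percent():
    # Plain try/except keeps the sketch dependency-free.
    try:
        calculate_discount(100.0, 150)
    except ValueError:
        pass
    else:
        raise AssertionError("expected ValueError")
```

pytest collects the test_ functions automatically. In a real suite the last test would normally use pytest.raises instead of try/except.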
We will use pytest as our testing framework. Python ships with unittest, but pytest is the de facto standard for Python testing: it has cleaner syntax, powerful fixtures, better error messages, and a large plugin ecosystem. It can also run existing unittest-style test cases, so adopting it does not require rewriting older tests.
Start by installing pytest and the essential testing libraries:
pip install pytest pytest-cov pytest-flask flask
Here is what each package does:
A well-organized test structure mirrors your application structure. Here is the layout we will use throughout this tutorial:
flask_app/
├── app/
│   ├── __init__.py              # Application factory
│   ├── models.py                # SQLAlchemy models
│   ├── routes/
│   │   ├── __init__.py
│   │   ├── auth.py              # Authentication routes
│   │   └── api.py               # API routes
│   ├── services/
│   │   ├── __init__.py
│   │   └── email_service.py     # External service
│   └── templates/
│       ├── base.html
│       ├── login.html
│       └── dashboard.html
├── tests/
│   ├── __init__.py              # Makes tests a package
│   ├── conftest.py              # Shared fixtures
│   ├── test_auth.py             # Authentication tests
│   ├── test_api.py              # API endpoint tests
│   ├── test_models.py           # Model tests
│   └── test_services.py         # Service layer tests
├── config.py                    # Configuration classes
├── pytest.ini                   # pytest configuration
└── requirements.txt
Key conventions:
Test file names start with test_ (pytest discovers them automatically)
Test function names start with test_
The tests/ directory has its own __init__.py
conftest.py holds fixtures shared across all test files (pytest picks it up automatically)
Create a pytest.ini file at the project root to configure pytest behavior:
[pytest]
testpaths = tests
python_files = test_*.py
python_functions = test_*
python_classes = Test*
addopts = -v --tb=short
filterwarnings =
    ignore::DeprecationWarning
You can also use pyproject.toml if you prefer:
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-v --tb=short"
python_files = ["test_*.py"]
python_functions = ["test_*"]
The application factory pattern is essential for testing. It lets you create fresh application instances with different configurations for each test run. Here is a minimal application factory:
# config.py
import os

class Config:
    """Base configuration."""
    SECRET_KEY = os.environ.get("SECRET_KEY", "dev-secret-key")
    SQLALCHEMY_TRACK_MODIFICATIONS = False

class DevelopmentConfig(Config):
    """Development configuration."""
    DEBUG = True
    SQLALCHEMY_DATABASE_URI = "sqlite:///dev.db"

class TestingConfig(Config):
    """Testing configuration."""
    TESTING = True
    SQLALCHEMY_DATABASE_URI = "sqlite:///:memory:"
    WTF_CSRF_ENABLED = False  # Disable CSRF for testing
    SERVER_NAME = "localhost"

class ProductionConfig(Config):
    """Production configuration."""
    SQLALCHEMY_DATABASE_URI = os.environ.get("DATABASE_URL")

# app/__init__.py
from flask import Flask
from flask_sqlalchemy import SQLAlchemy

db = SQLAlchemy()

def create_app(config_class="config.DevelopmentConfig"):
    """Application factory.

    Args:
        config_class: Configuration class path string.

    Returns:
        Configured Flask application instance.
    """
    app = Flask(__name__)
    app.config.from_object(config_class)
    # Initialize extensions
    db.init_app(app)
    # Register blueprints
    from app.routes.auth import auth_bp
    from app.routes.api import api_bp
    app.register_blueprint(auth_bp)
    app.register_blueprint(api_bp, url_prefix="/api")
    # Create database tables
    with app.app_context():
        db.create_all()
    return app
The critical detail here is the TestingConfig class. It uses an in-memory SQLite database (sqlite:///:memory:) that is created fresh for every test session, it disables CSRF protection so form submissions work without tokens, and it sets TESTING = True which disables error catching during request handling so you get real exceptions in tests instead of error pages.
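The isolation that :memory: provides can be seen with the standard library's sqlite3 module, independent of SQLAlchemy. In this sketch (the users table and the helper names are made up for illustration), each new connection gets its own private, empty database:

```python
import sqlite3


def fresh_database():
    """Open a new in-memory database with an empty users table."""
    connection = sqlite3.connect(":memory:")
    connection.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    return connection


def count_users(connection):
    """Return the number of rows in the users table."""
    return connection.execute("SELECT COUNT(*) FROM users").fetchone()[0]


# Data written through the first connection...
first = fresh_database()
first.execute("INSERT INTO users (name) VALUES ('alice')")
first.commit()

# ...is invisible to a second in-memory database.
second = fresh_database()

if __name__ == "__main__":
    print(count_users(first), count_users(second))
```

Nothing is written to disk, so the data disappears as soon as the connection closes, which is what makes this setting convenient for tests.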
Flask provides a test client that simulates HTTP requests to your application without running a real server. No network calls, no ports, no server process. The test client sends requests directly through Flask’s request handling pipeline, making tests extremely fast.
from app import create_app
def test_homepage():
"""Test that the homepage returns 200."""
app = create_app("config.TestingConfig")
with app.test_client() as client:
response = client.get("/")
assert response.status_code == 200
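The test_client() method returns a FlaskClient instance that you use to make requests. Using it in a with block keeps the request context available after each request, so you can still inspect objects such as request and session, and it cleans that context up when the block exits.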
The test client supports all HTTP methods:
def test_http_methods(client):
"""Demonstrate all HTTP methods with the test client."""
# GET request
response = client.get("/api/users")
assert response.status_code == 200
# GET with query parameters
response = client.get("/api/users?page=2&per_page=10")
assert response.status_code == 200
# POST with JSON body
response = client.post(
"/api/users",
json={"name": "Alice", "email": "alice@example.com"},
content_type="application/json"
)
assert response.status_code == 201
# POST with form data
response = client.post(
"/login",
data={"username": "alice", "password": "secret123"}
)
assert response.status_code == 200
# PUT request
response = client.put(
"/api/users/1",
json={"name": "Alice Updated"}
)
assert response.status_code == 200
# PATCH request
response = client.patch(
"/api/users/1",
json={"email": "newalice@example.com"}
)
assert response.status_code == 200
# DELETE request
response = client.delete("/api/users/1")
assert response.status_code == 204
The response object gives you everything you need to verify behavior:
def test_response_inspection(client):
"""Demonstrate response inspection methods."""
response = client.get("/api/users/1")
# Status code
assert response.status_code == 200
# Response body as bytes
raw_data = response.data
# Response body as string
text = response.get_data(as_text=True)
# Parse JSON response
json_data = response.get_json()
assert json_data["name"] == "Alice"
# Response headers
assert response.content_type == "application/json"
assert "Content-Length" in response.headers
# Check for redirects
response = client.post("/login", data={"username": "alice", "password": "secret"})
assert response.status_code == 302
assert response.location == "/dashboard"
# Follow redirects automatically
response = client.post(
"/login",
data={"username": "alice", "password": "secret"},
follow_redirects=True
)
assert response.status_code == 200
assert b"Dashboard" in response.data
def test_headers_and_cookies(client):
"""Demonstrate setting headers and cookies."""
# Custom headers
response = client.get(
"/api/users",
headers={
"Authorization": "Bearer eyJhbGciOiJIUzI1NiIsInR...",
"Accept": "application/json",
"X-Request-ID": "test-123"
}
)
# Set cookies on the client
client.set_cookie("session_id", "abc123", domain="localhost")
response = client.get("/dashboard")
# Read cookies from response
cookies = response.headers.getlist("Set-Cookie")
# Delete cookies
client.delete_cookie("session_id", domain="localhost")
Fixtures are the backbone of well-organized tests. They provide reusable setup and teardown logic, eliminate duplication, and make tests readable. pytest fixtures are functions decorated with @pytest.fixture that return a value your test functions can use.
Place shared fixtures in conftest.py. pytest automatically discovers this file and makes its fixtures available to all tests in the same directory and subdirectories. You never need to import it.
# tests/conftest.py
import pytest
from app import create_app, db as _db
from app.models import User
@pytest.fixture(scope="session")
def app():
"""Create application for the entire test session.
scope='session' means this fixture runs once and is shared
across all tests. This is efficient because creating the app
is expensive (loading config, registering blueprints, etc.)
but the app object itself is stateless.
"""
app = create_app("config.TestingConfig")
yield app
@pytest.fixture(scope="function")
def client(app):
"""Create a test client for each test function.
scope='function' (the default) means each test gets a fresh
client. This prevents state leakage between tests.
"""
with app.test_client() as client:
yield client
@pytest.fixture(scope="function")
def db(app):
"""Set up a clean database for each test.
Creates all tables before the test and drops them after.
This guarantees each test starts with an empty database.
"""
with app.app_context():
_db.create_all()
yield _db
_db.session.rollback()
_db.drop_all()
@pytest.fixture
def sample_user(db):
"""Create a sample user for tests that need one."""
user = User(
username="testuser",
email="test@example.com"
)
user.set_password("password123")
db.session.add(user)
db.session.commit()
return user
@pytest.fixture
def auth_client(client, sample_user):
"""Create an authenticated test client.
Logs in the sample user so tests that need authentication
do not have to repeat the login step.
"""
client.post("/login", data={
"username": "testuser",
"password": "password123"
})
return client
@pytest.fixture
def api_headers():
"""Return standard API headers."""
return {
"Content-Type": "application/json",
"Accept": "application/json"
}
@pytest.fixture
def auth_headers(sample_user):
"""Return headers with a valid JWT token."""
from app.services.auth_service import generate_token
token = generate_token(sample_user.id)
return {
"Content-Type": "application/json",
"Authorization": f"Bearer {token}"
}
Understanding fixture scopes is critical for writing efficient tests:
# Fixture scope examples
@pytest.fixture(scope="session")
def app():
"""Session-scoped: created once, shared by all tests."""
return create_app("config.TestingConfig")
@pytest.fixture(scope="module")
def expensive_resource():
"""Module-scoped: created once per test file."""
resource = create_expensive_resource()
yield resource
resource.cleanup()
@pytest.fixture(scope="function")
def db_session(app):
"""Function-scoped: fresh for every single test."""
with app.app_context():
_db.create_all()
yield _db.session
_db.session.rollback()
_db.drop_all()
Fixtures can depend on other fixtures. pytest resolves the dependency graph automatically:
@pytest.fixture
def user(db):
"""Depends on db fixture — db is set up first."""
user = User(username="alice", email="alice@example.com")
user.set_password("secret")
db.session.add(user)
db.session.commit()
return user
@pytest.fixture
def user_with_posts(user, db):
"""Depends on user fixture — user is created first."""
from app.models import Post
for i in range(3):
post = Post(
title=f"Post {i}",
content=f"Content for post {i}",
author_id=user.id
)
db.session.add(post)
db.session.commit()
return user
@pytest.fixture
def admin_user(db):
"""Create an admin user."""
admin = User(
username="admin",
email="admin@example.com",
is_admin=True
)
admin.set_password("admin123")
db.session.add(admin)
db.session.commit()
return admin
Route testing verifies that your endpoints accept the right inputs, return the right outputs, and handle edge cases correctly. This is the bread and butter of Flask testing.
def test_homepage_returns_200(client):
"""Test that the homepage is accessible."""
response = client.get("/")
assert response.status_code == 200
def test_homepage_contains_welcome_message(client):
"""Test that the homepage renders expected content."""
response = client.get("/")
assert b"Welcome" in response.data
def test_user_profile_requires_login(client):
"""Test that profile page redirects unauthenticated users."""
response = client.get("/profile")
assert response.status_code == 302
assert "/login" in response.location
def test_user_profile_shows_username(auth_client, sample_user):
"""Test that authenticated users see their profile."""
response = auth_client.get("/profile")
assert response.status_code == 200
assert sample_user.username.encode() in response.data
def test_nonexistent_page_returns_404(client):
"""Test that missing pages return 404."""
response = client.get("/this-page-does-not-exist")
assert response.status_code == 404
def test_register_with_valid_data(client, db):
"""Test successful user registration."""
response = client.post("/register", data={
"username": "newuser",
"email": "new@example.com",
"password": "StrongPass123!",
"confirm_password": "StrongPass123!"
}, follow_redirects=True)
assert response.status_code == 200
assert b"Registration successful" in response.data
# Verify user was actually created in database
user = User.query.filter_by(username="newuser").first()
assert user is not None
assert user.email == "new@example.com"
def test_register_with_duplicate_email(client, db, sample_user):
"""Test that duplicate email addresses are rejected."""
response = client.post("/register", data={
"username": "another_user",
"email": "test@example.com", # Already used by sample_user
"password": "StrongPass123!",
"confirm_password": "StrongPass123!"
})
assert response.status_code == 400
assert b"Email already registered" in response.data
def test_register_with_missing_fields(client, db):
"""Test that missing fields return validation errors."""
response = client.post("/register", data={
"username": "newuser"
# Missing email and password
})
assert response.status_code == 400
assert b"email" in response.data.lower() or b"required" in response.data.lower()
def test_create_post_via_api(auth_headers, client, db):
"""Test creating a resource via JSON API."""
response = client.post(
"/api/posts",
json={
"title": "Test Post",
"content": "This is a test post.",
"tags": ["python", "flask"]
},
headers=auth_headers
)
assert response.status_code == 201
data = response.get_json()
assert data["title"] == "Test Post"
assert data["id"] is not None
assert "created_at" in data
def test_create_post_with_invalid_json(auth_headers, client):
"""Test that invalid JSON returns 400."""
response = client.post(
"/api/posts",
data="this is not json",
content_type="application/json",
headers=auth_headers
)
assert response.status_code == 400
def test_create_post_missing_required_field(auth_headers, client, db):
"""Test that missing required fields return validation errors."""
response = client.post(
"/api/posts",
json={"content": "No title provided"},
headers=auth_headers
)
assert response.status_code == 400
data = response.get_json()
assert "title" in str(data).lower()
import io
def test_file_upload(auth_client, db):
"""Test uploading a file."""
# Create a fake file in memory
data = {
"file": (io.BytesIO(b"file content here"), "test.txt"),
"description": "A test file"
}
response = auth_client.post(
"/upload",
data=data,
content_type="multipart/form-data"
)
assert response.status_code == 200
assert b"File uploaded successfully" in response.data
def test_upload_image(auth_client, db):
"""Test uploading an image file."""
# Create a minimal valid PNG (1x1 pixel)
png_data = (
b"\x89PNG\r\n\x1a\n\x00\x00\x00\rIHDR"
b"\x00\x00\x00\x01\x00\x00\x00\x01\x08\x02"
b"\x00\x00\x00\x90wS\xde\x00\x00\x00\x0c"
b"IDATx\x9cc\xf8\x0f\x00\x00\x01\x01\x00"
b"\x05\x18\xd8N\x00\x00\x00\x00IEND\xaeB`\x82"
)
data = {
"image": (io.BytesIO(png_data), "photo.png"),
}
response = auth_client.post(
"/upload/image",
data=data,
content_type="multipart/form-data"
)
assert response.status_code == 200
def test_upload_rejects_invalid_extension(auth_client):
"""Test that dangerous file extensions are rejected."""
data = {
"file": (io.BytesIO(b"malicious content"), "hack.exe"),
}
response = auth_client.post(
"/upload",
data=data,
content_type="multipart/form-data"
)
assert response.status_code == 400
assert b"File type not allowed" in response.data
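On the application side, a check like the following is a common way to make the last test pass. It is a sketch, not the upload view's actual code: ALLOWED_EXTENSIONS and allowed_file are assumed names, and the contents of the allow-list are illustrative.

```python
# Hypothetical allow-list; adjust to what your application really accepts.
ALLOWED_EXTENSIONS = {"txt", "pdf", "png", "jpg", "jpeg", "gif"}


def allowed_file(filename, allowed=ALLOWED_EXTENSIONS):
    """Return True when the filename's final extension is on the allow-list."""
    if "." not in filename:
        return False
    # Only the last extension counts, so "archive.tar.exe" is treated as "exe".
    extension = filename.rsplit(".", 1)[1].lower()
    return extension in allowed


if __name__ == "__main__":
    for name in ("test.txt", "photo.PNG", "hack.exe"):
        print(name, allowed_file(name))
```

An allow-list is generally safer than a block-list here, because new dangerous extensions do not need to be anticipated.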
Database testing is where most Flask developers struggle. The challenge is isolation — each test must start with a known database state and must not affect other tests. There are two main strategies: drop and recreate tables, or use transaction rollbacks.
@pytest.fixture
def db(app):
"""Drop all tables, recreate, yield, then drop again."""
with app.app_context():
_db.drop_all()
_db.create_all()
yield _db
_db.session.remove()
_db.drop_all()
This is the simplest approach. It guarantees a clean slate but is slower because it rebuilds the schema for every test.
from sqlalchemy.orm import scoped_session, sessionmaker

@pytest.fixture(scope="session")
def db(app):
    """Create tables once for the entire test session."""
    with app.app_context():
        _db.create_all()
        yield _db
        _db.drop_all()

@pytest.fixture(scope="function", autouse=True)
def db_session(db, app):
    """Wrap each test in a transaction that rolls back.
    autouse=True means this fixture runs for every test
    automatically, without being explicitly requested.
    """
    with app.app_context():
        connection = db.engine.connect()
        transaction = connection.begin()
        # Bind a scoped session to this connection. Plain SQLAlchemy is used here
        # because db.create_scoped_session() only exists in Flask-SQLAlchemy 2.x.
        session = scoped_session(sessionmaker(bind=connection))
        original_session = db.session
        db.session = session
        yield session
        # Discard the session, then roll back the outer transaction, undoing all changes
        session.remove()
        transaction.rollback()
        connection.close()
        db.session = original_session
The transaction rollback strategy is significantly faster for large test suites. Instead of dropping and recreating tables for every test, it wraps each test in a database transaction and rolls it back when the test finishes. The data changes simply vanish.
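The mechanism can be illustrated without Flask or SQLAlchemy. The following sketch uses the standard library's sqlite3 module; run_isolated, insert_and_count, and the users table are hypothetical names chosen for the example.

```python
import sqlite3

# isolation_level=None lets us issue BEGIN and ROLLBACK explicitly.
connection = sqlite3.connect(":memory:", isolation_level=None)
connection.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")


def run_isolated(test_body):
    """Run test_body inside a transaction and roll it back afterwards."""
    connection.execute("BEGIN")
    try:
        return test_body(connection)
    finally:
        connection.execute("ROLLBACK")


def insert_and_count(conn):
    """Stand-in for a test: write a row, then read the row count."""
    conn.execute("INSERT INTO users (name) VALUES ('alice')")
    return conn.execute("SELECT COUNT(*) FROM users").fetchone()[0]


seen_inside = run_isolated(insert_and_count)  # the test sees its own row
remaining = connection.execute("SELECT COUNT(*) FROM users").fetchone()[0]  # nothing persists

if __name__ == "__main__":
    print(seen_inside, remaining)
```

The schema is created once and survives every rollback, so only the per-test data is discarded. That is where the speed advantage over drop-and-recreate comes from.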
# tests/test_models.py
import pytest
from sqlalchemy.exc import IntegrityError

from app.models import User, Post

class TestUserModel:
    """Tests for the User model."""

    def test_create_user(self, db):
        """Test creating a user."""
        user = User(username="alice", email="alice@example.com")
        user.set_password("secret123")
        db.session.add(user)
        db.session.commit()
        assert user.id is not None
        assert user.username == "alice"
        assert user.email == "alice@example.com"

    def test_password_hashing(self, db):
        """Test that passwords are hashed, not stored in plain text."""
        user = User(username="bob", email="bob@example.com")
        user.set_password("mypassword")
        assert user.password_hash != "mypassword"
        assert user.check_password("mypassword") is True
        assert user.check_password("wrongpassword") is False

    def test_unique_username(self, db):
        """Test that duplicate usernames are rejected."""
        user1 = User(username="alice", email="alice1@example.com")
        user1.set_password("pass1")
        db.session.add(user1)
        db.session.commit()
        user2 = User(username="alice", email="alice2@example.com")
        user2.set_password("pass2")
        db.session.add(user2)
        # The unique constraint is enforced by the database at commit time
        with pytest.raises(IntegrityError):
            db.session.commit()
        db.session.rollback()  # leave the session usable for teardown

    def test_user_repr(self, db):
        """Test the string representation of User."""
        user = User(username="alice", email="alice@example.com")
        assert "alice" in repr(user)
class TestPostModel:
"""Tests for the Post model."""
def test_create_post(self, db, sample_user):
"""Test creating a post with an author."""
post = Post(
title="My First Post",
content="Hello, World!",
author_id=sample_user.id
)
db.session.add(post)
db.session.commit()
assert post.id is not None
assert post.author_id == sample_user.id
assert post.created_at is not None
def test_post_author_relationship(self, db, sample_user):
"""Test the relationship between Post and User."""
post = Post(
title="Test",
content="Content",
author_id=sample_user.id
)
db.session.add(post)
db.session.commit()
assert post.author.username == "testuser"
assert post in sample_user.posts
@pytest.fixture
def seed_data(db):
"""Seed the database with realistic test data."""
users = []
for i in range(5):
user = User(
username=f"user_{i}",
email=f"user_{i}@example.com"
)
user.set_password(f"password_{i}")
db.session.add(user)
users.append(user)
db.session.commit()
posts = []
for i, user in enumerate(users):
for j in range(3):
post = Post(
title=f"Post {i}-{j}",
content=f"Content by {user.username}",
author_id=user.id
)
db.session.add(post)
posts.append(post)
db.session.commit()
return {"users": users, "posts": posts}
def test_list_posts_pagination(client, seed_data):
"""Test that posts are paginated correctly."""
response = client.get("/api/posts?page=1&per_page=5")
data = response.get_json()
assert len(data["items"]) == 5
assert data["total"] == 15 # 5 users * 3 posts each
assert data["page"] == 1
assert data["pages"] == 3
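The expected values in the pagination assertions follow from ceiling division: 15 posts at 5 per page gives 3 pages. The sketch below works through that arithmetic; `paginate` is a hypothetical helper written for illustration, not the endpoint's actual implementation, and its keys simply mirror the response fields the test checks.

```python
import math


def paginate(items, page, per_page):
    """Return one page of items plus the metadata a paginated API might expose."""
    total = len(items)
    pages = math.ceil(total / per_page) if total else 0
    start = (page - 1) * per_page
    return {
        "items": items[start:start + per_page],
        "total": total,
        "page": page,
        "pages": pages,
        "has_next": page < pages,
    }


# 5 users * 3 posts each, 5 per page
first_page = paginate(list(range(15)), page=1, per_page=5)
# 5 posts, 2 per page: the last page holds the single leftover item
last_page = paginate(list(range(5)), page=3, per_page=2)
```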
Authentication tests verify that your security layer works correctly. This means testing both the happy path (valid credentials grant access) and the security boundaries (invalid credentials are rejected, protected routes are actually protected).
class TestLogin:
"""Tests for session-based login/logout."""
def test_login_page_renders(self, client):
"""Test that the login page is accessible."""
response = client.get("/login")
assert response.status_code == 200
assert b"Login" in response.data
def test_login_with_valid_credentials(self, client, sample_user):
"""Test successful login."""
response = client.post("/login", data={
"username": "testuser",
"password": "password123"
}, follow_redirects=True)
assert response.status_code == 200
assert b"Dashboard" in response.data or b"Welcome" in response.data
def test_login_with_wrong_password(self, client, sample_user):
"""Test login with incorrect password."""
response = client.post("/login", data={
"username": "testuser",
"password": "wrongpassword"
})
assert response.status_code == 401 or b"Invalid" in response.data
def test_login_with_nonexistent_user(self, client, db):
"""Test login with a username that does not exist."""
response = client.post("/login", data={
"username": "ghost",
"password": "password123"
})
assert response.status_code == 401 or b"Invalid" in response.data
def test_logout(self, auth_client):
"""Test that logout clears the session."""
response = auth_client.get("/logout", follow_redirects=True)
assert response.status_code == 200
# Verify session is cleared by accessing protected route
response = auth_client.get("/dashboard")
assert response.status_code == 302 # Redirected to login
def test_protected_route_without_login(self, client):
"""Test that protected routes redirect to login."""
response = client.get("/dashboard")
assert response.status_code == 302
assert "/login" in response.location
class TestJWTAuth:
"""Tests for JWT-based API authentication."""
def test_get_token_with_valid_credentials(self, client, sample_user):
"""Test obtaining a JWT token."""
response = client.post("/api/auth/login", json={
"username": "testuser",
"password": "password123"
})
assert response.status_code == 200
data = response.get_json()
assert "access_token" in data
assert "refresh_token" in data
def test_get_token_with_invalid_credentials(self, client, sample_user):
"""Test that invalid credentials do not return a token."""
response = client.post("/api/auth/login", json={
"username": "testuser",
"password": "wrong"
})
assert response.status_code == 401
data = response.get_json()
assert "access_token" not in data
def test_access_protected_endpoint_with_token(self, client, auth_headers):
"""Test accessing a protected API endpoint."""
response = client.get("/api/users/me", headers=auth_headers)
assert response.status_code == 200
data = response.get_json()
assert data["username"] == "testuser"
def test_access_protected_endpoint_without_token(self, client):
"""Test that missing token returns 401."""
response = client.get("/api/users/me")
assert response.status_code == 401
data = response.get_json()
assert "msg" in data or "message" in data
def test_access_with_expired_token(self, client, sample_user):
"""Test that expired tokens are rejected."""
from app.services.auth_service import generate_token
# Generate a token that expires immediately
token = generate_token(sample_user.id, expires_in=-1)
headers = {"Authorization": f"Bearer {token}"}
response = client.get("/api/users/me", headers=headers)
assert response.status_code == 401
def test_refresh_token(self, client, sample_user):
"""Test exchanging a refresh token for a new access token."""
# First, get tokens
login_response = client.post("/api/auth/login", json={
"username": "testuser",
"password": "password123"
})
refresh_token = login_response.get_json()["refresh_token"]
# Use refresh token to get new access token
response = client.post("/api/auth/refresh", json={
"refresh_token": refresh_token
})
assert response.status_code == 200
data = response.get_json()
assert "access_token" in data
def test_access_admin_endpoint_as_regular_user(self, client, auth_headers):
"""Test that non-admin users cannot access admin endpoints."""
response = client.get("/api/admin/users", headers=auth_headers)
assert response.status_code == 403
Mocking replaces real objects with controlled substitutes. You mock things that are external to the code you are testing: API calls, email sending, file system operations, time-dependent functions, and third-party services. You do not mock the code you are testing — that defeats the purpose.
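One detail worth keeping in mind when choosing a patch target: `patch` replaces a name in one specific namespace, so it needs to point at the module where the name is looked up at call time. The sketch below demonstrates this with two throwaway in-memory modules; `demo_email_service`, `demo_signup`, and `make_module` are hypothetical names used only for this illustration.

```python
import sys
import types
from unittest.mock import patch


def make_module(name, source):
    """Build an in-memory module so the example needs no files on disk."""
    module = types.ModuleType(name)
    exec(source, module.__dict__)
    sys.modules[name] = module
    return module


email_service = make_module(
    "demo_email_service",
    "def send_email(to, subject):\n"
    "    raise RuntimeError('would contact a real mail server')\n",
)
signup = make_module(
    "demo_signup",
    "from demo_email_service import send_email\n"
    "def register(email):\n"
    "    send_email(to=email, subject='Welcome!')\n"
    "    return 'registered'\n",
)

# Patching the defining module leaves demo_signup's own reference untouched.
with patch("demo_email_service.send_email"):
    try:
        signup.register("new@example.com")
        real_function_ran = False
    except RuntimeError:
        real_function_ran = True

# Patching the name where it is looked up replaces what register() calls.
with patch("demo_signup.send_email") as mock_send:
    result = signup.register("new@example.com")
    mock_send.assert_called_once_with(to="new@example.com", subject="Welcome!")
    call_count = mock_send.call_count
```

If the calling code instead does `import demo_email_service` and calls `demo_email_service.send_email(...)`, patching the defining module works, because the attribute is resolved on that module at call time.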
from unittest.mock import patch, MagicMock
def test_send_welcome_email(client, db):
"""Test registration sends a welcome email without actually sending one."""
with patch("app.services.email_service.send_email") as mock_send:
mock_send.return_value = True
response = client.post("/register", data={
"username": "newuser",
"email": "new@example.com",
"password": "StrongPass123!",
"confirm_password": "StrongPass123!"
})
assert response.status_code == 200
# Verify send_email was called with the right arguments
mock_send.assert_called_once()
call_args = mock_send.call_args
assert call_args[1]["to"] == "new@example.com"
assert "Welcome" in call_args[1]["subject"]
def test_weather_endpoint(client):
"""Test weather endpoint without calling the real weather API."""
mock_response = MagicMock()
mock_response.status_code = 200
mock_response.json.return_value = {
"temperature": 72,
"condition": "sunny",
"city": "San Francisco"
}
with patch("app.services.weather_service.requests.get") as mock_get:
mock_get.return_value = mock_response
response = client.get("/api/weather?city=San+Francisco")
assert response.status_code == 200
data = response.get_json()
assert data["temperature"] == 72
assert data["condition"] == "sunny"
import requests


def test_weather_api_failure(client):
    """Test graceful handling when the weather API is down."""
    with patch("app.services.weather_service.requests.get") as mock_get:
        # Raise the exception type that requests itself raises on a network
        # failure; the built-in ConnectionError is a different class.
        mock_get.side_effect = requests.exceptions.ConnectionError("API unavailable")
        response = client.get("/api/weather?city=San+Francisco")
    assert response.status_code == 503
    data = response.get_json()
    assert "unavailable" in data["message"].lower()
def test_user_service_with_mocked_db(app):
"""Test a service function without touching the database."""
from app.services.user_service import get_user_stats
mock_user = MagicMock()
mock_user.id = 1
mock_user.username = "alice"
mock_user.posts = [MagicMock(), MagicMock(), MagicMock()]
mock_user.created_at = "2024-01-01"
with app.app_context():
with patch("app.services.user_service.User") as MockUser:
MockUser.query.get.return_value = mock_user
stats = get_user_stats(1)
assert stats["username"] == "alice"
assert stats["post_count"] == 3
MockUser.query.get.assert_called_once_with(1)
from datetime import datetime
def test_time_dependent_feature(client, db, sample_user):
"""Test a feature that depends on the current time."""
# Mock datetime to control 'now'
frozen_time = datetime(2024, 12, 25, 10, 0, 0)
with patch("app.routes.api.datetime") as mock_dt:
mock_dt.utcnow.return_value = frozen_time
mock_dt.side_effect = lambda *args, **kwargs: datetime(*args, **kwargs)
response = client.get("/api/greeting")
data = response.get_json()
assert "Merry Christmas" in data["message"]
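An alternative to patching `datetime` is to let the time-dependent logic accept the current time as an argument, so a test can pass a fixed value directly. The sketch below assumes a hypothetical `holiday_greeting` helper; it is not the implementation behind the `/api/greeting` route.

```python
from datetime import datetime, timezone


def holiday_greeting(now=None):
    """Return a greeting based on the date; callers may inject 'now' for tests."""
    if now is None:
        now = datetime.now(timezone.utc)
    if (now.month, now.day) == (12, 25):
        return "Merry Christmas!"
    return "Hello!"


christmas_message = holiday_greeting(datetime(2024, 12, 25, 10, 0, 0))
ordinary_message = holiday_greeting(datetime(2024, 7, 4, 10, 0, 0))
```

A view function would call the helper with no argument, while tests supply whatever date they need without touching the module's imports.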
# pip install pytest-mock
def test_with_mocker(client, mocker):
"""pytest-mock provides a mocker fixture with cleaner syntax."""
# Instead of: with patch("app.services.email_service.send_email") as mock_send:
mock_send = mocker.patch("app.services.email_service.send_email")
mock_send.return_value = True
response = client.post("/register", data={
"username": "newuser",
"email": "new@example.com",
"password": "StrongPass123!",
"confirm_password": "StrongPass123!"
})
mock_send.assert_called_once()
Template tests verify that your views render the correct HTML content. You are not testing Jinja2 itself — you are testing that your templates receive the right context and produce the expected output.
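Checking for raw substrings is the simplest way to assert on rendered HTML. Where attribute order or quoting style may vary, parsing the markup is one alternative. The sketch below uses the standard library's `html.parser`; `FormFieldCollector`, `collect_form_fields`, and the sample markup are hypothetical and stand in for `response.get_data(as_text=True)`.

```python
from html.parser import HTMLParser


class FormFieldCollector(HTMLParser):
    """Collect the names of form fields and note whether a submit control exists."""

    def __init__(self):
        super().__init__()
        self.field_names = set()
        self.has_submit = False

    def handle_starttag(self, tag, attrs):
        attributes = dict(attrs)
        if tag in ("input", "textarea", "select") and "name" in attributes:
            self.field_names.add(attributes["name"])
        if tag in ("input", "button") and attributes.get("type") == "submit":
            self.has_submit = True


def collect_form_fields(html):
    collector = FormFieldCollector()
    collector.feed(html)
    return collector


# Stand-in for the HTML a login view might render
sample_html = """
<form method="post" action="/login">
  <input type="text" name="username">
  <input name='password' type='password'>
  <button type="submit">Login</button>
</form>
"""

fields = collect_form_fields(sample_html)
```

The assertions then become `"username" in fields.field_names` and `fields.has_submit`, which keep passing if the template switches between single and double quotes.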
def test_dashboard_shows_user_posts(auth_client, db, sample_user):
"""Test that the dashboard displays the user's posts."""
# Create some posts for the user
from app.models import Post
for i in range(3):
post = Post(
title=f"Post {i}",
content=f"Content {i}",
author_id=sample_user.id
)
db.session.add(post)
db.session.commit()
response = auth_client.get("/dashboard")
html = response.get_data(as_text=True)
assert "Post 0" in html
assert "Post 1" in html
assert "Post 2" in html
def test_login_page_has_form_fields(client):
"""Test that the login page contains required form elements."""
response = client.get("/login")
html = response.get_data(as_text=True)
assert 'name="username"' in html or 'name="email"' in html
assert 'name="password"' in html
assert 'type="submit"' in html
def test_error_messages_display(client, db, sample_user):
"""Test that validation errors appear in the template."""
response = client.post("/login", data={
"username": "testuser",
"password": "wrong"
})
html = response.get_data(as_text=True)
assert "Invalid" in html or "incorrect" in html.lower()
from flask import template_rendered


def test_template_context(app, client, sample_user):
    """Test that the correct context is passed to templates."""
    # Log in first
    client.post("/login", data={
        "username": "testuser",
        "password": "password123"
    })
    recorded_templates = []

    def record(sender, template, context, **extra):
        recorded_templates.append((template, context))

    # Flask emits this signal every time a template is rendered
    template_rendered.connect(record, app)
    try:
        client.get("/dashboard")
    finally:
        template_rendered.disconnect(record, app)
    assert len(recorded_templates) > 0
    template, context = recorded_templates[0]
    assert template.name == "dashboard.html"
    assert "user" in context
    assert context["user"].username == "testuser"
def test_navbar_shows_login_for_anonymous(client):
"""Test that anonymous users see the login link."""
response = client.get("/")
html = response.get_data(as_text=True)
assert "Login" in html
assert "Logout" not in html
def test_navbar_shows_logout_for_authenticated(auth_client):
"""Test that logged-in users see the logout link."""
response = auth_client.get("/")
html = response.get_data(as_text=True)
assert "Logout" in html
assert "Login" not in html
Every production application needs proper error handling, and every error handler needs tests. Flask allows you to register custom error handlers for HTTP status codes and exceptions.
def test_404_returns_json_for_api(client):
"""Test that API 404s return JSON, not HTML."""
response = client.get(
"/api/nonexistent",
headers={"Accept": "application/json"}
)
assert response.status_code == 404
data = response.get_json()
assert data["error"] == "Not Found"
assert "message" in data
def test_404_returns_html_for_browser(client):
"""Test that browser 404s return a friendly HTML page."""
response = client.get("/nonexistent-page")
assert response.status_code == 404
html = response.get_data(as_text=True)
assert "Page Not Found" in html or "404" in html
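def test_500_error_handler(app):
    """Test that 500 errors return a proper error response."""
    # Note: recent Flask versions refuse to add routes once the app has
    # handled a request, so this test needs an app that has not served one yet.
    @app.route("/force-error")
    def force_error():
        raise RuntimeError("Something went wrong")

    # With TESTING on, exceptions propagate to the test client instead of
    # reaching the 500 handler, so switch it off for this request.
    app.config["TESTING"] = False
    try:
        with app.test_client() as client:
            response = client.get("/force-error")
        assert response.status_code == 500
    finally:
        # Restore TESTING even if the assertion fails
        app.config["TESTING"] = True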
def test_rate_limit_error(client):
"""Test that rate limiting returns 429."""
# Make requests until rate limit is hit
for _ in range(100):
response = client.get("/api/data")
assert response.status_code == 429
data = response.get_json()
assert "rate limit" in data["message"].lower()
def test_method_not_allowed(client):
"""Test that wrong HTTP methods return 405."""
response = client.delete("/login") # Login does not support DELETE
assert response.status_code == 405
def test_validation_error_format(client, auth_headers):
"""Test that validation errors have a consistent format."""
response = client.post(
"/api/users",
json={"email": "not-a-valid-email"},
headers=auth_headers
)
assert response.status_code == 400
data = response.get_json()
assert "errors" in data
assert isinstance(data["errors"], dict)
assert "email" in data["errors"]
Test coverage measures which lines of your application code are executed during testing. It does not guarantee your tests are good, but low coverage guarantees you have blind spots. Use it as a guide, not a goal.
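A small illustration of why full coverage is not the same as good tests: in the hypothetical example below, the weak checks execute every line of `is_adult`, so a line-coverage report would show 100%, yet the boundary bug goes unnoticed.

```python
def is_adult(age):
    """Intended rule: 18 and over counts as an adult."""
    if age > 18:  # bug: the boundary value 18 is treated as a minor
        return True
    return False


def weak_checks():
    """Runs both branches of is_adult, so every line is executed."""
    assert is_adult(30) is True
    assert is_adult(5) is False
    return True


def boundary_check():
    """The case the weak checks never try."""
    return is_adult(18)


weak_checks_pass = weak_checks()
boundary_result = boundary_check()  # False, although 18 should count as adult
```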
pip install pytest-cov
# Basic coverage report
pytest --cov=app tests/
# Coverage with line numbers for missed lines
pytest --cov=app --cov-report=term-missing tests/
# Generate HTML report (written to htmlcov/)
pytest --cov=app --cov-report=html tests/
# Open htmlcov/index.html in your browser
# Generate XML report (for CI/CD tools)
pytest --cov=app --cov-report=xml tests/
# Multiple report formats at once
pytest --cov=app --cov-report=term-missing --cov-report=html tests/
Create a .coveragerc file to configure what gets measured:
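[run]
source = app
omit =
    app/migrations/*
    app/__init__.py
    */test_*
    */conftest.py

[report]
# Each entry is a regular expression searched for within a line, so a bare
# "pass" would also exclude lines that mention "password".
exclude_lines =
    pragma: no cover
    def __repr__
    if __name__ == .__main__
    raise NotImplementedError
    ^\s*pass\b
fail_under = 80
show_missing = true

[html]
directory = htmlcov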
$ pytest --cov=app --cov-report=term-missing tests/
---------- coverage: platform linux, python 3.11.5 ----------
Name                              Stmts   Miss  Cover   Missing
---------------------------------------------------------------
app/__init__.py                      25      0   100%
app/models.py                        48      3    94%   72-74
app/routes/auth.py                   65      8    88%   45-48, 92-95
app/routes/api.py                    89     12    87%   34, 67-72, 101-105
app/services/email_service.py        22      2    91%   18-19
app/services/auth_service.py         35      0   100%
---------------------------------------------------------------
TOTAL                               284     25    91%
The “Missing” column tells you exactly which lines are not covered. Lines 72-74 in models.py, for example, might be an edge case you have not tested. Investigate those lines and decide whether they need tests.
The fail_under = 80 setting in .coveragerc will cause pytest to fail if coverage drops below 80%. Add this to your CI pipeline to prevent coverage regressions.
Let us put everything together. Here is a complete, realistic test suite for a Flask application with user registration, authentication, and CRUD operations. This is the code you would actually write in a production project.
# app/models.py
from app import db
from werkzeug.security import generate_password_hash, check_password_hash
from datetime import datetime
class User(db.Model):
"""User model with authentication support."""
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(256), nullable=False)
is_admin = db.Column(db.Boolean, default=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
posts = db.relationship("Post", backref="author", lazy="dynamic")
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
def to_dict(self):
return {
"id": self.id,
"username": self.username,
"email": self.email,
"is_admin": self.is_admin,
"created_at": self.created_at.isoformat()
}
def __repr__(self):
return f"<User {self.username}>"
class Post(db.Model):
"""Blog post model."""
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
content = db.Column(db.Text, nullable=False)
author_id = db.Column(db.Integer, db.ForeignKey("user.id"), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
updated_at = db.Column(db.DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
def to_dict(self):
return {
"id": self.id,
"title": self.title,
"content": self.content,
"author": self.author.username,
"created_at": self.created_at.isoformat(),
"updated_at": self.updated_at.isoformat()
}
def __repr__(self):
return f"<Post {self.title}>"
# tests/conftest.py
import pytest
from app import create_app, db as _db
from app.models import User, Post
@pytest.fixture(scope="session")
def app():
"""Create the Flask application for testing."""
app = create_app("config.TestingConfig")
yield app
@pytest.fixture(scope="function")
def db(app):
"""Provide a clean database for each test."""
with app.app_context():
_db.create_all()
yield _db
_db.session.rollback()
_db.drop_all()
@pytest.fixture
def client(app):
"""Provide a Flask test client."""
with app.test_client() as client:
yield client
@pytest.fixture
def runner(app):
"""Provide a Flask CLI test runner."""
return app.test_cli_runner()
@pytest.fixture
def sample_user(db):
"""Create and return a standard test user."""
user = User(
username="testuser",
email="test@example.com"
)
user.set_password("password123")
db.session.add(user)
db.session.commit()
return user
@pytest.fixture
def admin_user(db):
"""Create and return an admin user."""
user = User(
username="admin",
email="admin@example.com",
is_admin=True
)
user.set_password("admin123")
db.session.add(user)
db.session.commit()
return user
@pytest.fixture
def auth_client(client, sample_user):
"""Provide a test client logged in as sample_user."""
client.post("/login", data={
"username": "testuser",
"password": "password123"
})
return client
@pytest.fixture
def admin_client(client, admin_user):
"""Provide a test client logged in as admin."""
client.post("/login", data={
"username": "admin",
"password": "admin123"
})
return client
@pytest.fixture
def api_token(client, sample_user):
"""Get a JWT token for the sample user."""
response = client.post("/api/auth/login", json={
"username": "testuser",
"password": "password123"
})
return response.get_json()["access_token"]
@pytest.fixture
def auth_headers(api_token):
"""Provide headers with JWT authentication."""
return {
"Authorization": f"Bearer {api_token}",
"Content-Type": "application/json"
}
@pytest.fixture
def sample_posts(db, sample_user):
"""Create sample posts for testing."""
posts = []
for i in range(5):
post = Post(
title=f"Test Post {i}",
content=f"Content for test post {i}. " * 10,
author_id=sample_user.id
)
db.session.add(post)
posts.append(post)
db.session.commit()
return posts
# tests/test_registration.py
import pytest
from app.models import User
class TestRegistration:
"""User registration tests."""
def test_register_success(self, client, db):
"""Test successful registration with valid data."""
response = client.post("/register", data={
"username": "alice",
"email": "alice@example.com",
"password": "SecurePass123!",
"confirm_password": "SecurePass123!"
}, follow_redirects=True)
assert response.status_code == 200
# Verify user exists in database
user = User.query.filter_by(username="alice").first()
assert user is not None
assert user.email == "alice@example.com"
assert user.check_password("SecurePass123!")
def test_register_duplicate_username(self, client, db, sample_user):
"""Test that duplicate usernames are rejected."""
response = client.post("/register", data={
"username": "testuser", # Already exists
"email": "different@example.com",
"password": "SecurePass123!",
"confirm_password": "SecurePass123!"
})
assert b"already" in response.data.lower() or response.status_code == 400
def test_register_duplicate_email(self, client, db, sample_user):
"""Test that duplicate emails are rejected."""
response = client.post("/register", data={
"username": "different_user",
"email": "test@example.com", # Already exists
"password": "SecurePass123!",
"confirm_password": "SecurePass123!"
})
assert b"already" in response.data.lower() or response.status_code == 400
def test_register_password_mismatch(self, client, db):
"""Test that mismatched passwords are rejected."""
response = client.post("/register", data={
"username": "alice",
"email": "alice@example.com",
"password": "SecurePass123!",
"confirm_password": "DifferentPass456!"
})
assert response.status_code == 400 or b"match" in response.data.lower()
def test_register_weak_password(self, client, db):
"""Test that weak passwords are rejected."""
response = client.post("/register", data={
"username": "alice",
"email": "alice@example.com",
"password": "123",
"confirm_password": "123"
})
assert response.status_code == 400 or b"password" in response.data.lower()
@pytest.mark.parametrize("field,value", [
("username", ""),
("email", ""),
("password", ""),
("email", "not-an-email"),
])
def test_register_invalid_input(self, client, db, field, value):
"""Test registration with various invalid inputs."""
data = {
"username": "alice",
"email": "alice@example.com",
"password": "SecurePass123!",
"confirm_password": "SecurePass123!"
}
data[field] = value
response = client.post("/register", data=data)
assert response.status_code == 400 or b"error" in response.data.lower()
# tests/test_auth.py
class TestAuthentication:
"""Login and logout tests."""
def test_login_success(self, client, sample_user):
"""Test successful login redirects to dashboard."""
response = client.post("/login", data={
"username": "testuser",
"password": "password123"
}, follow_redirects=True)
assert response.status_code == 200
assert b"testuser" in response.data
def test_login_wrong_password(self, client, sample_user):
"""Test login with wrong password fails."""
response = client.post("/login", data={
"username": "testuser",
"password": "wrongpassword"
})
assert response.status_code != 200 or b"Invalid" in response.data
def test_login_nonexistent_user(self, client, db):
"""Test login with nonexistent username fails."""
response = client.post("/login", data={
"username": "nobody",
"password": "password123"
})
assert response.status_code != 200 or b"Invalid" in response.data
def test_logout_clears_session(self, auth_client):
"""Test that logout clears the session."""
# Verify we are logged in
response = auth_client.get("/dashboard")
assert response.status_code == 200
# Logout
auth_client.get("/logout", follow_redirects=True)
# Verify we are logged out
response = auth_client.get("/dashboard")
assert response.status_code == 302 # Redirect to login
def test_session_persists_across_requests(self, auth_client):
"""Test that the session stays active across multiple requests."""
response1 = auth_client.get("/dashboard")
assert response1.status_code == 200
response2 = auth_client.get("/profile")
assert response2.status_code == 200
# tests/test_crud.py
from app.models import Post
class TestPostCRUD:
"""Test Create, Read, Update, Delete for posts."""
def test_create_post(self, auth_client, db, sample_user):
"""Test creating a new post."""
response = auth_client.post("/posts/new", data={
"title": "My New Post",
"content": "This is the post content."
}, follow_redirects=True)
assert response.status_code == 200
post = Post.query.filter_by(title="My New Post").first()
assert post is not None
assert post.author_id == sample_user.id
def test_read_post(self, client, db, sample_posts):
"""Test reading a single post."""
post = sample_posts[0]
response = client.get(f"/posts/{post.id}")
assert response.status_code == 200
assert post.title.encode() in response.data
def test_read_nonexistent_post(self, client, db):
"""Test reading a post that does not exist."""
response = client.get("/posts/99999")
assert response.status_code == 404
def test_update_post(self, auth_client, db, sample_posts):
"""Test updating a post."""
post = sample_posts[0]
response = auth_client.post(f"/posts/{post.id}/edit", data={
"title": "Updated Title",
"content": "Updated content."
}, follow_redirects=True)
assert response.status_code == 200
updated = Post.query.get(post.id)
assert updated.title == "Updated Title"
assert updated.content == "Updated content."
def test_cannot_update_others_post(self, auth_client, db, admin_user):
"""Test that users cannot edit posts they do not own."""
# Create a post owned by admin
post = Post(
title="Admin Post",
content="Admin content",
author_id=admin_user.id
)
db.session.add(post)
db.session.commit()
# Try to edit as regular user
response = auth_client.post(f"/posts/{post.id}/edit", data={
"title": "Hacked Title",
"content": "Hacked content."
})
assert response.status_code == 403
def test_delete_post(self, auth_client, db, sample_posts):
"""Test deleting a post."""
post = sample_posts[0]
post_id = post.id
response = auth_client.post(
f"/posts/{post_id}/delete",
follow_redirects=True
)
assert response.status_code == 200
assert Post.query.get(post_id) is None
def test_list_posts(self, client, db, sample_posts):
"""Test listing all posts."""
response = client.get("/posts")
assert response.status_code == 200
html = response.get_data(as_text=True)
for post in sample_posts:
assert post.title in html
# tests/test_api.py
import json
class TestPostAPI:
"""Test the REST API for posts."""
def test_list_posts(self, client, db, sample_posts):
"""Test GET /api/posts returns all posts."""
response = client.get("/api/posts")
assert response.status_code == 200
data = response.get_json()
assert len(data["items"]) == 5
def test_get_single_post(self, client, db, sample_posts):
"""Test GET /api/posts/:id returns a single post."""
post = sample_posts[0]
response = client.get(f"/api/posts/{post.id}")
assert response.status_code == 200
data = response.get_json()
assert data["title"] == post.title
assert data["author"] == "testuser"
def test_create_post_via_api(self, client, db, auth_headers):
"""Test POST /api/posts creates a new post."""
response = client.post("/api/posts",
json={
"title": "API Post",
"content": "Created via API."
},
headers=auth_headers
)
assert response.status_code == 201
data = response.get_json()
assert data["title"] == "API Post"
assert data["id"] is not None
def test_create_post_without_auth(self, client, db):
"""Test that creating a post requires authentication."""
response = client.post("/api/posts",
json={"title": "No Auth", "content": "Should fail."}
)
assert response.status_code == 401
def test_update_post_via_api(self, client, db, auth_headers, sample_posts):
"""Test PUT /api/posts/:id updates a post."""
post = sample_posts[0]
response = client.put(f"/api/posts/{post.id}",
json={"title": "Updated via API"},
headers=auth_headers
)
assert response.status_code == 200
data = response.get_json()
assert data["title"] == "Updated via API"
def test_delete_post_via_api(self, client, db, auth_headers, sample_posts):
"""Test DELETE /api/posts/:id removes a post."""
post = sample_posts[0]
response = client.delete(
f"/api/posts/{post.id}",
headers=auth_headers
)
assert response.status_code == 204
# Verify deletion
response = client.get(f"/api/posts/{post.id}")
assert response.status_code == 404
def test_api_pagination(self, client, db, sample_posts):
"""Test that the API paginates results."""
response = client.get("/api/posts?page=1&per_page=2")
data = response.get_json()
assert len(data["items"]) == 2
assert data["total"] == 5
assert data["pages"] == 3
assert data["has_next"] is True
def test_api_returns_json_content_type(self, client, db, sample_posts):
"""Test that API responses have correct content type."""
response = client.get("/api/posts")
assert response.content_type == "application/json"
# tests/test_errors.py
class TestErrorHandling:
"""Test error handling across the application."""
def test_404_page(self, client):
"""Test custom 404 page."""
response = client.get("/this-does-not-exist")
assert response.status_code == 404
def test_api_404_returns_json(self, client):
"""Test that API 404s return JSON."""
response = client.get("/api/posts/99999",
headers={"Accept": "application/json"})
assert response.status_code == 404
data = response.get_json()
assert "error" in data
def test_405_method_not_allowed(self, client):
"""Test that unsupported methods return 405."""
response = client.delete("/login")
assert response.status_code == 405
def test_400_bad_request(self, client, auth_headers):
"""Test that malformed requests return 400."""
response = client.post("/api/posts",
data="not json",
content_type="application/json",
headers=auth_headers
)
assert response.status_code == 400
Tests are only useful if they run automatically. Every push, every pull request should trigger your test suite. Here is how to set up testing in GitHub Actions.
# .github/workflows/tests.yml
name: Tests
on:
  push:
    branches: [main, develop]
  pull_request:
    branches: [main]
jobs:
  test:
    runs-on: ubuntu-latest
    strategy:
      matrix:
        python-version: ["3.10", "3.11", "3.12"]
    services:
      postgres:
        image: postgres:15
        env:
          POSTGRES_USER: test
          POSTGRES_PASSWORD: test
          POSTGRES_DB: test_db
        ports:
          - 5432:5432
        options: >-
          --health-cmd pg_isready
          --health-interval 10s
          --health-timeout 5s
          --health-retries 5
    steps:
      - uses: actions/checkout@v4
      - name: Set up Python ${{ matrix.python-version }}
        uses: actions/setup-python@v5
        with:
          python-version: ${{ matrix.python-version }}
      - name: Cache pip packages
        uses: actions/cache@v4
        with:
          path: ~/.cache/pip
          key: ${{ runner.os }}-pip-${{ hashFiles('requirements.txt') }}
          restore-keys: |
            ${{ runner.os }}-pip-
      - name: Install dependencies
        run: |
          python -m pip install --upgrade pip
          pip install -r requirements.txt
          pip install pytest pytest-cov
      - name: Run tests with coverage
        env:
          DATABASE_URL: postgresql://test:test@localhost:5432/test_db
          FLASK_ENV: testing
        run: |
          pytest --cov=app --cov-report=xml --cov-report=term-missing tests/
      - name: Upload coverage to Codecov
        if: matrix.python-version == '3.12'
        uses: codecov/codecov-action@v4
        with:
          file: ./coverage.xml
          fail_ci_if_error: true
# pyproject.toml
[tool.pytest.ini_options]
testpaths = ["tests"]
addopts = "-v --tb=short --strict-markers"
markers = [
    "slow: marks tests as slow (deselect with '-m \"not slow\"')",
    "integration: marks integration tests",
    "e2e: marks end-to-end tests",
]
# Run fast tests in CI by default, all tests on main branch
# pytest -m "not slow" # Skip slow tests
# pytest -m "not e2e" # Skip e2e tests
# pytest # Run everything
import pytest
@pytest.mark.slow
def test_generate_large_report(client, db, seed_data):
"""This test takes 30+ seconds, mark it as slow."""
response = client.get("/api/reports/annual")
assert response.status_code == 200
@pytest.mark.integration
def test_full_registration_flow(client, db):
"""Integration test: register, verify email, login."""
# Register
client.post("/register", data={...})
# Verify email (mock the email, extract token)
# Login
# Check dashboard
@pytest.mark.e2e
def test_complete_user_journey(client, db):
"""End-to-end: register, login, create post, comment, logout."""
pass
# Run only fast tests
pytest -m "not slow"
# Run only integration tests
pytest -m integration
# Run everything except e2e
pytest -m "not e2e"
# Run specific test file
pytest tests/test_auth.py
# Run specific test class
pytest tests/test_auth.py::TestAuthentication
# Run specific test function
pytest tests/test_auth.py::TestAuthentication::test_login_success
# Run tests matching a keyword
pytest -k "login or register"
# Run tests in parallel (requires pytest-xdist)
pytest -n auto
These are the mistakes I see most often in Flask test suites. Each one has burned me or someone on my team at some point.
The deadliest testing sin. When tests share state, they pass individually but fail when run together, or worse, they pass together only when run in one specific order.
# BAD: Tests depend on shared state
user_count = 0

def test_create_user(client, db):
    global user_count
    client.post("/register", data={...})
    user_count += 1

def test_user_count(client, db):
    # This only passes if test_create_user ran first!
    response = client.get("/api/users/count")
    assert response.get_json()["count"] == user_count

# GOOD: Each test creates its own state
def test_create_user(client, db):
    response = client.post("/register", data={
        "username": "alice",
        "email": "alice@example.com",
        "password": "SecurePass123!",
        "confirm_password": "SecurePass123!"
    })
    assert response.status_code == 200

def test_user_count_with_seeded_data(client, db):
    # Create known state within the test
    from app.models import User
    for i in range(3):
        user = User(username=f"user_{i}", email=f"user_{i}@test.com")
        user.set_password("pass")
        db.session.add(user)
    db.session.commit()
    response = client.get("/api/users/count")
    assert response.get_json()["count"] == 3
# BAD: Testing how the code works internally
def test_login_calls_check_password(client, sample_user, mocker):
    mock_check = mocker.patch.object(User, "check_password", return_value=True)
    client.post("/login", data={"username": "testuser", "password": "pass"})
    mock_check.assert_called_once_with("pass")
    # This test breaks if you refactor the login code, even if login still works

# GOOD: Testing observable behavior
def test_login_with_correct_password_succeeds(client, sample_user):
    response = client.post("/login", data={
        "username": "testuser",
        "password": "password123"
    }, follow_redirects=True)
    assert response.status_code == 200
    assert b"Dashboard" in response.data
# BAD: Creating the app for every test
def test_something():
    app = create_app("config.TestingConfig")  # Expensive!
    with app.test_client() as client:
        response = client.get("/")
        assert response.status_code == 200

# GOOD: Use session-scoped app fixture
@pytest.fixture(scope="session")
def app():
    return create_app("config.TestingConfig")

def test_something(client):  # client fixture uses the session app
    response = client.get("/")
    assert response.status_code == 200
# BAD: Only testing the happy path
def test_create_user(client, db):
    response = client.post("/register", data={
        "username": "alice",
        "email": "alice@example.com",
        "password": "Pass123!"
    })
    assert response.status_code == 200

# GOOD: Test boundaries and edge cases
@pytest.mark.parametrize("username", [
    "",                           # Empty
    "a",                          # Too short
    "a" * 256,                    # Too long
    "user name",                  # Contains space
    "user@name",                  # Contains special char
    "admin",                      # Reserved word
    "<script>alert(1)</script>",  # XSS attempt
])
def test_create_user_invalid_username(client, db, username):
    response = client.post("/register", data={
        "username": username,
        "email": "test@example.com",
        "password": "SecurePass123!",
        "confirm_password": "SecurePass123!"
    })
    assert response.status_code == 400
# BAD: Only checking status code
def test_create_user(client, db):
    response = client.post("/api/users", json={...})
    assert response.status_code == 201  # What if the response body is wrong?

# GOOD: Verify the full response
def test_create_user(client, db, auth_headers):
    response = client.post("/api/users",
        json={"username": "alice", "email": "alice@example.com"},
        headers=auth_headers
    )
    assert response.status_code == 201
    data = response.get_json()
    assert data["username"] == "alice"
    assert data["email"] == "alice@example.com"
    assert "id" in data
    assert "created_at" in data
    assert "password" not in data  # Sensitive fields should not leak
Every test should have three distinct phases: Arrange, Act, Assert. This makes tests readable and consistent.
def test_update_post_title(auth_client, db, sample_posts):
    """Test updating a post's title."""
    # Arrange: Get a post to update
    post = sample_posts[0]
    original_content = post.content
    # Act: Send the update request
    response = auth_client.post(f"/posts/{post.id}/edit", data={
        "title": "Brand New Title",
        "content": original_content
    }, follow_redirects=True)
    # Assert: Verify the result
    assert response.status_code == 200
    updated = Post.query.get(post.id)
    assert updated.title == "Brand New Title"
    assert updated.content == original_content  # Content unchanged
# BAD: Vague names
def test_user():
    pass

def test_login_1():
    pass

def test_post():
    pass

# GOOD: Names describe the scenario and expected outcome
def test_register_with_valid_data_creates_user():
    pass

def test_login_with_wrong_password_returns_401():
    pass

def test_delete_post_by_non_owner_returns_403():
    pass

def test_api_returns_paginated_results_with_metadata():
    pass
# Each test must work regardless of execution order
# Use fixtures for setup, not other tests
# Never rely on database state from a previous test
# Never rely on global variables or module-level state
@pytest.fixture
def user_with_published_posts(db, sample_user):
    """Build on existing fixtures for specific scenarios."""
    posts = []
    for i in range(3):
        post = Post(
            title=f"Published {i}",
            content=f"Content {i}",
            author_id=sample_user.id,
            is_published=True
        )
        db.session.add(post)
        posts.append(post)
    db.session.commit()
    return sample_user, posts

def test_user_public_profile(client, user_with_published_posts):
    """Fixture provides exactly the state this test needs."""
    user, posts = user_with_published_posts
    response = client.get(f"/users/{user.username}")
    assert response.status_code == 200
    html = response.get_data(as_text=True)
    for post in posts:
        assert post.title in html
@pytest.mark.parametrize("endpoint,expected_status", [
    ("/", 200),
    ("/login", 200),
    ("/register", 200),
    ("/about", 200),
    ("/nonexistent", 404),
])
def test_public_endpoints(client, endpoint, expected_status):
    """Test that public endpoints return expected status codes."""
    response = client.get(endpoint)
    assert response.status_code == expected_status

@pytest.mark.parametrize("method,endpoint", [
    ("GET", "/dashboard"),
    ("GET", "/profile"),
    ("POST", "/posts/new"),
    ("GET", "/settings"),
])
def test_protected_endpoints_require_login(client, method, endpoint):
    """Test that protected endpoints redirect unauthenticated users."""
    response = getattr(client, method.lower())(endpoint)
    assert response.status_code == 302
    assert "/login" in response.location
# Run tests in watch mode (requires pytest-watch)
pip install pytest-watch
ptw tests/
# Run only tests that failed last time
pytest --lf
# Run tests that failed first, then the rest
pytest --ff
# Stop on first failure
pytest -x
# Stop after 3 failures
pytest --maxfail=3
# Preview the first few tests that would be collected, without running them
pytest --co -q | head
Fixtures in conftest.py eliminate duplication and make tests readable. Compose small fixtures into larger ones. Use the right scope for each fixture.
test_client() lets you make HTTP requests without a running server. It is fast, reliable, and supports all HTTP methods, headers, cookies, and redirects.
Testing is not extra work; it is the work. Code without tests is a liability. Code with good tests is an asset you can refactor, extend, and deploy with confidence. Start with the patterns in this tutorial, adapt them to your project, and make testing a non-negotiable part of your development workflow.
Every web application that handles user data needs authentication and authorization. Authentication answers the question “Who are you?” while authorization answers “What are you allowed to do?” Getting these wrong can expose user data, enable account takeovers, and destroy user trust. Getting them right is non-negotiable.
In this tutorial, we will build authentication and authorization into Flask applications from the ground up. We will cover password hashing, session-based auth, token-based auth with JWT, role-based access control, OAuth2 social login, and a complete practical example that ties everything together.
| Concept | Question | Example |
|---|---|---|
| Authentication | Who are you? | Login with username and password |
| Authorization | What can you do? | Only admins can delete users |
Authentication always comes first. You cannot determine what someone is allowed to do until you know who they are. Flask gives you the building blocks; libraries like Flask-Login and Flask-JWT-Extended give you battle-tested implementations.
| Approach | Best For | Mechanism |
|---|---|---|
| Session-based | Server-rendered web apps | Cookie with session ID, server stores session data |
| Token-based (JWT) | APIs, SPAs, mobile apps | Signed token sent in Authorization header |
| OAuth2 / Social Login | Third-party identity providers | Delegated auth via Google, GitHub, etc. |
| API Keys | Service-to-service communication | Static key in header or query param |
Never store plaintext passwords. If your database is compromised, every user account is instantly exposed. Password hashing is a one-way transformation: you can verify a password against a hash, but you cannot reverse the hash to recover the original password.
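To make the one-way property concrete, here is a minimal sketch built only on the standard library's hashlib.pbkdf2_hmac. The names hash_password and verify_password, the "salt$hash" storage format, and the iteration count are assumptions chosen for illustration; this is not how any particular library stores hashes, and in a real application a maintained helper is the safer choice.

```python
import hashlib
import hmac
import secrets

# Assumed work factor, kept low so the sketch runs quickly.
ITERATIONS = 100_000


def hash_password(password: str) -> str:
    """Return 'salt$hash' (both hex-encoded) for the given password."""
    salt = secrets.token_bytes(16)  # fresh random salt for every password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode("utf-8"), salt, ITERATIONS)
    return f"{salt.hex()}${digest.hex()}"


def verify_password(stored: str, candidate: str) -> bool:
    """Recompute the hash with the stored salt and compare in constant time."""
    salt_hex, digest_hex = stored.split("$")
    digest = hashlib.pbkdf2_hmac(
        "sha256", candidate.encode("utf-8"), bytes.fromhex(salt_hex), ITERATIONS
    )
    return hmac.compare_digest(digest.hex(), digest_hex)


stored = hash_password("my_secure_password_123")
print(verify_password(stored, "my_secure_password_123"))  # True
print(verify_password(stored, "wrong_password"))          # False
```

Nothing in the stored string lets you compute the password back; verification works only by hashing a candidate with the same salt and comparing the results.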
Flask ships with Werkzeug, which provides generate_password_hash and check_password_hash. The default algorithm depends on the Werkzeug version: older releases default to PBKDF2-SHA256, while recent releases (3.x) default to scrypt. Both are solid choices for most applications. The example below names PBKDF2 explicitly so the output format is predictable.
from werkzeug.security import generate_password_hash, check_password_hash
# Hash a password with PBKDF2-SHA256 (named explicitly; the default varies by Werkzeug version)
password = "my_secure_password_123"
hashed = generate_password_hash(password, method="pbkdf2:sha256")
print(hashed)
# pbkdf2:sha256:<iterations>$<salt>$<hash>
# Verify a password against the hash
print(check_password_hash(hashed, "my_secure_password_123"))  # True
print(check_password_hash(hashed, "wrong_password"))  # False
# Customize the method and salt length
hashed_custom = generate_password_hash(
    password,
    method="pbkdf2:sha256:260000",
    salt_length=16
)
Key points about Werkzeug hashing:
- Each call to generate_password_hash produces a different result because of the random salt
- check_password_hash extracts the salt from the stored hash and recomputes the hash to compare
Bcrypt is another popular hashing algorithm specifically designed for passwords. It has a built-in work factor that makes it progressively harder to crack as hardware improves.
pip install flask-bcrypt
from flask import Flask
from flask_bcrypt import Bcrypt
app = Flask(__name__)
bcrypt = Bcrypt(app)
# Hash a password
password = "my_secure_password_123"
hashed = bcrypt.generate_password_hash(password).decode("utf-8")
print(hashed)
# $2b$12$randomsalt...
# Verify
print(bcrypt.check_password_hash(hashed, "my_secure_password_123")) # True
print(bcrypt.check_password_hash(hashed, "wrong_password")) # False
| Feature | Werkzeug (PBKDF2) | Bcrypt |
|---|---|---|
| Built into Flask | Yes | No (requires flask-bcrypt) |
| Algorithm | PBKDF2-SHA256 | Blowfish-based |
| Max password length | No limit | 72 bytes |
| Industry adoption | High | Very high |
| Recommendation | Good default | Good if team prefers bcrypt |
Both are excellent choices. Use Werkzeug’s built-in hashing unless your team has a specific reason to prefer bcrypt.
Session-based authentication is the traditional approach for server-rendered web applications. The server creates a session after login, stores session data server-side, and sends a session ID cookie to the client. On every subsequent request, the browser automatically sends the cookie, and the server looks up the session.
Flask uses client-side sessions by default. The session data is serialized, cryptographically signed with your secret_key, and stored in a cookie. The server does not need to store anything. The signature prevents tampering, but the data is not encrypted — users can read (but not modify) session contents.
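The "signed but not encrypted" distinction is easier to see in code. The sketch below uses only the standard library and a made-up "payload.signature" cookie format; Flask itself relies on the itsdangerous library and a different format, so treat sign_session, load_session, and SECRET_KEY here as hypothetical stand-ins for the idea rather than Flask's implementation.

```python
import base64
import hashlib
import hmac
import json

SECRET_KEY = b"dev-only-secret"  # placeholder; never hardcode a real key


def sign_session(data: dict) -> str:
    """Serialize data and append an HMAC signature: 'payload.signature'."""
    raw = json.dumps(data, sort_keys=True).encode()
    payload = base64.urlsafe_b64encode(raw).decode()
    signature = hmac.new(SECRET_KEY, payload.encode(), hashlib.sha256).hexdigest()
    return f"{payload}.{signature}"


def load_session(cookie: str):
    """Return the session dict, or None if the signature does not match."""
    payload, _, signature = cookie.rpartition(".")
    expected = hmac.new(SECRET_KEY, payload.encode(), hashlib.sha256).hexdigest()
    if not hmac.compare_digest(signature, expected):
        return None
    return json.loads(base64.urlsafe_b64decode(payload))


cookie = sign_session({"username": "alice", "role": "user"})

# Anyone holding the cookie can read it without knowing the key...
readable = json.loads(base64.urlsafe_b64decode(cookie.split(".")[0]))
print(readable)

# ...but swapping in a different payload invalidates the signature.
forged_payload = base64.urlsafe_b64encode(
    json.dumps({"username": "alice", "role": "admin"}, sort_keys=True).encode()
).decode()
forged = forged_payload + "." + cookie.split(".")[1]
print(load_session(cookie))  # the original data
print(load_session(forged))  # None
```

This is why secrets such as passwords or API tokens do not belong in a client-side session, even though users cannot alter what is stored there.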
from flask import Flask, session
app = Flask(__name__)
# CRITICAL: Set a strong secret key
# In production, load from environment variable
app.secret_key = "your-secret-key-change-this-in-production"
# Better: load from environment
import os
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "dev-fallback-key")
Important: If someone obtains your secret_key, they can forge session cookies and impersonate any user. Never hardcode it in source code for production. Use environment variables or a secrets manager.
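One way to follow that advice is to generate the key once with the standard library's secrets module and refuse to start when it is missing. This is a sketch under assumptions: generate_secret_key and load_secret_key are hypothetical helper names, and failing fast instead of falling back to a development key is a design choice, not a Flask requirement.

```python
import os
import secrets


def generate_secret_key() -> str:
    """Return 32 random bytes as 64 hex characters, suitable as a secret key."""
    return secrets.token_hex(32)


def load_secret_key(env=os.environ) -> str:
    """Read the key from the environment and fail loudly if it is absent."""
    key = env.get("FLASK_SECRET_KEY")
    if not key:
        raise RuntimeError("FLASK_SECRET_KEY is not set")
    return key


# Run once, then store the value in your environment or secrets manager.
print(generate_secret_key())
```

In production configuration you might then write app.secret_key = load_secret_key(), so a missing variable stops the deployment instead of silently using a guessable key.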
Let us build a minimal session-based auth system without any extensions.
from flask import Flask, session, request, redirect, url_for, render_template_string
from werkzeug.security import generate_password_hash, check_password_hash
from functools import wraps
import os

app = Flask(__name__)
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "dev-secret-key")

# Simulated user database
users_db = {
    "alice": {
        "password_hash": generate_password_hash("alice123"),
        "email": "alice@example.com",
        "role": "admin"
    },
    "bob": {
        "password_hash": generate_password_hash("bob456"),
        "email": "bob@example.com",
        "role": "user"
    }
}

def login_required(f):
    """Custom decorator to protect routes."""
    @wraps(f)
    def decorated_function(*args, **kwargs):
        if "username" not in session:
            return redirect(url_for("login"))
        return f(*args, **kwargs)
    return decorated_function

@app.route("/login", methods=["GET", "POST"])
def login():
    if request.method == "POST":
        username = request.form.get("username", "").strip()
        password = request.form.get("password", "")
        user = users_db.get(username)
        if user and check_password_hash(user["password_hash"], password):
            # Create session
            session["username"] = username
            session["role"] = user["role"]
            return redirect(url_for("dashboard"))
        else:
            return render_template_string(LOGIN_TEMPLATE, error="Invalid credentials")
    return render_template_string(LOGIN_TEMPLATE, error=None)

@app.route("/logout")
def logout():
    session.clear()
    return redirect(url_for("login"))

@app.route("/dashboard")
@login_required
def dashboard():
    return f"<h1>Welcome, {session['username']}!</h1><p>Role: {session['role']}</p><a href='/logout'>Logout</a>"

LOGIN_TEMPLATE = """
<h1>Login</h1>
{% if error %}<p style="color:red">{{ error }}</p>{% endif %}
<form method="post">
<input name="username" placeholder="Username" required><br>
<input name="password" type="password" placeholder="Password" required><br>
<button type="submit">Login</button>
</form>
"""

if __name__ == "__main__":
    app.run(debug=True)
Flask provides several configuration options to control session behavior.
from datetime import timedelta

app.config.update(
    # Lifetime of permanent sessions (non-permanent sessions last until the browser closes)
    PERMANENT_SESSION_LIFETIME=timedelta(hours=1),
    # Cookie settings
    SESSION_COOKIE_SECURE=True,        # Only send over HTTPS
    SESSION_COOKIE_HTTPONLY=True,      # JavaScript cannot access the cookie
    SESSION_COOKIE_SAMESITE="Lax",     # Mitigates CSRF on cross-site requests
    SESSION_COOKIE_NAME="my_session",  # Custom cookie name
)

# Make sessions permanent (respect PERMANENT_SESSION_LIFETIME)
@app.before_request
def make_session_permanent():
    session.permanent = True
| Setting | Default | Production Recommendation |
|---|---|---|
| SESSION_COOKIE_SECURE | False | True (requires HTTPS) |
| SESSION_COOKIE_HTTPONLY | True | True |
| SESSION_COOKIE_SAMESITE | None | "Lax" or "Strict" |
| PERMANENT_SESSION_LIFETIME | 31 days | 1 hour to 1 day depending on risk |
For applications that store sensitive data in sessions or need to invalidate sessions server-side, use Flask-Session to store session data in Redis, a database, or the filesystem.
pip install flask-session redis
from flask import Flask, session
from flask_session import Session
import redis

app = Flask(__name__)
app.config.update(
    SESSION_TYPE="redis",
    SESSION_REDIS=redis.from_url("redis://localhost:6379"),
    SESSION_PERMANENT=True,
    PERMANENT_SESSION_LIFETIME=3600,  # 1 hour
)
Session(app)
# Now session data is stored in Redis, not in the cookie
# The cookie only contains the session ID
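Conceptually, a server-side session store is just a mapping from a random session ID to the session data, which is what makes server-side invalidation possible. The following is a minimal in-memory sketch of that idea, not Flask-Session's implementation: the dictionary stands in for Redis, and create_session, get_session_data, and destroy_session are hypothetical names.

```python
import secrets
import time

SESSION_TTL = 3600  # seconds; assumed one-hour lifetime for this sketch
_sessions = {}      # session_id -> (expires_at, data); stands in for Redis


def create_session(data, now=None):
    """Store data under a fresh random ID and return the ID for the cookie."""
    now = time.time() if now is None else now
    session_id = secrets.token_urlsafe(32)
    _sessions[session_id] = (now + SESSION_TTL, dict(data))
    return session_id


def get_session_data(session_id, now=None):
    """Return the stored data, or None if the ID is unknown or expired."""
    now = time.time() if now is None else now
    entry = _sessions.get(session_id)
    if entry is None or entry[0] <= now:
        _sessions.pop(session_id, None)  # drop expired entries lazily
        return None
    return entry[1]


def destroy_session(session_id):
    """Invalidate a session immediately, regardless of what the client holds."""
    _sessions.pop(session_id, None)


sid = create_session({"username": "alice"})
print(get_session_data(sid))  # {'username': 'alice'}
destroy_session(sid)
print(get_session_data(sid))  # None
```

Because the cookie carries only the ID, deleting the server-side entry logs the user out at once, which a purely client-side signed cookie cannot do.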
Flask-Login is the most popular extension for managing user sessions in Flask. It handles the boilerplate of login, logout, session management, and route protection so you can focus on your application logic.
pip install flask-login
from flask import Flask
from flask_login import LoginManager

app = Flask(__name__)
app.secret_key = "your-secret-key"

# Initialize Flask-Login
login_manager = LoginManager()
login_manager.init_app(app)
# Where to redirect unauthenticated users
login_manager.login_view = "auth.login"
# Flash message category for unauthorized access
login_manager.login_message_category = "warning"
Flask-Login requires a User model that implements specific properties and methods. The UserMixin class provides sensible defaults for all of them.
from flask_login import UserMixin

class User(UserMixin):
    """User model for Flask-Login.

    UserMixin provides:
    - is_authenticated: True (user has valid credentials)
    - is_active: True (account is not suspended)
    - is_anonymous: False (this is a real user)
    - get_id(): returns self.id as a string
    """

    def __init__(self, id, username, email, password_hash, role="user"):
        self.id = id
        self.username = username
        self.email = email
        self.password_hash = password_hash
        self.role = role

    def check_password(self, password):
        from werkzeug.security import check_password_hash
        return check_password_hash(self.password_hash, password)

    def __repr__(self):
        return f"<User {self.username}>"

# User loader callback: Flask-Login calls this on every request
# to load the user from the session cookie
@login_manager.user_loader
def load_user(user_id):
    """Load user by ID from your database.

    This function is called on every request to deserialize the user
    from the session. Return None if the user no longer exists.
    """
    # With SQLAlchemy:
    return User.query.get(int(user_id))
    # With a dictionary (for demonstration):
    # return users_db.get(int(user_id))
from flask import Blueprint, request, redirect, url_for, flash, render_template
from flask_login import login_user, logout_user, current_user, login_required

auth_bp = Blueprint("auth", __name__)

@auth_bp.route("/login", methods=["GET", "POST"])
def login():
    # If already logged in, redirect to dashboard
    if current_user.is_authenticated:
        return redirect(url_for("main.dashboard"))
    if request.method == "POST":
        username = request.form.get("username", "").strip()
        password = request.form.get("password", "")
        remember = request.form.get("remember", False)
        user = User.query.filter_by(username=username).first()
        if user and user.check_password(password):
            # login_user creates the session
            login_user(user, remember=bool(remember))
            # Redirect to the page they originally wanted, but only follow
            # local paths: an unchecked "next" value is an open redirect
            next_page = request.args.get("next")
            if (not next_page or not next_page.startswith("/")
                    or next_page.startswith(("//", "/\\"))):
                next_page = url_for("main.dashboard")
            return redirect(next_page)
        flash("Invalid username or password.", "danger")
    return render_template("login.html")

@auth_bp.route("/logout")
@login_required
def logout():
    logout_user()
    flash("You have been logged out.", "info")
    return redirect(url_for("auth.login"))

@auth_bp.route("/profile")
@login_required
def profile():
    # current_user is automatically available: it is the logged-in User object
    return render_template("profile.html", user=current_user)
@login_required protects routes so that only authenticated users can access them. Unauthenticated users are redirected to the login_view you configured on the LoginManager.
from flask import render_template
from flask_login import current_user, login_required

@app.route("/settings")
@login_required
def settings():
    """Only authenticated users can access this page."""
    return render_template("settings.html")

@app.route("/api/data")
@login_required
def api_data():
    """Protected API endpoint."""
    return {"data": "sensitive information", "user": current_user.username}
When remember=True is passed to login_user(), Flask-Login sets a long-lived “remember me” cookie. Even if the session cookie expires (browser closes), the remember cookie will restore the session.
from datetime import timedelta

# Configure remember me duration
app.config["REMEMBER_COOKIE_DURATION"] = timedelta(days=14)
app.config["REMEMBER_COOKIE_SECURE"] = True  # HTTPS only
app.config["REMEMBER_COOKIE_HTTPONLY"] = True  # No JS access
app.config["REMEMBER_COOKIE_SAMESITE"] = "Lax"

# In login route:
login_user(user, remember=True)  # Sets the remember cookie
By default, Flask-Login redirects unauthenticated users to the login page. You can customize this behavior for API routes or special cases.
@login_manager.unauthorized_handler
def unauthorized():
    """Handle unauthorized access attempts."""
    if request.is_json or request.path.startswith("/api/"):
        # API requests get a JSON response
        return {"error": "Authentication required"}, 401
    # Browser requests get redirected to login
    flash("Please log in to access this page.", "warning")
    return redirect(url_for("auth.login", next=request.url))
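Because this handler passes the absolute request.url as next, the value the login view later reads deserves care: it arrives in the query string, so an attacker can craft a login link whose next points at another site. A minimal standard-library sketch of a validator that accepts only same-host targets follows; is_safe_redirect_target is a hypothetical helper name, and the rules shown are a conservative assumption, not an exhaustive defense.

```python
from urllib.parse import urljoin, urlsplit


def is_safe_redirect_target(target, host_url):
    """Return True only if target resolves to http(s) on the same host as host_url."""
    if not target or "\\" in target:
        # Empty values and backslashes (which some browsers treat as "/") are rejected
        return False
    base = urlsplit(host_url)
    resolved = urlsplit(urljoin(host_url, target))
    return resolved.scheme in ("http", "https") and resolved.netloc == base.netloc


host = "https://myapp.example/"
print(is_safe_redirect_target("/dashboard", host))                      # True
print(is_safe_redirect_target("https://myapp.example/settings", host))  # True
print(is_safe_redirect_target("https://evil.example/phish", host))      # False
print(is_safe_redirect_target("//evil.example/phish", host))            # False
```

In a Flask view the second argument would typically be request.host_url, with a fixed fallback such as url_for("main.dashboard") whenever the check returns False.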
Token-based authentication is the standard for REST APIs, single-page applications, and mobile apps. Instead of cookies and sessions, the client receives a signed token after login and sends it with every request in the Authorization header.
| Scenario | Use Sessions | Use Tokens (JWT) |
|---|---|---|
| Server-rendered web app (Jinja2 templates) | Yes | No |
| REST API consumed by frontend (React, Vue) | No | Yes |
| Mobile app backend | No | Yes |
| Microservices architecture | No | Yes |
| Third-party API access | No | Yes |
pip install flask-jwt-extended
from flask import Flask
from flask_jwt_extended import JWTManager
from datetime import timedelta

app = Flask(__name__)

# JWT Configuration
app.config["JWT_SECRET_KEY"] = "your-jwt-secret-key"  # Use env var in production
app.config["JWT_ACCESS_TOKEN_EXPIRES"] = timedelta(hours=1)
app.config["JWT_REFRESH_TOKEN_EXPIRES"] = timedelta(days=30)
app.config["JWT_TOKEN_LOCATION"] = ["headers"]  # Can also use cookies, query_string
app.config["JWT_HEADER_NAME"] = "Authorization"
app.config["JWT_HEADER_TYPE"] = "Bearer"

jwt = JWTManager(app)
Access tokens are short-lived and used for API access. Refresh tokens are long-lived and used only to obtain new access tokens without re-entering credentials.
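It can help to see what such a token contains. The sketch below hand-builds HS256-signed tokens with the standard library to show the header.payload.signature structure and the role of the exp claim. It is an illustration only: encode_token and decode_token are hypothetical helpers, the "type" claim is used here purely to tell the two tokens apart, and a real verifier (such as the one inside Flask-JWT-Extended) performs more checks than this.

```python
import base64
import hashlib
import hmac
import json
import time


def b64url(data: bytes) -> str:
    """Base64url-encode without padding, as JWTs do."""
    return base64.urlsafe_b64encode(data).rstrip(b"=").decode()


def b64url_decode(text: str) -> bytes:
    return base64.urlsafe_b64decode(text + "=" * (-len(text) % 4))


def encode_token(claims: dict, secret: bytes) -> str:
    header = b64url(json.dumps({"alg": "HS256", "typ": "JWT"}).encode())
    payload = b64url(json.dumps(claims).encode())
    signing_input = f"{header}.{payload}".encode()
    signature = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    return f"{header}.{payload}.{signature}"


def decode_token(token: str, secret: bytes, now=None) -> dict:
    """Verify the signature and expiry, then return the claims."""
    header, payload, signature = token.split(".")
    signing_input = f"{header}.{payload}".encode()
    expected = b64url(hmac.new(secret, signing_input, hashlib.sha256).digest())
    if not hmac.compare_digest(signature, expected):
        raise ValueError("bad signature")
    claims = json.loads(b64url_decode(payload))
    if claims["exp"] <= (time.time() if now is None else now):
        raise ValueError("token expired")
    return claims


secret = b"dev-only-jwt-secret"  # placeholder key for the sketch
issued_at = int(time.time())
access = encode_token({"sub": "42", "type": "access", "exp": issued_at + 3600}, secret)
refresh = encode_token(
    {"sub": "42", "type": "refresh", "exp": issued_at + 30 * 24 * 3600}, secret
)
print(decode_token(access, secret)["type"])   # access
print(decode_token(refresh, secret)["type"])  # refresh
```

Two hours after issue the access token fails the expiry check while the refresh token still verifies, which is exactly the window the refresh endpoint exists to cover.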
from flask import request, jsonify
from flask_jwt_extended import (
    create_access_token,
    create_refresh_token,
    jwt_required,
    get_jwt_identity,
    get_jwt,
)
from werkzeug.security import check_password_hash

@app.route("/api/auth/login", methods=["POST"])
def api_login():
    """Authenticate user and return JWT tokens."""
    data = request.get_json()
    if not data or not data.get("username") or not data.get("password"):
        return jsonify({"error": "Username and password required"}), 400
    user = User.query.filter_by(username=data["username"]).first()
    if not user or not user.check_password(data["password"]):
        return jsonify({"error": "Invalid credentials"}), 401
    # Create tokens with user identity and additional claims
    access_token = create_access_token(
        identity=str(user.id),
        additional_claims={
            "username": user.username,
            "role": user.role,
            "email": user.email
        }
    )
    refresh_token = create_refresh_token(identity=str(user.id))
    return jsonify({
        "access_token": access_token,
        "refresh_token": refresh_token,
        "user": {
            "id": user.id,
            "username": user.username,
            "role": user.role
        }
    }), 200

@app.route("/api/profile", methods=["GET"])
@jwt_required()
def api_profile():
    """Protected endpoint: requires valid access token."""
    current_user_id = get_jwt_identity()  # Returns the identity from the token
    claims = get_jwt()  # Returns all claims in the token
    user = User.query.get(int(current_user_id))
    if not user:
        return jsonify({"error": "User not found"}), 404
    return jsonify({
        "id": user.id,
        "username": user.username,
        "email": user.email,
        "role": claims.get("role")
    })

@app.route("/api/admin/users", methods=["GET"])
@jwt_required()
def admin_list_users():
    """Admin-only endpoint."""
    claims = get_jwt()
    if claims.get("role") != "admin":
        return jsonify({"error": "Admin access required"}), 403
    users = User.query.all()
    return jsonify([
        {"id": u.id, "username": u.username, "role": u.role}
        for u in users
    ])
The client sends the token like this:
# Login and get tokens
curl -X POST http://localhost:5000/api/auth/login \
  -H "Content-Type: application/json" \
  -d '{"username": "alice", "password": "alice123"}'
# Use access token to call protected endpoint
curl http://localhost:5000/api/profile \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIs..."
# Refresh the access token (send the refresh token here, not the access token)
curl -X POST http://localhost:5000/api/auth/refresh \
  -H "Authorization: Bearer eyJhbGciOiJIUzI1NiIs..."
@app.route("/api/auth/refresh", methods=["POST"])
@jwt_required(refresh=True)
def refresh():
    """Use refresh token to get a new access token."""
    current_user_id = get_jwt_identity()
    user = User.query.get(int(current_user_id))
    if not user:
        return jsonify({"error": "User not found"}), 404
    new_access_token = create_access_token(
        identity=str(user.id),
        additional_claims={
            "username": user.username,
            "role": user.role,
            "email": user.email
        }
    )
    return jsonify({"access_token": new_access_token}), 200
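The typical flow is:
1. The client logs in and receives an access token and a refresh token.
2. The client sends the access token in the Authorization header on every API request.
3. When the access token expires, protected endpoints start rejecting it.
4. The client sends the refresh token to /api/auth/refresh and receives a new access token.
5. When the refresh token itself expires, the user logs in again.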
JWTs are stateless: once issued, they are valid until they expire. To revoke tokens (for logout, password change, or security incidents), you need a blocklist.
from flask_jwt_extended import get_jwt

# In-memory blocklist (use Redis in production)
BLOCKLIST = set()

@jwt.token_in_blocklist_loader
def check_if_token_revoked(jwt_header, jwt_payload):
    """Check if a token has been revoked.

    This callback is called on every request to a protected endpoint.
    Return True if the token is revoked (blocked).
    """
    jti = jwt_payload["jti"]  # JWT ID: unique identifier for the token
    return jti in BLOCKLIST

@app.route("/api/auth/logout", methods=["POST"])
@jwt_required()
def api_logout():
    """Revoke the current access token."""
    jti = get_jwt()["jti"]
    BLOCKLIST.add(jti)
    return jsonify({"message": "Token revoked successfully"}), 200

@app.route("/api/auth/logout-all", methods=["POST"])
@jwt_required()
def api_logout_all():
    """Revoke both access and refresh tokens."""
    jti = get_jwt()["jti"]
    BLOCKLIST.add(jti)
    # In practice, you would also revoke the refresh token
    # by storing revoked tokens in Redis with TTL matching token expiry
    return jsonify({"message": "All tokens revoked"}), 200
Production blocklist with Redis:
import redis

redis_client = redis.from_url("redis://localhost:6379")

@jwt.token_in_blocklist_loader
def check_if_token_revoked(jwt_header, jwt_payload):
    jti = jwt_payload["jti"]
    token_in_redis = redis_client.get(f"blocklist:{jti}")
    return token_in_redis is not None

def revoke_token(jti, expires_in):
    """Add token to blocklist with TTL matching token expiry."""
    redis_client.setex(f"blocklist:{jti}", expires_in, "revoked")
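The expires_in argument can be derived from the token itself, since the standard exp claim holds the expiry as seconds since the epoch. A small sketch of that calculation follows; seconds_until_expiry is a hypothetical helper, and clamping to a minimum of one second is an assumption made so the TTL passed to Redis is always positive.

```python
import time


def seconds_until_expiry(jwt_payload: dict, now=None) -> int:
    """Remaining lifetime of a token in whole seconds, never less than 1."""
    current = time.time() if now is None else now
    return max(1, int(jwt_payload["exp"] - current))


# A token that expires 10 minutes after the reference time
payload = {"jti": "example-id", "exp": 1_700_000_600}
print(seconds_until_expiry(payload, now=1_700_000_000))  # 600
```

A logout route could then call revoke_token(payload["jti"], seconds_until_expiry(payload)), so each blocklist entry disappears at the moment the token would have stopped working anyway and the blocklist never grows without bound.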
RBAC restricts access based on user roles. Instead of checking individual permissions, you assign roles (admin, editor, user) and define what each role can do.
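Stripped of any framework, the idea is a lookup from roles to the actions they allow. The sketch below uses plain Python; the role names match those mentioned above, while the permission names and the helpers permissions_for and can are made up for illustration.

```python
# Hypothetical role-to-permission mapping
ROLE_PERMISSIONS = {
    "admin": {"read_post", "edit_post", "delete_user"},
    "editor": {"read_post", "edit_post"},
    "user": {"read_post"},
}


def permissions_for(roles):
    """Union of the permissions granted by each of the user's roles."""
    allowed = set()
    for role in roles:
        allowed |= ROLE_PERMISSIONS.get(role, set())  # unknown roles grant nothing
    return allowed


def can(roles, permission):
    """True if any of the given roles grants the permission."""
    return permission in permissions_for(roles)


print(can(["editor"], "edit_post"))           # True
print(can(["editor"], "delete_user"))         # False
print(can(["user", "admin"], "delete_user"))  # True
```

Changing what editors may do then means editing one mapping entry rather than hunting through every route that checks a user.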
from flask_login import UserMixin
from flask_sqlalchemy import SQLAlchemy
from werkzeug.security import generate_password_hash, check_password_hash

db = SQLAlchemy()

# Many-to-many relationship between users and roles
user_roles = db.Table("user_roles",
    db.Column("user_id", db.Integer, db.ForeignKey("users.id"), primary_key=True),
    db.Column("role_id", db.Integer, db.ForeignKey("roles.id"), primary_key=True)
)

class Role(db.Model):
    __tablename__ = "roles"
    id = db.Column(db.Integer, primary_key=True)
    name = db.Column(db.String(50), unique=True, nullable=False)
    description = db.Column(db.String(200))

    def __repr__(self):
        return f"<Role {self.name}>"

class User(UserMixin, db.Model):
    __tablename__ = "users"
    id = db.Column(db.Integer, primary_key=True)
    username = db.Column(db.String(80), unique=True, nullable=False)
    email = db.Column(db.String(120), unique=True, nullable=False)
    password_hash = db.Column(db.String(256), nullable=False)
    is_active_user = db.Column(db.Boolean, default=True)
    # Many-to-many relationship
    roles = db.relationship("Role", secondary=user_roles,
                            backref=db.backref("users", lazy="dynamic"))

    def has_role(self, role_name):
        """Check if user has a specific role."""
        return any(role.name == role_name for role in self.roles)

    def has_any_role(self, *role_names):
        """Check if user has any of the specified roles."""
        return any(self.has_role(name) for name in role_names)

    @property
    def is_admin(self):
        return self.has_role("admin")

    def set_password(self, password):
        self.password_hash = generate_password_hash(password)

    def check_password(self, password):
        return check_password_hash(self.password_hash, password)
from functools import wraps
from flask import abort, jsonify, render_template, request
from flask_login import current_user, login_required

def role_required(*roles):
    """Decorator that requires the user to have one of the specified roles.

    Usage:
        @role_required("admin")
        @role_required("admin", "editor")
    """
    def decorator(f):
        @wraps(f)
        @login_required  # Ensure user is authenticated first
        def decorated_function(*args, **kwargs):
            if not current_user.has_any_role(*roles):
                abort(403)  # Forbidden
            return f(*args, **kwargs)
        return decorated_function
    return decorator

def admin_required(f):
    """Shortcut decorator for admin-only routes."""
    @wraps(f)
    @login_required
    def decorated_function(*args, **kwargs):
        if not current_user.is_admin:
            abort(403)
        return f(*args, **kwargs)
    return decorated_function

# Usage
@app.route("/admin/dashboard")
@admin_required
def admin_dashboard():
    return render_template("admin/dashboard.html")

@app.route("/editor/posts")
@role_required("admin", "editor")
def manage_posts():
    return render_template("editor/posts.html")

@app.route("/api/admin/users", methods=["DELETE"])
@role_required("admin")
def delete_user():
    # Only admins can delete users
    user_id = request.json.get("user_id")
    user = User.query.get_or_404(user_id)
    db.session.delete(user)
    db.session.commit()
    return jsonify({"message": "User deleted"}), 200
from functools import wraps
from flask import jsonify, request
from flask_jwt_extended import jwt_required, get_jwt

def jwt_role_required(*roles):
    """Decorator for JWT-protected endpoints with role checking."""
    def decorator(f):
        @wraps(f)
        @jwt_required()
        def decorated_function(*args, **kwargs):
            claims = get_jwt()
            user_role = claims.get("role", "")
            if user_role not in roles:
                return jsonify({"error": "Insufficient permissions"}), 403
            return f(*args, **kwargs)
        return decorated_function
    return decorator

@app.route("/api/admin/settings", methods=["PUT"])
@jwt_role_required("admin")
def update_settings():
    """Only admins can update application settings."""
    data = request.get_json()
    # ... update settings ...
    return jsonify({"message": "Settings updated"}), 200
In Jinja2 templates, you can show or hide elements based on the user’s role.
<!-- Navigation showing role-specific links -->
<nav>
<a href="{{ url_for('main.home') }}">Home</a>
{% if current_user.is_authenticated %}
<a href="{{ url_for('main.dashboard') }}">Dashboard</a>
{% if current_user.is_admin %}
<a href="{{ url_for('admin.dashboard') }}">Admin Panel</a>
<a href="{{ url_for('admin.users') }}">Manage Users</a>
{% endif %}
{% if current_user.has_any_role('admin', 'editor') %}
<a href="{{ url_for('editor.posts') }}">Manage Posts</a>
{% endif %}
<a href="{{ url_for('auth.logout') }}">Logout ({{ current_user.username }})</a>
{% else %}
<a href="{{ url_for('auth.login') }}">Login</a>
<a href="{{ url_for('auth.register') }}">Register</a>
{% endif %}
</nav>
<!-- Conditionally show delete button -->
{% if current_user.is_admin %}
<button class="btn btn-danger" onclick="deleteUser({{ user.id }})">
Delete User
</button>
{% endif %}
OAuth2 lets users log in with their existing accounts from providers like Google, GitHub, or Facebook. Instead of managing passwords yourself, you delegate authentication to a trusted provider. Flask-Dance makes this straightforward.
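Under the hood, delegating authentication starts by redirecting the user to the provider's authorization endpoint with a handful of query parameters, including a random state value that is checked again when the provider sends the user back. The sketch below shows only that first step with the standard library. The endpoint and client values are placeholders, build_authorization_url and state_matches are hypothetical helpers, and a library such as Flask-Dance handles all of this (plus the token exchange) for you.

```python
import hmac
import secrets
from urllib.parse import parse_qs, urlencode, urlsplit


def build_authorization_url(authorize_endpoint, client_id, redirect_uri, scopes):
    """Return (url, state) for the first leg of the authorization-code flow."""
    state = secrets.token_urlsafe(16)  # random value to tie the callback to this login
    query = urlencode({
        "response_type": "code",
        "client_id": client_id,
        "redirect_uri": redirect_uri,
        "scope": " ".join(scopes),
        "state": state,
    })
    return f"{authorize_endpoint}?{query}", state


def state_matches(expected_state, returned_state):
    """Compare the state echoed back by the provider with the one we issued."""
    return bool(returned_state) and hmac.compare_digest(expected_state, returned_state)


url, state = build_authorization_url(
    "https://provider.example/oauth/authorize",  # placeholder endpoint
    "my-client-id",                              # placeholder client ID
    "https://myapp.example/auth/callback",       # placeholder callback URL
    ["openid", "email"],
)
print(url)
print(parse_qs(urlsplit(url).query)["scope"])  # ['openid email']
```

The application would keep state in the user's session, then reject any callback whose state parameter fails state_matches, which blocks forged callbacks from being accepted.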
pip install flask-dance[sqla]
from flask import Flask, redirect, url_for
from flask_dance.contrib.google import make_google_blueprint, google
from flask_dance.contrib.github import make_github_blueprint, github
from flask_login import login_user, current_user
import os
app = Flask(__name__)
app.secret_key = os.environ["FLASK_SECRET_KEY"]
# Google OAuth blueprint
google_bp = make_google_blueprint(
client_id=os.environ["GOOGLE_CLIENT_ID"],
client_secret=os.environ["GOOGLE_CLIENT_SECRET"],
scope=["openid", "email", "profile"],
redirect_url="/auth/google/callback"
)
app.register_blueprint(google_bp, url_prefix="/auth/google")
# GitHub OAuth blueprint
github_bp = make_github_blueprint(
client_id=os.environ["GITHUB_CLIENT_ID"],
client_secret=os.environ["GITHUB_CLIENT_SECRET"],
scope="user:email",
)
app.register_blueprint(github_bp, url_prefix="/auth/github")
@app.route("/auth/google/callback")
def google_callback():
"""Handle Google OAuth callback."""
if not google.authorized:
return redirect(url_for("google.login"))
# Get user info from Google
resp = google.get("/oauth2/v2/userinfo")
if resp.ok:
google_info = resp.json()
email = google_info["email"]
name = google_info.get("name", "")
# Find or create user
user = User.query.filter_by(email=email).first()
if not user:
user = User(
username=email.split("@")[0],
email=email,
password_hash="", # No password for OAuth users
oauth_provider="google"
)
db.session.add(user)
db.session.commit()
login_user(user)
return redirect(url_for("main.dashboard"))
return "Failed to get user info from Google", 400
@app.route("/auth/github/callback")
def github_callback():
"""Handle GitHub OAuth callback."""
if not github.authorized:
return redirect(url_for("github.login"))
resp = github.get("/user")
if resp.ok:
github_info = resp.json()
github_id = str(github_info["id"])
username = github_info["login"]
email = github_info.get("email") or f"{username}@github.user" # GitHub sends "email": null when the address is private
user = User.query.filter_by(email=email).first()
if not user:
user = User(
username=username,
email=email,
password_hash="",
oauth_provider="github"
)
db.session.add(user)
db.session.commit()
login_user(user)
return redirect(url_for("main.dashboard"))
return "Failed to get user info from GitHub", 400
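The two callbacks above repeat the same find-or-create steps, and GitHub's `/user` payload carries an explicit `null` email when the address is private, which a `dict.get` default does not catch. A minimal sketch of a shared normalizer follows. The names `OAuthProfile` and `normalize_oauth_profile` are hypothetical helpers for illustration, not part of Flask-Dance:

```python
from dataclasses import dataclass
from typing import Any, Mapping


@dataclass(frozen=True)
class OAuthProfile:
    provider: str
    username: str
    email: str


def normalize_oauth_profile(provider: str, info: Mapping[str, Any]) -> OAuthProfile:
    """Map a provider's user-info payload onto the fields the User model needs."""
    if provider == "google":
        email = info["email"]
        username = email.split("@")[0]
    elif provider == "github":
        username = info["login"]
        # `or` also covers an explicit null, which .get(key, default) does not
        email = info.get("email") or f"{username}@github.user"
    else:
        raise ValueError(f"unsupported provider: {provider}")
    return OAuthProfile(provider=provider, username=username, email=email.lower())


profile = normalize_oauth_profile("github", {"login": "octocat", "email": None})
print(profile.email)  # octocat@github.user
```

Each callback could then pass the resulting profile to one find-or-create function instead of duplicating the lookup.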
The login template with social login buttons:
<div class="social-login">
<a href="{{ url_for('google.login') }}" class="btn btn-danger btn-block">
Login with Google
</a>
<a href="{{ url_for('github.login') }}" class="btn btn-dark btn-block">
Login with GitHub
</a>
<hr>
<p>Or login with your credentials:</p>
<form method="post">
<!-- regular login form -->
</form>
</div>
Let us build a complete authentication system that combines everything: user registration with validation, login/logout with Flask-Login, a protected dashboard, an admin panel with role checks, and a password reset flow.
flask_auth_app/
+-- app/
|   +-- __init__.py          # Application factory
|   +-- models.py            # User and Role models
|   +-- auth/
|   |   +-- __init__.py      # Auth blueprint
|   |   +-- routes.py        # Login, register, logout, reset
|   |   +-- forms.py         # WTForms for validation
|   |   +-- utils.py         # Email sending, token generation
|   +-- main/
|   |   +-- __init__.py      # Main blueprint
|   |   +-- routes.py        # Dashboard, home
|   +-- admin/
|   |   +-- __init__.py      # Admin blueprint
|   |   +-- routes.py        # Admin panel
|   +-- templates/
|       +-- base.html
|       +-- auth/
|       |   +-- login.html
|       |   +-- register.html
|       |   +-- reset_password.html
|       +-- main/
|       |   +-- dashboard.html
|       +-- admin/
|           +-- panel.html
+-- config.py
+-- requirements.txt
+-- run.py
import os
from datetime import timedelta
class Config:
SECRET_KEY = os.environ.get("SECRET_KEY", "dev-secret-key")
SQLALCHEMY_DATABASE_URI = os.environ.get(
"DATABASE_URL", "sqlite:///app.db"
)
SQLALCHEMY_TRACK_MODIFICATIONS = False
# Session
PERMANENT_SESSION_LIFETIME = timedelta(hours=2)
SESSION_COOKIE_SECURE = os.environ.get("FLASK_ENV") == "production"
SESSION_COOKIE_HTTPONLY = True
SESSION_COOKIE_SAMESITE = "Lax"
# Remember me
REMEMBER_COOKIE_DURATION = timedelta(days=14)
REMEMBER_COOKIE_SECURE = os.environ.get("FLASK_ENV") == "production"
# Password reset tokens
RESET_TOKEN_EXPIRY = 3600 # 1 hour in seconds
from datetime import datetime
from werkzeug.security import generate_password_hash, check_password_hash
from flask_login import UserMixin
from itsdangerous import URLSafeTimedSerializer
from flask import current_app
from app import db
user_roles = db.Table("user_roles",
db.Column("user_id", db.Integer, db.ForeignKey("users.id"), primary_key=True),
db.Column("role_id", db.Integer, db.ForeignKey("roles.id"), primary_key=True)
)
class Role(db.Model):
__tablename__ = "roles"
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(50), unique=True, nullable=False)
@staticmethod
def insert_roles():
"""Seed default roles."""
roles = ["user", "editor", "admin"]
for role_name in roles:
if not Role.query.filter_by(name=role_name).first():
db.session.add(Role(name=role_name))
db.session.commit()
class User(UserMixin, db.Model):
__tablename__ = "users"
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False, index=True)
email = db.Column(db.String(120), unique=True, nullable=False, index=True)
password_hash = db.Column(db.String(256), nullable=False)
created_at = db.Column(db.DateTime, default=datetime.utcnow)
last_login = db.Column(db.DateTime)
is_active_account = db.Column(db.Boolean, default=True)
roles = db.relationship("Role", secondary=user_roles,
backref=db.backref("users", lazy="dynamic"))
def set_password(self, password):
self.password_hash = generate_password_hash(password)
def check_password(self, password):
return check_password_hash(self.password_hash, password)
def has_role(self, role_name):
return any(r.name == role_name for r in self.roles)
@property
def is_admin(self):
return self.has_role("admin")
def get_reset_token(self):
"""Generate a password reset token."""
s = URLSafeTimedSerializer(current_app.config["SECRET_KEY"])
return s.dumps({"user_id": self.id}, salt="password-reset")
@staticmethod
def verify_reset_token(token, max_age=3600):
"""Verify a password reset token."""
s = URLSafeTimedSerializer(current_app.config["SECRET_KEY"])
try:
data = s.loads(token, salt="password-reset", max_age=max_age)
return db.session.get(User, data["user_id"]) # Query.get() is legacy in SQLAlchemy 2.x
except Exception:
return None
def __repr__(self):
return f"<User {self.username}>"
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_login import LoginManager
from flask_wtf.csrf import CSRFProtect
from config import Config
db = SQLAlchemy()
login_manager = LoginManager()
csrf = CSRFProtect()
def create_app(config_class=Config):
app = Flask(__name__)
app.config.from_object(config_class)
# Initialize extensions
db.init_app(app)
login_manager.init_app(app)
csrf.init_app(app)
# Flask-Login configuration
login_manager.login_view = "auth.login"
login_manager.login_message_category = "warning"
# User loader
from app.models import User
@login_manager.user_loader
def load_user(user_id):
return db.session.get(User, int(user_id)) # Query.get() is legacy in SQLAlchemy 2.x
# Register blueprints
from app.auth import auth_bp
from app.main import main_bp
from app.admin import admin_bp
app.register_blueprint(auth_bp)
app.register_blueprint(main_bp)
app.register_blueprint(admin_bp, url_prefix="/admin")
# Create tables
with app.app_context():
db.create_all()
from app.models import Role
Role.insert_roles()
return app
from flask_wtf import FlaskForm
from wtforms import StringField, PasswordField, BooleanField, SubmitField
from wtforms.validators import (
DataRequired, Email, EqualTo, Length, ValidationError, Regexp
)
from app.models import User
class RegistrationForm(FlaskForm):
username = StringField("Username", validators=[
DataRequired(),
Length(min=3, max=80),
Regexp(
r"^[a-zA-Z0-9_]+$",
message="Username can only contain letters, numbers, and underscores."
)
])
email = StringField("Email", validators=[
DataRequired(),
Email(),
Length(max=120)
])
password = PasswordField("Password", validators=[
DataRequired(),
Length(min=8, message="Password must be at least 8 characters.")
])
confirm_password = PasswordField("Confirm Password", validators=[
DataRequired(),
EqualTo("password", message="Passwords must match.")
])
submit = SubmitField("Register")
def validate_username(self, field):
if User.query.filter_by(username=field.data).first():
raise ValidationError("Username is already taken.")
def validate_email(self, field):
if User.query.filter_by(email=field.data).first():
raise ValidationError("Email is already registered.")
class LoginForm(FlaskForm):
username = StringField("Username", validators=[DataRequired()])
password = PasswordField("Password", validators=[DataRequired()])
remember = BooleanField("Remember Me")
submit = SubmitField("Login")
class ResetRequestForm(FlaskForm):
email = StringField("Email", validators=[DataRequired(), Email()])
submit = SubmitField("Request Password Reset")
class ResetPasswordForm(FlaskForm):
password = PasswordField("New Password", validators=[
DataRequired(),
Length(min=8)
])
confirm_password = PasswordField("Confirm Password", validators=[
DataRequired(),
EqualTo("password")
])
submit = SubmitField("Reset Password")
from datetime import datetime
from flask import render_template, redirect, url_for, flash, request
from flask_login import login_user, logout_user, current_user, login_required
from app import db
from app.auth import auth_bp
from app.auth.forms import (
RegistrationForm, LoginForm, ResetRequestForm, ResetPasswordForm
)
from app.models import User, Role
@auth_bp.route("/register", methods=["GET", "POST"])
def register():
if current_user.is_authenticated:
return redirect(url_for("main.dashboard"))
form = RegistrationForm()
if form.validate_on_submit():
user = User(
username=form.username.data.strip(),
email=form.email.data.strip().lower()
)
user.set_password(form.password.data)
# Assign default role
default_role = Role.query.filter_by(name="user").first()
if default_role:
user.roles.append(default_role)
db.session.add(user)
db.session.commit()
flash("Account created successfully! Please log in.", "success")
return redirect(url_for("auth.login"))
return render_template("auth/register.html", form=form)
@auth_bp.route("/login", methods=["GET", "POST"])
def login():
if current_user.is_authenticated:
return redirect(url_for("main.dashboard"))
form = LoginForm()
if form.validate_on_submit():
user = User.query.filter_by(username=form.username.data.strip()).first()
if user and user.check_password(form.password.data):
if not user.is_active_account:
flash("Your account has been deactivated.", "danger")
return render_template("auth/login.html", form=form)
login_user(user, remember=form.remember.data)
user.last_login = datetime.utcnow()
db.session.commit()
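next_page = request.args.get("next", "")
# Follow only same-site relative paths; an absolute URL in ?next= would be an open redirect
is_safe = next_page.startswith("/") and not next_page.startswith(("//", "/\\"))
return redirect(next_page if is_safe else url_for("main.dashboard"))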
flash("Invalid username or password.", "danger")
return render_template("auth/login.html", form=form)
@auth_bp.route("/logout")
@login_required
def logout():
logout_user()
flash("You have been logged out.", "info")
return redirect(url_for("auth.login"))
@auth_bp.route("/reset-password", methods=["GET", "POST"])
def reset_request():
if current_user.is_authenticated:
return redirect(url_for("main.dashboard"))
form = ResetRequestForm()
if form.validate_on_submit():
user = User.query.filter_by(email=form.email.data.strip().lower()).first()
if user:
token = user.get_reset_token()
# In production, send this via email
# send_reset_email(user, token)
# Outside the if: flash the same message either way so the response
# does not reveal whether the email exists
flash("If that email exists, a reset link has been sent.", "info")
return redirect(url_for("auth.login"))
return render_template("auth/reset_request.html", form=form)
@auth_bp.route("/reset-password/<token>", methods=["GET", "POST"])
def reset_password(token):
if current_user.is_authenticated:
return redirect(url_for("main.dashboard"))
user = User.verify_reset_token(token)
if not user:
flash("Invalid or expired reset token.", "danger")
return redirect(url_for("auth.reset_request"))
form = ResetPasswordForm()
if form.validate_on_submit():
user.set_password(form.password.data)
db.session.commit()
flash("Your password has been reset. Please log in.", "success")
return redirect(url_for("auth.login"))
return render_template("auth/reset_password.html", form=form)
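The login view above redirects to whatever the `next` query parameter contains, so it is worth checking that value before following it. A minimal sketch of such a check using only the standard library follows. The helper name `is_safe_redirect_target` is hypothetical, and the rule it applies (same-site paths only) is one reasonable policy rather than the only one:

```python
from typing import Optional
from urllib.parse import urlsplit


def is_safe_redirect_target(target: Optional[str]) -> bool:
    """Accept only same-site paths such as "/dashboard?tab=1"."""
    if not target:
        return False
    # Browsers treat backslashes like slashes, so normalize before parsing
    parts = urlsplit(target.replace("\\", "/"))
    # A scheme or host means the URL points away from this site
    return not parts.scheme and not parts.netloc and parts.path.startswith("/")


for candidate in ["/dashboard", "https://evil.example/", "//evil.example/x"]:
    print(candidate, is_safe_redirect_target(candidate))
```

In the view, the redirect would fall back to the dashboard whenever the helper returns `False`.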
from flask import render_template
from flask_login import login_required, current_user
from app.main import main_bp
@main_bp.route("/")
def home():
return render_template("main/home.html")
@main_bp.route("/dashboard")
@login_required
def dashboard():
return render_template("main/dashboard.html", user=current_user)
from flask import render_template, redirect, url_for, flash, request, abort
from flask_login import login_required, current_user
from functools import wraps
from app import db
from app.admin import admin_bp
from app.models import User, Role
def admin_required(f):
@wraps(f)
@login_required
def decorated(*args, **kwargs):
if not current_user.is_admin:
abort(403)
return f(*args, **kwargs)
return decorated
@admin_bp.route("/")
@admin_required
def panel():
users = User.query.order_by(User.created_at.desc()).all()
return render_template("admin/panel.html", users=users)
@admin_bp.route("/user/<int:user_id>/toggle-active", methods=["POST"])
@admin_required
def toggle_user_active(user_id):
user = User.query.get_or_404(user_id)
if user.id == current_user.id:
flash("You cannot deactivate your own account.", "warning")
return redirect(url_for("admin.panel"))
user.is_active_account = not user.is_active_account
db.session.commit()
status = "activated" if user.is_active_account else "deactivated"
flash(f"User {user.username} has been {status}.", "success")
return redirect(url_for("admin.panel"))
@admin_bp.route("/user/<int:user_id>/change-role", methods=["POST"])
@admin_required
def change_user_role(user_id):
user = User.query.get_or_404(user_id)
new_role_name = request.form.get("role")
role = Role.query.filter_by(name=new_role_name).first()
if not role:
flash("Invalid role.", "danger")
return redirect(url_for("admin.panel"))
user.roles = [role]
db.session.commit()
flash(f"User {user.username} role changed to {new_role_name}.", "success")
return redirect(url_for("admin.panel"))
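The `admin_required` decorator above is hard-wired to a single role. The same idea generalizes to any set of roles. The sketch below is framework-free so it runs on its own: `Forbidden` stands in for `abort(403)`, and the `get_user` callable stands in for `flask_login.current_user`. Both are assumptions made for illustration.

```python
from functools import wraps


class Forbidden(Exception):
    """Stand-in for abort(403) so the sketch runs without Flask."""


def role_required(get_user, *role_names):
    """Build a decorator that allows the call only if the user holds one of role_names."""
    def decorator(view):
        @wraps(view)
        def wrapped(*args, **kwargs):
            user = get_user()
            # Deny when nobody is logged in or no required role is held
            if user is None or not set(role_names) & set(user["roles"]):
                raise Forbidden("insufficient role")
            return view(*args, **kwargs)
        return wrapped
    return decorator


# A fake "current user" for the demonstration
current = {"name": "dana", "roles": ["editor"]}


@role_required(lambda: current, "admin", "editor")
def manage_posts():
    return "post list"


@role_required(lambda: current, "admin")
def admin_panel():
    return "admin panel"


print(manage_posts())  # post list
```

Calling `admin_panel()` with this user raises `Forbidden`, which mirrors the 403 response in the Flask version.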
The password reset flow uses signed tokens (via itsdangerous) to verify that the reset request is legitimate.
# How the password reset flow works:
# 1. User requests a password reset by providing their email
# 2. Server generates a signed, time-limited token containing the user ID
# 3. Token is sent to user's email as a link (e.g., /reset-password/TOKEN)
# 4. User clicks the link, server verifies the token
# 5. If valid and not expired, user sets a new password
# 6. Existing sessions should then be invalidated (not implemented in the routes above)
# Token generation (in the User model):
from itsdangerous import URLSafeTimedSerializer
def get_reset_token(self):
s = URLSafeTimedSerializer(current_app.config["SECRET_KEY"])
return s.dumps({"user_id": self.id}, salt="password-reset")
# Token verification:
@staticmethod
def verify_reset_token(token, max_age=3600):
s = URLSafeTimedSerializer(current_app.config["SECRET_KEY"])
try:
data = s.loads(token, salt="password-reset", max_age=max_age)
return db.session.get(User, data["user_id"]) # Query.get() is legacy in SQLAlchemy 2.x
except Exception:
return None
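To make the mechanism behind signed, time-limited tokens concrete, here is a standard-library sketch of the same idea: an HMAC over the payload and a timestamp, namespaced by a salt. It is not itsdangerous's actual token format, and `make_token` and `read_token` are hypothetical names. Note that the payload is only signed, not encrypted, so anyone holding the token can read it.

```python
import base64
import hashlib
import hmac
import json
import time
from typing import Optional


def _sign(key: bytes, salt: str, body: bytes) -> str:
    # The salt namespaces the signature so a token minted for one purpose
    # cannot be replayed for another
    return hmac.new(key, salt.encode() + b"." + body, hashlib.sha256).hexdigest()


def make_token(key: bytes, salt: str, payload: dict, now: Optional[float] = None) -> str:
    issued = int(time.time() if now is None else now)
    raw = json.dumps({"p": payload, "t": issued}).encode()
    body = base64.urlsafe_b64encode(raw)
    return body.decode() + "." + _sign(key, salt, body)


def read_token(key: bytes, salt: str, token: str, max_age: int,
               now: Optional[float] = None) -> Optional[dict]:
    body, _, signature = token.rpartition(".")
    expected = _sign(key, salt, body.encode())
    if not hmac.compare_digest(expected.encode(), signature.encode()):
        return None  # tampered, or signed with a different key or salt
    data = json.loads(base64.urlsafe_b64decode(body))
    current = time.time() if now is None else now
    if current - data["t"] > max_age:
        return None  # expired
    return data["p"]


token = make_token(b"demo-key", "password-reset", {"user_id": 7})
print(read_token(b"demo-key", "password-reset", token, max_age=3600))  # {'user_id': 7}
```

The `now` parameter exists only so expiry can be exercised without waiting; production code would rely on the library rather than a hand-rolled scheme like this.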
In production, you would send the reset email using Flask-Mail:
from flask import url_for
from flask_mail import Mail, Message
mail = Mail(app)
def send_reset_email(user, token):
# The caller generates the token with user.get_reset_token()
msg = Message(
subject="Password Reset Request",
sender="noreply@yourapp.com",
recipients=[user.email]
)
msg.body = f"""To reset your password, visit the following link:
{url_for('auth.reset_password', token=token, _external=True)}
If you did not request this, ignore this email.
This link expires in 1 hour.
"""
mail.send(msg)
Cross-Site Request Forgery (CSRF) attacks trick authenticated users into making unintended requests. Flask-WTF provides built-in CSRF protection.
from flask_wtf.csrf import CSRFProtect
csrf = CSRFProtect(app)
# In templates, include the CSRF token in forms:
# {{ form.hidden_tag() }} for Flask-WTF forms
# OR manually:
# <input type="hidden" name="csrf_token" value="{{ csrf_token() }}">
# For AJAX requests, include the token in headers:
# X-CSRFToken: {{ csrf_token() }}
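The core of CSRF protection is a synchronizer token: the server stores a random value in the user's session, embeds it in each form, and rejects any state-changing request that does not echo it back. A simplified sketch follows, using a plain dict as a stand-in for the Flask session. Flask-WTF additionally signs and time-limits the token, which this sketch omits, and the function names here are hypothetical.

```python
import hmac
import secrets


def issue_csrf_token(session: dict) -> str:
    """Store a random token in the session and return it for embedding in the form."""
    token = session.get("csrf_token")
    if token is None:
        token = secrets.token_urlsafe(32)
        session["csrf_token"] = token
    return token


def is_valid_csrf(session: dict, submitted: str) -> bool:
    expected = session.get("csrf_token")
    if not expected or not submitted:
        return False
    # Constant-time comparison avoids leaking how many characters matched
    return hmac.compare_digest(expected.encode(), submitted.encode())


session = {}
form_token = issue_csrf_token(session)
print(is_valid_csrf(session, form_token))  # True
print(is_valid_csrf(session, "forged"))    # False
```

An attacker's page can make the browser send the session cookie, but it cannot read the token out of the victim's session, so a forged request fails the check.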
from flask_talisman import Talisman
# Force HTTPS and set security headers
talisman = Talisman(
app,
force_https=True,
strict_transport_security=True,
session_cookie_secure=True,
content_security_policy={
"default-src": "'self'",
"script-src": "'self'",
"style-src": "'self' 'unsafe-inline'",
}
)
from flask_limiter import Limiter
from flask_limiter.util import get_remote_address
limiter = Limiter(
app=app,
key_func=get_remote_address,
default_limits=["200 per day", "50 per hour"]
)
@app.route("/login", methods=["POST"])
@limiter.limit("5 per minute") # Max 5 login attempts per minute
def login():
# ... login logic ...
pass
@app.route("/api/auth/login", methods=["POST"])
@limiter.limit("10 per minute")
def api_login():
# ... API login logic ...
pass
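To show what a limit such as "5 per minute" means in practice, here is a small in-memory sliding-window limiter. It is an illustration only, not Flask-Limiter's implementation: it keeps state in a single process and would not work across multiple workers. The class name and the injectable `clock` parameter are assumptions made so the behavior can be exercised without waiting.

```python
import time
from collections import defaultdict, deque
from typing import Callable, Deque, Dict


class SlidingWindowLimiter:
    def __init__(self, limit: int, window_seconds: float,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.limit = limit
        self.window = window_seconds
        self.clock = clock
        self._hits: Dict[str, Deque[float]] = defaultdict(deque)

    def allow(self, key: str) -> bool:
        """Return True and record the hit if `key` is still under its limit."""
        now = self.clock()
        hits = self._hits[key]
        # Drop timestamps that have fallen out of the window
        while hits and now - hits[0] >= self.window:
            hits.popleft()
        if len(hits) >= self.limit:
            return False
        hits.append(now)
        return True


limiter = SlidingWindowLimiter(limit=5, window_seconds=60)
results = [limiter.allow("203.0.113.9") for _ in range(6)]
print(results)  # [True, True, True, True, True, False]
```

Keying on the client address corresponds to `get_remote_address` in the configuration above; a login endpoint might key on the submitted username as well.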
# Production cookie configuration checklist
app.config.update(
SESSION_COOKIE_SECURE=True, # HTTPS only
SESSION_COOKIE_HTTPONLY=True, # No JavaScript access
SESSION_COOKIE_SAMESITE="Lax", # CSRF protection
REMEMBER_COOKIE_SECURE=True, # HTTPS only for remember me
REMEMBER_COOKIE_HTTPONLY=True, # No JS access to remember cookie
)
import bleach
def sanitize_input(value):
"""Remove HTML tags and dangerous content from user input."""
if value is None:
return ""
return bleach.clean(value.strip())
# Usage in routes:
username = sanitize_input(request.form.get("username"))
email = sanitize_input(request.form.get("email"))
# SQLAlchemy parameterized queries prevent SQL injection automatically:
# SAFE:
user = User.query.filter_by(username=username).first()
# NEVER do this:
# db.session.execute(f"SELECT * FROM users WHERE username = '{username}'")
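The difference between the safe and unsafe queries above can be seen directly with the standard-library `sqlite3` module. This is a sketch against a throwaway in-memory table, not the application's real schema:

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT)")
conn.executemany("INSERT INTO users (username) VALUES (?)", [("alice",), ("bob",)])

payload = "nobody' OR '1'='1"

# Unsafe: the payload becomes part of the SQL text, so the WHERE clause matches every row
unsafe_rows = conn.execute(
    f"SELECT username FROM users WHERE username = '{payload}'"
).fetchall()

# Safe: the value is bound as a parameter and compared as a plain string
safe_rows = conn.execute(
    "SELECT username FROM users WHERE username = ?", (payload,)
).fetchall()

print(len(unsafe_rows), len(safe_rows))  # 2 0
```

SQLAlchemy's `filter_by` and `filter` produce bound parameters in the same way, which is why the ORM query is safe without any manual escaping.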
| Security Measure | Tool/Library | Why |
|---|---|---|
| Password hashing | werkzeug.security / bcrypt | Never store plaintext passwords |
| CSRF protection | Flask-WTF (CSRFProtect) | Prevent cross-site form submissions |
| HTTPS | Flask-Talisman | Encrypt data in transit |
| Secure cookies | Flask config | Prevent cookie theft |
| Rate limiting | Flask-Limiter | Prevent brute-force attacks |
| Input sanitization | bleach / WTForms validators | Prevent XSS and injection |
| SQL injection prevention | SQLAlchemy ORM | Parameterized queries |
| Token expiration | JWT / itsdangerous | Limit exposure window |
| Security headers | Flask-Talisman | Browser security policies |
# WRONG: Never do this
user.password = request.form["password"]
# CORRECT: Always hash passwords
user.password_hash = generate_password_hash(request.form["password"])
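To see what a password hashing helper does on your behalf, here is a standard-library sketch of salted, iterated hashing with PBKDF2. It is for illustration only: in a Flask app you would keep using `generate_password_hash` and `check_password_hash`. The function names, the storage format, and the iteration count below are assumptions, not Werkzeug's implementation.

```python
import hashlib
import hmac
import os

ITERATIONS = 200_000  # illustrative value; tune for your hardware and threat model


def hash_password(password: str) -> str:
    salt = os.urandom(16)  # a fresh random salt per password
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    # Store method, salt and digest together so verification is self-describing
    return f"pbkdf2:sha256:{ITERATIONS}${salt.hex()}${digest.hex()}"


def verify_password(stored: str, candidate: str) -> bool:
    method, salt_hex, digest_hex = stored.split("$")
    iterations = int(method.rsplit(":", 1)[1])
    digest = hashlib.pbkdf2_hmac(
        "sha256", candidate.encode(), bytes.fromhex(salt_hex), iterations
    )
    # Constant-time comparison of the recomputed and stored digests
    return hmac.compare_digest(digest.hex(), digest_hex)


stored = hash_password("s3cret!")
print(verify_password(stored, "s3cret!"))  # True
print(verify_password(stored, "wrong"))    # False
```

Because the salt is random, hashing the same password twice yields two different stored strings, which defeats precomputed lookup tables.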
Session fixation attacks occur when the session ID is not regenerated after login. An attacker sets a known session ID, waits for the victim to log in, and then hijacks the session.
from flask import session
# WRONG: Reusing the same session after login
session["user_id"] = user.id
# CORRECT: Regenerate the session on login
@app.route("/login", methods=["POST"])
def login():
    # ... validate credentials ...
    session.clear()  # Clear old session data
    session["user_id"] = user.id  # Start fresh session
# Flask-Login's login_user() does not clear existing session data itself,
# so call session.clear() before login_user() when you need a clean session
# WRONG: Form without CSRF token
# <form method="post">
# <input name="amount">
# <button>Transfer</button>
# </form>
# CORRECT: Include CSRF token
# <form method="post">
# {{ form.hidden_tag() }}
# <input name="amount">
# <button>Transfer</button>
# </form>
# WRONG: Decoding without verification
import jwt
data = jwt.decode(token, options={"verify_signature": False})
# CORRECT: Always verify the signature
data = jwt.decode(token, app.config["SECRET_KEY"], algorithms=["HS256"])
# WRONG: Revealing whether a username/email exists
if not user:
flash("No account with that username.") # Tells attacker the username does not exist
elif not user.check_password(password):
flash("Wrong password.") # Tells attacker the username exists
# CORRECT: Generic error message
if not user or not user.check_password(password):
flash("Invalid credentials.") # Attacker learns nothing
# WRONG: Tokens that never expire and cannot be revoked
access_token = create_access_token(identity=user.id, expires_delta=False)
# CORRECT: Short-lived tokens with refresh mechanism and blocklist
access_token = create_access_token(
identity=str(user.id),
expires_delta=timedelta(hours=1)
)
refresh_token = create_refresh_token(
identity=str(user.id),
expires_delta=timedelta(days=30)
)
| # | Concept | Key Point |
|---|---|---|
| 1 | Authentication vs Authorization | Authentication verifies identity; authorization checks permissions. Always authenticate first. |
| 2 | Password Hashing | Never store plaintext passwords. Use generate_password_hash (Werkzeug) or bcrypt. |
| 3 | Sessions | Flask sessions are signed but not encrypted. Use server-side sessions (Redis) for sensitive data. |
| 4 | Flask-Login | The standard for session-based auth in Flask. Use UserMixin, login_user(), @login_required. |
| 5 | JWT Tokens | Use for APIs, SPAs, and mobile apps. Short-lived access tokens + long-lived refresh tokens. |
| 6 | Token Revocation | JWTs are stateless: use a Redis blocklist to revoke tokens on logout or security events. |
| 7 | RBAC | Use many-to-many user-role relationships with custom decorators like @role_required. |
| 8 | OAuth2 | Flask-Dance simplifies social login (Google, GitHub). Delegate authentication to trusted providers. |
| 9 | CSRF Protection | Always use Flask-WTF’s CSRF protection on forms. Include tokens in AJAX requests. |
| 10 | Defense in Depth | Combine HTTPS, secure cookies, rate limiting, input validation, and parameterized queries. |
Authentication and authorization are foundational to any web application. Start with Flask-Login for session-based auth in server-rendered apps, and Flask-JWT-Extended for APIs. Layer on RBAC when your application needs fine-grained permissions. Always hash passwords, always use CSRF protection, and always serve over HTTPS in production. These are not optional: they are the minimum baseline for any application that handles user data.
Every non-trivial web application needs persistent storage. Whether you are building a REST API, an admin dashboard, or a SaaS product, the database layer is the backbone that holds your application state, user data, and business logic together. Flask, being a micro-framework, does not ship with a built-in ORM or database abstraction. This is intentional — it lets you choose the right tool for your domain instead of forcing a one-size-fits-all solution.
In practice, the Python ecosystem has converged on SQLAlchemy as the de facto ORM for Flask applications, and Flask-SQLAlchemy as the integration layer that wires SQLAlchemy into the Flask application lifecycle. This tutorial covers the full spectrum of database integration — from defining your first model to managing migrations in production, handling transactions safely, and optimizing query performance under real load.
We will also discuss when an ORM is the wrong choice and you should drop down to raw SQL. The goal is not to make you a SQLAlchemy expert overnight, but to give you the mental model and working patterns you need to build production-grade data layers in Flask.
Before we dive in, it is worth understanding the trade-off between using an ORM and writing raw SQL.
The pragmatic approach is to use the ORM for 90% of your operations (CRUD, relationships, standard queries) and drop to raw SQL for the remaining 10% (complex reporting, bulk inserts, database-specific features). SQLAlchemy supports both seamlessly.
Flask-SQLAlchemy is a Flask extension that adds SQLAlchemy support with sensible defaults and useful helpers. Install it along with the database driver you need:
# Core package
pip install Flask-SQLAlchemy
# Database drivers (install the one you need)
pip install psycopg2-binary # PostgreSQL
pip install PyMySQL # MySQL
pip install mysqlclient # MySQL (C extension, faster)
# SQLite uses Python's built-in sqlite3 module, so no extra install is needed
The most important configuration key is SQLALCHEMY_DATABASE_URI, which tells SQLAlchemy how to connect to your database. Here is a minimal setup:
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
def create_app():
app = Flask(__name__)
# SQLite (file-based, good for development)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
# Disable modification tracking (saves memory)
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)
return app
The URI format follows the pattern: dialect+driver://username:password@host:port/database
# SQLite, relative path (three slashes = relative to instance folder)
SQLALCHEMY_DATABASE_URI = 'sqlite:///app.db'
# SQLite, absolute path (four slashes)
SQLALCHEMY_DATABASE_URI = 'sqlite:////var/data/app.db'
# PostgreSQL
SQLALCHEMY_DATABASE_URI = 'postgresql://user:password@localhost:5432/mydb'
# PostgreSQL with psycopg2 driver explicitly
SQLALCHEMY_DATABASE_URI = 'postgresql+psycopg2://user:password@localhost:5432/mydb'
# MySQL with PyMySQL driver
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://user:password@localhost:3306/mydb'
# MySQL with mysqlclient driver
SQLALCHEMY_DATABASE_URI = 'mysql+mysqldb://user:password@localhost:3306/mydb'
# MySQL with charset specified
SQLALCHEMY_DATABASE_URI = 'mysql+pymysql://user:password@localhost/mydb?charset=utf8mb4'
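To make the `dialect+driver://username:password@host:port/database` pattern concrete, the sketch below pulls one of these URIs apart with the standard library. SQLAlchemy has its own URL parser; `urllib.parse` is used here only to make the individual parts visible.

```python
from urllib.parse import urlsplit

uri = "postgresql+psycopg2://user:password@localhost:5432/mydb"
parts = urlsplit(uri)

# The scheme carries both the dialect and the optional driver
dialect, _, driver = parts.scheme.partition("+")
database = parts.path.lstrip("/")

print(dialect)         # postgresql
print(driver)          # psycopg2
print(parts.username)  # user
print(parts.hostname)  # localhost
print(parts.port)      # 5432
print(database)        # mydb
```

When the driver is omitted, as in `postgresql://...`, the `driver` part is empty and SQLAlchemy falls back to the dialect's default driver.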
app.config['SQLALCHEMY_DATABASE_URI'] = 'postgresql://user:pass@localhost/mydb'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
app.config['SQLALCHEMY_ECHO'] = True # Log all SQL statements (dev only)
# Flask-SQLAlchemy 3.x removed the individual SQLALCHEMY_POOL_* keys;
# pool settings are passed to the engine through SQLALCHEMY_ENGINE_OPTIONS
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_size': 10, # Connection pool size
    'pool_recycle': 3600, # Recycle connections after 1 hour
    'max_overflow': 20, # Extra connections beyond pool_size
    'pool_timeout': 30, # Seconds to wait for a connection
}
A model is a Python class that maps to a database table. Each class attribute maps to a column. Flask-SQLAlchemy provides db.Model as the base class for all your models.
from datetime import datetime, timezone
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
class User(db.Model):
__tablename__ = 'users' # Explicit table name (optional; Flask-SQLAlchemy defaults to the snake_case class name)
id = db.Column(db.Integer, primary_key=True, autoincrement=True)
username = db.Column(db.String(80), unique=True, nullable=False, index=True)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(256), nullable=False)
is_active = db.Column(db.Boolean, default=True, nullable=False)
role = db.Column(db.String(20), default='user', nullable=False)
bio = db.Column(db.Text, nullable=True)
created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc), nullable=False)
updated_at = db.Column(
db.DateTime,
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc),
nullable=False
)
def __repr__(self):
return f'<User {self.username}>'
| SQLAlchemy Type | Python Type | SQL Equivalent |
|---|---|---|
| db.Integer | int | INTEGER |
| db.BigInteger | int | BIGINT |
| db.String(n) | str | VARCHAR(n) |
| db.Text | str | TEXT |
| db.Float | float | FLOAT |
| db.Numeric(10, 2) | Decimal | NUMERIC(10, 2) |
| db.Boolean | bool | BOOLEAN |
| db.DateTime | datetime | DATETIME |
| db.Date | date | DATE |
| db.Time | time | TIME |
| db.LargeBinary | bytes | BLOB |
| db.JSON | dict/list | JSON |
| db.Enum | str/enum | ENUM |
# Primary key
id = db.Column(db.Integer, primary_key=True)
# Unique constraint
email = db.Column(db.String(120), unique=True)
# Not nullable (required field)
name = db.Column(db.String(80), nullable=False)
# Default value (Python-side)
role = db.Column(db.String(20), default='user')
# Server-side default
created_at = db.Column(db.DateTime, server_default=db.func.now())
# Index for faster queries
username = db.Column(db.String(80), index=True)
# Composite index
__table_args__ = (
db.Index('idx_user_email_role', 'email', 'role'),
db.UniqueConstraint('first_name', 'last_name', name='uq_full_name'),
)
Relationships define how models connect to each other. SQLAlchemy supports one-to-many, many-to-one, one-to-one, and many-to-many relationships. Getting these right is critical — bad relationship design leads to N+1 queries and painful refactors later.
The most common relationship. One user has many posts. The foreign key lives on the “many” side.
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
# One user has many posts
posts = db.relationship('Post', back_populates='author', lazy='select')
def __repr__(self):
return f'<User {self.username}>'
class Post(db.Model):
__tablename__ = 'posts'
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
body = db.Column(db.Text, nullable=False)
# Foreign key to users table
author_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False)
# Many posts belong to one user
author = db.relationship('User', back_populates='posts')
def __repr__(self):
return f'<Post {self.title}>'
Many-to-many relationships require an association table. For example, posts can have many tags, and tags can belong to many posts.
# Association table (no model class needed for simple many-to-many)
post_tags = db.Table('post_tags',
db.Column('post_id', db.Integer, db.ForeignKey('posts.id'), primary_key=True),
db.Column('tag_id', db.Integer, db.ForeignKey('tags.id'), primary_key=True)
)
class Post(db.Model):
__tablename__ = 'posts'
id = db.Column(db.Integer, primary_key=True)
title = db.Column(db.String(200), nullable=False)
# Many-to-many with tags
tags = db.relationship('Tag', secondary=post_tags, back_populates='posts', lazy='select')
class Tag(db.Model):
__tablename__ = 'tags'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(50), unique=True, nullable=False)
# Many-to-many with posts
posts = db.relationship('Post', secondary=post_tags, back_populates='tags', lazy='select')
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
# One-to-one: uselist=False means this returns a single object, not a list
profile = db.relationship('Profile', back_populates='user', uselist=False)
class Profile(db.Model):
__tablename__ = 'profiles'
id = db.Column(db.Integer, primary_key=True)
bio = db.Column(db.Text)
avatar_url = db.Column(db.String(500))
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), unique=True, nullable=False)
user = db.relationship('User', back_populates='profile')
The lazy parameter controls when related objects are loaded from the database:
| Value | Behavior | Use When |
|---|---|---|
| 'select' (default) | Loads related objects on first access via a separate SELECT | You access the relationship occasionally |
| 'joined' | Loads via JOIN in the same query | You always need the related data |
| 'subquery' | Loads via a subquery after the initial query | One-to-many where JOIN would duplicate rows |
| 'dynamic' | Returns a query object instead of loading results | Large collections you want to filter further |
| 'selectin' | Loads via SELECT … WHERE id IN (…) | Best default for most one-to-many relationships |
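The practical difference between 'select' and 'selectin' is the number of queries issued when you walk a collection of parents. The sketch below reproduces the two query patterns by hand with `sqlite3` and counts the statements. It mimics what the loader strategies emit rather than running SQLAlchemy itself, and the `run` helper and table contents are made up for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE users (id INTEGER PRIMARY KEY, username TEXT);
    CREATE TABLE posts (id INTEGER PRIMARY KEY, title TEXT,
                        author_id INTEGER REFERENCES users(id));
    INSERT INTO users (id, username) VALUES (1, 'alice'), (2, 'bob'), (3, 'carol');
    INSERT INTO posts (title, author_id) VALUES ('a1', 1), ('a2', 1), ('b1', 2);
""")

query_count = 0


def run(sql, params=()):
    """Execute a query and count it, so the two patterns can be compared."""
    global query_count
    query_count += 1
    return conn.execute(sql, params).fetchall()


def posts_lazy_select():
    """'select' pattern: one query for users, then one more per user (N+1)."""
    users = run("SELECT id, username FROM users ORDER BY id")
    return {
        name: [title for (title,) in run(
            "SELECT title FROM posts WHERE author_id = ? ORDER BY id", (uid,))]
        for uid, name in users
    }


def posts_selectin():
    """'selectin' pattern: one query for users, one IN query for all their posts."""
    users = run("SELECT id, username FROM users ORDER BY id")
    marks = ",".join("?" * len(users))
    rows = run(
        f"SELECT author_id, title FROM posts WHERE author_id IN ({marks}) ORDER BY id",
        [uid for uid, _ in users],
    )
    names = dict(users)
    result = {name: [] for name in names.values()}
    for author_id, title in rows:
        result[names[author_id]].append(title)
    return result


lazy = posts_lazy_select()
lazy_count = query_count
query_count = 0
eager = posts_selectin()
eager_count = query_count
print(lazy_count, eager_count)  # 4 2
```

With three users the gap is small, but the lazy pattern grows by one query per parent row while the IN-based pattern stays at two.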
Once your models are defined, you need to create the actual database tables. The simplest approach uses db.create_all().
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
def create_app():
app = Flask(__name__)
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
db.init_app(app)
with app.app_context():
# Import models so SQLAlchemy knows about them
from . import models
db.create_all()
return app
For production applications, use the application factory pattern. This separates the creation of the db object from the app, allowing you to create multiple app instances (for testing, different configs, etc.).
# extensions.py
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
# models.py
from extensions import db
from datetime import datetime, timezone
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
# app.py
import os
from flask import Flask
from extensions import db
def create_app(config_name='development'):
app = Flask(__name__)
# Load config based on environment
if config_name == 'development':
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///dev.db'
app.config['SQLALCHEMY_ECHO'] = True
elif config_name == 'testing':
app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///:memory:'
elif config_name == 'production':
app.config['SQLALCHEMY_DATABASE_URI'] = os.environ.get('DATABASE_URL')
app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
# Initialize extensions
db.init_app(app)
# Register blueprints, error handlers, etc.
# ...
return app
Important: db.create_all() only creates tables that do not already exist. It will not modify existing tables (add columns, change types, etc.). For schema changes on existing tables, you need migrations — covered in section 8.
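The limitation is easy to see in isolation. The sketch below uses plain SQLAlchemy (which db.create_all() delegates to) against an in-memory SQLite database; the two MetaData objects are an illustrative stand-in for "the model before and after you add a column", not something the original application defines.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, inspect

engine = create_engine("sqlite:///:memory:")

# Version 1 of the schema: users has only id and username.
metadata_v1 = MetaData()
Table(
    "users", metadata_v1,
    Column("id", Integer, primary_key=True),
    Column("username", String(80), nullable=False),
)
metadata_v1.create_all(engine)

# Version 2 adds an email column to the same table.
metadata_v2 = MetaData()
Table(
    "users", metadata_v2,
    Column("id", Integer, primary_key=True),
    Column("username", String(80), nullable=False),
    Column("email", String(120)),
)
# The users table already exists, so create_all() skips it entirely.
metadata_v2.create_all(engine)

column_names = [col["name"] for col in inspect(engine).get_columns("users")]
print(column_names)  # the email column is missing
```

The second create_all() call raises no error and emits no ALTER TABLE, which is why the mismatch usually shows up later as a "no such column" failure at query time.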
CRUD stands for Create, Read, Update, Delete — the four fundamental operations on any data store. Here is how each works with Flask-SQLAlchemy.
# Create a single record
user = User(username='john_doe', email='john@example.com', password_hash='hashed_pw')
db.session.add(user)
db.session.commit()
# The user now has an id assigned by the database
print(user.id) # e.g., 1
# Create multiple records at once
users = [
User(username='alice', email='alice@example.com', password_hash='hash1'),
User(username='bob', email='bob@example.com', password_hash='hash2'),
User(username='charlie', email='charlie@example.com', password_hash='hash3'),
]
db.session.add_all(users)
db.session.commit()
# Get by primary key
user = db.session.get(User, 1) # Returns None if not found
# Get all records
all_users = User.query.all()
# Get first matching record
user = User.query.filter_by(username='john_doe').first()
# Get first or return 404 (useful in route handlers)
user = User.query.filter_by(username='john_doe').first_or_404(
description='User not found'
)
# Get by primary key or 404 (requires: from flask import abort)
user = db.session.get(User, 1) or abort(404)
# Method 1: Modify the object and commit
user = User.query.filter_by(username='john_doe').first()
if user:
user.email = 'newemail@example.com'
user.role = 'admin'
db.session.commit()
# Method 2: Bulk update (more efficient for many records)
User.query.filter(User.role == 'user').update({'is_active': False})
db.session.commit()
# Method 3: Update with returning the count of affected rows
count = User.query.filter(User.last_login < cutoff_date).update(
{'is_active': False},
synchronize_session='fetch'
)
db.session.commit()
print(f'Deactivated {count} users')
# Delete a single record
user = User.query.filter_by(username='john_doe').first()
if user:
db.session.delete(user)
db.session.commit()
# Bulk delete
deleted_count = User.query.filter(User.is_active == False).delete()
db.session.commit()
print(f'Deleted {deleted_count} inactive users')
SQLAlchemy's query interface is expressive and composable. You can chain methods to build complex queries without writing raw SQL.
# filter_by — simple equality checks using keyword arguments
users = User.query.filter_by(role='admin', is_active=True).all()
# filter — more powerful, supports operators
users = User.query.filter(User.age >= 18).all()
users = User.query.filter(User.username.like('%john%')).all()
users = User.query.filter(User.email.endswith('@example.com')).all()
users = User.query.filter(User.role.in_(['admin', 'moderator'])).all()
users = User.query.filter(User.bio.isnot(None)).all()
# Combine multiple filters (AND)
users = User.query.filter(
User.role == 'admin',
User.is_active == True,
User.created_at >= start_date
).all()
# OR conditions
from sqlalchemy import or_
users = User.query.filter(
or_(User.role == 'admin', User.role == 'moderator')
).all()
# NOT conditions
from sqlalchemy import not_
users = User.query.filter(not_(User.is_active)).all()
# Order by
users = User.query.order_by(User.created_at.desc()).all()
users = User.query.order_by(User.last_name.asc(), User.first_name.asc()).all()
# Limit and offset
users = User.query.order_by(User.id).limit(10).offset(20).all()
# First result
user = User.query.order_by(User.created_at.desc()).first()
# Count
active_count = User.query.filter_by(is_active=True).count()
# Pagination (Flask-SQLAlchemy built-in)
page = request.args.get('page', 1, type=int)
per_page = request.args.get('per_page', 20, type=int)
pagination = User.query.order_by(User.created_at.desc()).paginate(
page=page,
per_page=per_page,
error_out=False # Return empty page instead of 404
)
# Pagination object properties
items = pagination.items # List of items on current page
total = pagination.total # Total number of items
pages = pagination.pages # Total number of pages
has_next = pagination.has_next
has_prev = pagination.has_prev
next_num = pagination.next_num
prev_num = pagination.prev_num
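To make the relationship between these properties concrete, here is a minimal pure-Python sketch of the arithmetic behind them. The page_summary helper is hypothetical (it is not part of Flask-SQLAlchemy) and only approximates how the values relate to total, page, and per_page.

```python
from math import ceil


def page_summary(total, page, per_page):
    """Approximate the values a pagination object exposes (illustrative only)."""
    pages = ceil(total / per_page) if total else 0
    has_prev = page > 1
    has_next = page < pages
    return {
        "pages": pages,
        "has_prev": has_prev,
        "has_next": has_next,
        "prev_num": page - 1 if has_prev else None,
        "next_num": page + 1 if has_next else None,
        # The OFFSET the underlying query would use for this page
        "offset": (page - 1) * per_page,
    }


summary = page_summary(total=45, page=3, per_page=20)
print(summary)
```

With 45 rows and 20 per page, page 3 is the last page: it has a previous page but no next page, and the query skips the first 40 rows.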
from sqlalchemy import func
# Count
total = db.session.query(func.count(User.id)).scalar()
# Sum
total_revenue = db.session.query(func.sum(Order.total_amount)).scalar()
# Average
avg_age = db.session.query(func.avg(User.age)).scalar()
# Min / Max
oldest = db.session.query(func.min(User.created_at)).scalar()
newest = db.session.query(func.max(User.created_at)).scalar()
# Group by
role_counts = db.session.query(
User.role,
func.count(User.id).label('count')
).group_by(User.role).all()
for role, count in role_counts:
print(f'{role}: {count}')
# Group by with having
popular_roles = db.session.query(
User.role,
func.count(User.id).label('count')
).group_by(User.role).having(func.count(User.id) > 5).all()
In production, you cannot use db.create_all() to evolve your schema. It does not alter existing tables — it only creates missing ones. Database migrations track every schema change as a versioned script that can be applied (upgrade) or reversed (downgrade). Flask-Migrate wraps Alembic, the migration tool for SQLAlchemy.
pip install Flask-Migrate
from flask import Flask
from flask_sqlalchemy import SQLAlchemy
from flask_migrate import Migrate
db = SQLAlchemy()
migrate = Migrate()
def create_app():
    app = Flask(__name__)
    app.config['SQLALCHEMY_DATABASE_URI'] = 'sqlite:///app.db'
    app.config['SQLALCHEMY_TRACK_MODIFICATIONS'] = False
    db.init_app(app)
    migrate.init_app(app, db)
    return app
# Initialize the migrations directory (run once)
flask db init
# Generate a migration script after changing models
flask db migrate -m "Add users table"
# Apply the migration to the database
flask db upgrade
# Revert the last migration
flask db downgrade
# Show current migration version
flask db current
# Show migration history
flask db history
# Upgrade to a specific version
flask db upgrade ae1027a6acf
# Downgrade to a specific version
flask db downgrade ae1027a6acf
# 1. Make changes to your models (add column, new table, etc.)
# 2. Generate migration
flask db migrate -m "Add phone_number column to users"
# 3. Review the generated migration file in migrations/versions/
# 4. Apply
flask db upgrade
# 5. Commit migration file to version control
git add migrations/
git commit -m "Add phone_number column migration"
Critical rule: Always review auto-generated migration files before applying them. Alembic's autogenerate compares your models against the live database, but it cannot detect table or column renames (it sees a drop + add instead), it never writes data migrations for you, and it can miss changes to anonymously named constraints. Edit the generated file if needed.
"""Add phone_number column to users
Revision ID: a1b2c3d4e5f6
Revises: 9z8y7x6w5v4u
Create Date: 2026-02-26 10:30:00.000000
"""
from alembic import op
import sqlalchemy as sa
# revision identifiers
revision = 'a1b2c3d4e5f6'
down_revision = '9z8y7x6w5v4u'
branch_labels = None
depends_on = None
def upgrade():
op.add_column('users', sa.Column('phone_number', sa.String(20), nullable=True))
op.create_index('idx_users_phone', 'users', ['phone_number'])
def downgrade():
op.drop_index('idx_users_phone', table_name='users')
op.drop_column('users', 'phone_number')
Seeding populates your database with initial or test data. Flask's CLI makes it easy to create custom commands for this.
import click
from flask.cli import with_appcontext
from extensions import db
from models import User, Product, Category
@click.command('seed-db')
@with_appcontext
def seed_db_command():
"""Seed the database with sample data."""
# Clear existing data
db.session.execute(db.text('DELETE FROM users'))
db.session.execute(db.text('DELETE FROM products'))
db.session.execute(db.text('DELETE FROM categories'))
# Seed categories
categories = [
Category(name='Electronics', description='Gadgets and devices'),
Category(name='Clothing', description='Apparel and accessories'),
Category(name='Books', description='Physical and digital books'),
]
db.session.add_all(categories)
db.session.flush() # Flush to get IDs without committing
# Seed users
users = [
User(username='admin', email='admin@example.com', password_hash='hashed_admin', role='admin'),
User(username='alice', email='alice@example.com', password_hash='hashed_alice'),
User(username='bob', email='bob@example.com', password_hash='hashed_bob'),
]
db.session.add_all(users)
# Seed products
products = [
Product(name='Laptop', price=999.99, category_id=categories[0].id, stock=50),
Product(name='T-Shirt', price=19.99, category_id=categories[1].id, stock=200),
Product(name='Python Cookbook', price=39.99, category_id=categories[2].id, stock=100),
]
db.session.add_all(products)
db.session.commit()
click.echo('Database seeded successfully.')
# Register the command in your app factory
def create_app():
app = Flask(__name__)
# ... config, extensions ...
app.cli.add_command(seed_db_command)
return app
# Run the seed command
flask seed-db
from faker import Faker
fake = Faker()
@click.command('seed-fake')
@click.argument('count', default=50)
@with_appcontext
def seed_fake_command(count):
"""Generate fake users for development."""
users = []
for _ in range(count):
users.append(User(
username=fake.unique.user_name(),
email=fake.unique.email(),
password_hash=fake.sha256(),
bio=fake.paragraph(nb_sentences=3),
is_active=fake.boolean(chance_of_getting_true=85),
created_at=fake.date_time_between(start_date='-1y', end_date='now')
))
db.session.add_all(users)
db.session.commit()
click.echo(f'Created {count} fake users.')
Database connections are expensive to create. Connection pooling keeps a set of connections open and reuses them across requests. SQLAlchemy handles this automatically, but you should tune the settings for your workload.
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_size': 10, # Number of permanent connections to keep
'max_overflow': 20, # Extra connections allowed beyond pool_size
'pool_timeout': 30, # Seconds to wait for a connection from the pool
'pool_recycle': 1800, # Recycle connections after 30 minutes
'pool_pre_ping': True, # Test connections before using them (handles stale connections)
}
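pool_size: The number of connections kept open in the pool. SQLAlchemy's default is 5.
max_overflow: When all pool_size connections are in use, SQLAlchemy can create up to max_overflow additional temporary connections. These are closed when returned to the pool. Default is 10.
pool_timeout: Seconds to wait for a free connection before raising an error. Default is 30.
pool_recycle: Replaces connections older than this many seconds. This matters for databases that drop idle connections, such as MySQL with its wait_timeout (default 8 hours). Set this lower than your database's idle timeout.
pool_pre_ping: Issues a lightweight check such as SELECT 1 before using a connection. Catches dead connections without the application seeing an error. Small overhead but highly recommended for production.
# For a typical web app handling ~50 concurrent requests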
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_size': 20,
'max_overflow': 30,
'pool_timeout': 30,
'pool_recycle': 1800,
'pool_pre_ping': True,
}
# For a lightweight app or development
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'pool_size': 5,
'max_overflow': 10,
'pool_recycle': 3600,
'pool_pre_ping': True,
}
# To disable pooling entirely (useful for debugging)
from sqlalchemy.pool import NullPool
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
'poolclass': NullPool,
}
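To see how pool_size, max_overflow, and pool_timeout interact, the following sketch exhausts a deliberately tiny pool. It uses plain SQLAlchemy with an in-memory SQLite database and a QueuePool chosen explicitly for the demonstration; the numbers are illustrative, not recommendations.

```python
from sqlalchemy import create_engine
from sqlalchemy.exc import TimeoutError as PoolTimeoutError
from sqlalchemy.pool import QueuePool

engine = create_engine(
    "sqlite:///:memory:",
    poolclass=QueuePool,
    pool_size=2,       # two permanent connections
    max_overflow=1,    # plus one temporary connection
    pool_timeout=0.2,  # give up quickly so the demo does not hang
)

# Hold pool_size + max_overflow connections without returning them.
held = [engine.connect() for _ in range(3)]
checked_out = engine.pool.checkedout()

# A fourth checkout has nowhere to come from and times out.
try:
    engine.connect()
    exhausted = False
except PoolTimeoutError:
    exhausted = True

# Closing a connection returns it to the pool.
for conn in held:
    conn.close()

print(checked_out, exhausted, engine.pool.checkedout())
```

In a real application the same timeout appears when requests hold connections longer than expected, for example when a session is never closed or a slow query blocks a worker.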
A transaction groups multiple database operations into a single atomic unit — either all of them succeed, or none of them do. SQLAlchemy uses transactions implicitly (every operation between commit() calls is a transaction), but understanding explicit transaction control is essential for correctness.
# Implicit transaction — the most common pattern
try:
    user = User(username='alice', email='alice@example.com', password_hash='hash')
    db.session.add(user)
    profile = Profile(user=user, bio='Software engineer')
    db.session.add(profile)
    db.session.commit()  # Both user and profile are saved atomically
except Exception:
    db.session.rollback()  # Undo everything if any operation fails
    raise  # Re-raise with the original traceback intact
from contextlib import contextmanager
@contextmanager
def transaction():
    """Context manager for database transactions."""
    try:
        yield db.session
        db.session.commit()
    except Exception:
        db.session.rollback()
        raise
# Usage
with transaction() as session:
    user = User(username='bob', email='bob@example.com', password_hash='hash')
    session.add(user)
    profile = Profile(user=user, bio='Data scientist')
    session.add(profile)
# Commits automatically on exit, rolls back on exception
def place_order(user_id, items):
"""Place an order with savepoints for partial rollback."""
try:
order = Order(user_id=user_id, status='pending')
db.session.add(order)
db.session.flush() # Get the order ID
for item in items:
# Savepoint for each item — if one fails, we can skip it
savepoint = db.session.begin_nested()
try:
product = db.session.get(Product, item['product_id'])
if product.stock < item['quantity']:
raise ValueError(f'Insufficient stock for {product.name}')
product.stock -= item['quantity']
order_item = OrderItem(
order_id=order.id,
product_id=product.id,
quantity=item['quantity'],
unit_price=product.price
)
db.session.add(order_item)
savepoint.commit()
except Exception as e:
savepoint.rollback()
print(f'Skipping item: {e}')
order.total_amount = sum(
oi.quantity * oi.unit_price for oi in order.items
)
db.session.commit()
return order
except Exception as e:
db.session.rollback()
raise e
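Underneath begin_nested(), the database sees SAVEPOINT statements. The sketch below reproduces the same skip-the-bad-item flow with the standard-library sqlite3 module so the SQL is visible; the order_items table and the sample items here are simplified assumptions for the demonstration, not the models from this tutorial.

```python
import sqlite3

# isolation_level=None leaves transaction control entirely to our explicit SQL.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute(
    "CREATE TABLE order_items ("
    "id INTEGER PRIMARY KEY, name TEXT, quantity INTEGER CHECK (quantity > 0))"
)

# The second item violates the CHECK constraint and should be skipped.
items = [("Laptop", 1), ("T-Shirt", 0), ("Python Cookbook", 2)]

conn.execute("BEGIN")
skipped = []
for name, quantity in items:
    conn.execute("SAVEPOINT item")
    try:
        conn.execute(
            "INSERT INTO order_items (name, quantity) VALUES (?, ?)",
            (name, quantity),
        )
        conn.execute("RELEASE SAVEPOINT item")
    except sqlite3.IntegrityError:
        # Undo only this item; earlier inserts in the transaction survive.
        conn.execute("ROLLBACK TO SAVEPOINT item")
        conn.execute("RELEASE SAVEPOINT item")
        skipped.append(name)
conn.execute("COMMIT")

saved = [row[0] for row in conn.execute("SELECT name FROM order_items ORDER BY id")]
print(saved, skipped)
```

The outer transaction still commits as a unit: if the final COMMIT were replaced by ROLLBACK, the successfully inserted items would be discarded as well.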
from sqlalchemy.exc import IntegrityError, OperationalError
def create_user(username, email, password_hash):
"""Create a user with proper error handling."""
try:
user = User(
username=username,
email=email,
password_hash=password_hash
)
db.session.add(user)
db.session.commit()
return user, None
except IntegrityError as e:
db.session.rollback()
if 'username' in str(e.orig):
return None, 'Username already exists'
if 'email' in str(e.orig):
return None, 'Email already exists'
return None, 'Duplicate entry'
except OperationalError as e:
db.session.rollback()
return None, 'Database connection error'
except Exception as e:
db.session.rollback()
return None, f'Unexpected error: {str(e)}'
Let us build a complete e-commerce data layer that ties together everything covered so far. This example includes four interconnected models, full CRUD operations, complex queries, and transactional order placement.
from datetime import datetime, timezone
from extensions import db
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False, index=True)
email = db.Column(db.String(120), unique=True, nullable=False)
password_hash = db.Column(db.String(256), nullable=False)
is_active = db.Column(db.Boolean, default=True, nullable=False)
created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
# Relationships
orders = db.relationship('Order', back_populates='user', lazy='selectin')
def __repr__(self):
return f'<User {self.username}>'
def to_dict(self):
return {
'id': self.id,
'username': self.username,
'email': self.email,
'is_active': self.is_active,
'created_at': self.created_at.isoformat(),
'order_count': len(self.orders)
}
class Product(db.Model):
__tablename__ = 'products'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(200), nullable=False, index=True)
description = db.Column(db.Text)
price = db.Column(db.Numeric(10, 2), nullable=False)
stock = db.Column(db.Integer, default=0, nullable=False)
category = db.Column(db.String(50), nullable=False, index=True)
is_available = db.Column(db.Boolean, default=True, nullable=False)
created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
# Relationships
order_items = db.relationship('OrderItem', back_populates='product', lazy='select')
__table_args__ = (
db.Index('idx_product_category_price', 'category', 'price'),
db.CheckConstraint('price > 0', name='ck_positive_price'),
db.CheckConstraint('stock >= 0', name='ck_non_negative_stock'),
)
def __repr__(self):
return f'<Product {self.name} ${self.price}>'
def to_dict(self):
return {
'id': self.id,
'name': self.name,
'description': self.description,
'price': float(self.price),
'stock': self.stock,
'category': self.category,
'is_available': self.is_available
}
class Order(db.Model):
__tablename__ = 'orders'
id = db.Column(db.Integer, primary_key=True)
user_id = db.Column(db.Integer, db.ForeignKey('users.id'), nullable=False, index=True)
status = db.Column(db.String(20), default='pending', nullable=False)
total_amount = db.Column(db.Numeric(12, 2), default=0)
shipping_address = db.Column(db.Text)
created_at = db.Column(db.DateTime, default=lambda: datetime.now(timezone.utc))
updated_at = db.Column(
db.DateTime,
default=lambda: datetime.now(timezone.utc),
onupdate=lambda: datetime.now(timezone.utc)
)
# Relationships
user = db.relationship('User', back_populates='orders')
items = db.relationship('OrderItem', back_populates='order', lazy='selectin',
cascade='all, delete-orphan')
__table_args__ = (
db.Index('idx_order_user_status', 'user_id', 'status'),
)
def __repr__(self):
return f'<Order #{self.id} - {self.status}>'
def to_dict(self):
return {
'id': self.id,
'user_id': self.user_id,
'status': self.status,
'total_amount': float(self.total_amount),
'items': [item.to_dict() for item in self.items],
'created_at': self.created_at.isoformat()
}
class OrderItem(db.Model):
__tablename__ = 'order_items'
id = db.Column(db.Integer, primary_key=True)
order_id = db.Column(db.Integer, db.ForeignKey('orders.id'), nullable=False)
product_id = db.Column(db.Integer, db.ForeignKey('products.id'), nullable=False)
quantity = db.Column(db.Integer, nullable=False)
unit_price = db.Column(db.Numeric(10, 2), nullable=False)
# Relationships
order = db.relationship('Order', back_populates='items')
product = db.relationship('Product', back_populates='order_items')
__table_args__ = (
db.CheckConstraint('quantity > 0', name='ck_positive_quantity'),
)
@property
def subtotal(self):
return float(self.quantity * self.unit_price)
def __repr__(self):
return f'<OrderItem {self.product.name} x{self.quantity}>'
def to_dict(self):
return {
'id': self.id,
'product_id': self.product_id,
'product_name': self.product.name,
'quantity': self.quantity,
'unit_price': float(self.unit_price),
'subtotal': self.subtotal
}
# ---- USER CRUD ----
def create_user(username, email, password_hash):
"""Create a new user."""
user = User(username=username, email=email, password_hash=password_hash)
db.session.add(user)
db.session.commit()
return user
def get_user(user_id):
"""Get a user by ID."""
return db.session.get(User, user_id)
def update_user(user_id, **kwargs):
"""Update user fields."""
user = db.session.get(User, user_id)
if not user:
return None
for key, value in kwargs.items():
if hasattr(user, key):
setattr(user, key, value)
db.session.commit()
return user
def delete_user(user_id):
"""Soft-delete a user by deactivating."""
user = db.session.get(User, user_id)
if user:
user.is_active = False
db.session.commit()
return user
# ---- PRODUCT CRUD ----
def create_product(name, price, category, stock=0, description=None):
"""Create a new product."""
product = Product(
name=name, price=price, category=category,
stock=stock, description=description
)
db.session.add(product)
db.session.commit()
return product
def get_products_by_category(category, min_price=None, max_price=None):
"""Get products filtered by category and optional price range."""
query = Product.query.filter_by(category=category, is_available=True)
if min_price is not None:
query = query.filter(Product.price >= min_price)
if max_price is not None:
query = query.filter(Product.price <= max_price)
return query.order_by(Product.price.asc()).all()
def update_stock(product_id, quantity_change):
"""Adjust product stock. Use negative values for decrements."""
product = db.session.get(Product, product_id)
if product:
product.stock += quantity_change
if product.stock <= 0:
product.is_available = False
db.session.commit()
return product
from sqlalchemy import func, desc
def get_top_customers(limit=10):
"""Get customers with the highest total spend."""
results = db.session.query(
User.username,
User.email,
func.count(Order.id).label('order_count'),
func.sum(Order.total_amount).label('total_spent')
).join(Order, User.id == Order.user_id)\
.filter(Order.status == 'completed')\
.group_by(User.id)\
.order_by(desc('total_spent'))\
.limit(limit)\
.all()
return [
{
'username': r.username,
'email': r.email,
'order_count': r.order_count,
'total_spent': float(r.total_spent)
}
for r in results
]
def get_revenue_by_category():
"""Get total revenue grouped by product category."""
results = db.session.query(
Product.category,
func.sum(OrderItem.quantity * OrderItem.unit_price).label('revenue'),
func.sum(OrderItem.quantity).label('units_sold')
).join(OrderItem, Product.id == OrderItem.product_id)\
.join(Order, OrderItem.order_id == Order.id)\
.filter(Order.status == 'completed')\
.group_by(Product.category)\
.order_by(desc('revenue'))\
.all()
return [
{
'category': r.category,
'revenue': float(r.revenue),
'units_sold': r.units_sold
}
for r in results
]
def get_user_order_history(user_id, page=1, per_page=10):
"""Get paginated order history for a user with item details."""
return Order.query\
.filter_by(user_id=user_id)\
.order_by(Order.created_at.desc())\
.paginate(page=page, per_page=per_page, error_out=False)
def search_products(query_text, category=None, in_stock_only=True):
"""Case-insensitive substring search on product names, with optional filters."""
q = Product.query.filter(
Product.name.ilike(f'%{query_text}%')
)
if category:
q = q.filter_by(category=category)
if in_stock_only:
q = q.filter(Product.stock > 0, Product.is_available == True)
return q.order_by(Product.name).all()
from decimal import Decimal
from sqlalchemy.exc import IntegrityError
def place_order(user_id, cart_items, shipping_address):
"""
Place an order atomically.
Args:
user_id: ID of the user placing the order
cart_items: List of dicts with 'product_id' and 'quantity'
shipping_address: Shipping address string
Returns:
(Order, None) on success, (None, error_message) on failure
"""
try:
# Verify user exists and is active
user = db.session.get(User, user_id)
if not user or not user.is_active:
return None, 'Invalid or inactive user'
# Create the order
order = Order(
user_id=user_id,
status='pending',
shipping_address=shipping_address
)
db.session.add(order)
db.session.flush() # Get order.id without committing
total = Decimal('0.00')
for item in cart_items:
# Lock the product row to prevent race conditions
product = db.session.query(Product).filter_by(
id=item['product_id']
).with_for_update().first()
if not product:
db.session.rollback()
return None, f'Product {item["product_id"]} not found'
if not product.is_available:
db.session.rollback()
return None, f'{product.name} is no longer available'
if product.stock < item['quantity']:
db.session.rollback()
return None, f'Insufficient stock for {product.name} (available: {product.stock})'
# Deduct stock
product.stock -= item['quantity']
if product.stock == 0:
product.is_available = False
# Create order item
order_item = OrderItem(
order_id=order.id,
product_id=product.id,
quantity=item['quantity'],
unit_price=product.price
)
db.session.add(order_item)
total += product.price * item['quantity']
order.total_amount = total
db.session.commit()
return order, None
except IntegrityError as e:
db.session.rollback()
return None, f'Data integrity error: {str(e.orig)}'
except Exception as e:
db.session.rollback()
return None, f'Order failed: {str(e)}'
@app.route('/api/orders', methods=['POST'])
def create_order():
data = request.get_json()
order, error = place_order(
user_id=data['user_id'],
cart_items=data['items'], # [{'product_id': 1, 'quantity': 2}, ...]
shipping_address=data['shipping_address']
)
if error:
return jsonify({'error': error}), 400
return jsonify(order.to_dict()), 201
Sometimes the ORM gets in the way. Complex reporting queries, database-specific features (window functions, CTEs, recursive queries), or bulk operations are often cleaner and faster as raw SQL. SQLAlchemy makes this straightforward.
from datetime import datetime
from sqlalchemy import text
# Simple query
result = db.session.execute(text('SELECT * FROM users WHERE is_active = :active'), {'active': True})
users = result.fetchall()
for user in users:
print(user.username, user.email) # Access by column name
# Insert
db.session.execute(
text('INSERT INTO users (username, email, password_hash) VALUES (:username, :email, :password)'),
{'username': 'dave', 'email': 'dave@example.com', 'password': 'hashed_pw'}
)
db.session.commit()
# Update
db.session.execute(
text('UPDATE products SET price = price * :multiplier WHERE category = :category'),
{'multiplier': 1.10, 'category': 'Electronics'}
)
db.session.commit()
# Delete
db.session.execute(
text('DELETE FROM sessions WHERE last_active < :cutoff'),
{'cutoff': datetime(2026, 1, 1)}
)
db.session.commit()
def get_monthly_revenue_report(year):
"""Get monthly revenue breakdown with running totals."""
sql = text("""
SELECT
EXTRACT(MONTH FROM o.created_at) AS month,
COUNT(DISTINCT o.id) AS order_count,
COUNT(DISTINCT o.user_id) AS unique_customers,
SUM(o.total_amount) AS monthly_revenue,
SUM(SUM(o.total_amount)) OVER (ORDER BY EXTRACT(MONTH FROM o.created_at)) AS running_total
FROM orders o
WHERE EXTRACT(YEAR FROM o.created_at) = :year
AND o.status = 'completed'
GROUP BY EXTRACT(MONTH FROM o.created_at)
ORDER BY month
""")
result = db.session.execute(sql, {'year': year})
return [
{
'month': int(row.month),
'order_count': row.order_count,
'unique_customers': row.unique_customers,
'revenue': float(row.monthly_revenue),
'running_total': float(row.running_total)
}
for row in result
]
# Bulk insert — much faster than ORM for large datasets
def bulk_import_products(products_data):
"""Import thousands of products efficiently."""
sql = text("""
INSERT INTO products (name, price, category, stock, is_available)
VALUES (:name, :price, :category, :stock, :is_available)
""")
# Execute with a list of parameter dicts
db.session.execute(sql, products_data)
db.session.commit()
# Usage
products = [
{'name': f'Product {i}', 'price': 9.99, 'category': 'Bulk', 'stock': 100, 'is_available': True}
for i in range(10000)
]
bulk_import_products(products)
Security note: Always use parameterized queries with :param_name placeholders. Never use f-strings or string concatenation to build SQL — that is how SQL injection happens.
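The difference is easy to demonstrate. This sketch uses the standard-library sqlite3 module, which accepts the same :name placeholder style; the users table and the malicious input are made up for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (username TEXT, is_admin INTEGER)")
conn.executemany("INSERT INTO users VALUES (?, ?)", [("alice", 0), ("root", 1)])

malicious = "nobody' OR '1'='1"

# UNSAFE: the input becomes part of the SQL text, so the OR clause matches every row.
unsafe_rows = conn.execute(
    f"SELECT username FROM users WHERE username = '{malicious}'"
).fetchall()

# SAFE: the value travels separately from the SQL and is compared as a literal string.
safe_rows = conn.execute(
    "SELECT username FROM users WHERE username = :username",
    {"username": malicious},
).fetchall()

print(len(unsafe_rows), len(safe_rows))
```

The interpolated query returns every user, while the parameterized query returns nothing because no username equals the malicious string.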
Database performance problems are among the most common causes of slow web applications. Here are the patterns and techniques that matter most in Flask-SQLAlchemy.
The N+1 problem occurs when you load a list of N objects and then access a relationship on each, causing N additional queries.
# BAD: N+1 queries — 1 query for orders + N queries for user on each order
orders = Order.query.all()
for order in orders:
print(order.user.username) # Each access triggers a separate SELECT
# GOOD: Eager loading with joinedload — 1 query total
from sqlalchemy.orm import joinedload
orders = Order.query.options(joinedload(Order.user)).all()
for order in orders:
print(order.user.username) # Already loaded, no extra query
# GOOD: Eager loading with selectinload — 2 queries total (better for one-to-many)
from sqlalchemy.orm import selectinload
users = User.query.options(selectinload(User.orders)).all()
for user in users:
print(f'{user.username}: {len(user.orders)} orders') # Already loaded
# Nested eager loading
orders = Order.query.options(
joinedload(Order.user),
selectinload(Order.items).joinedload(OrderItem.product)
).all()
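You can measure the effect rather than take it on faith. The following sketch counts SELECT statements with an engine event, using plain SQLAlchemy and an in-memory SQLite database; the trimmed-down User and Order models and the load_usernames helper are assumptions made for the demonstration.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, event, select
from sqlalchemy.orm import Session, declarative_base, joinedload, relationship

Base = declarative_base()


class User(Base):
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    username = Column(String(80), nullable=False)


class Order(Base):
    __tablename__ = "orders"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
    user = relationship("User")


engine = create_engine("sqlite:///:memory:")
Base.metadata.create_all(engine)

# Three orders, each belonging to a different user.
with Session(engine) as session:
    for name in ("alice", "bob", "charlie"):
        session.add(Order(user=User(username=name)))
    session.commit()

counter = {"selects": 0}


@event.listens_for(engine, "before_cursor_execute")
def count_selects(conn, cursor, statement, parameters, context, executemany):
    if statement.lstrip().upper().startswith("SELECT"):
        counter["selects"] += 1


def load_usernames(options=None):
    """Return (usernames, SELECT count) using a fresh session each time."""
    counter["selects"] = 0
    stmt = select(Order)
    if options:
        stmt = stmt.options(*options)
    with Session(engine) as session:
        orders = session.execute(stmt).scalars().all()
        names = [order.user.username for order in orders]
    return names, counter["selects"]


lazy_names, lazy_selects = load_usernames()
eager_names, eager_selects = load_usernames([joinedload(Order.user)])
print(lazy_selects, eager_selects)
```

With three orders the lazy version issues one query for the orders plus one per user, while the joinedload version issues a single query. The gap grows linearly with the number of rows.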
# Load only specific columns
usernames = db.session.query(User.username, User.email).filter_by(is_active=True).all()
# Defer heavy columns (load them on access)
from sqlalchemy.orm import defer, undefer
users = User.query.options(defer(User.bio), defer(User.password_hash)).all()
# Later, accessing user.bio will trigger a lazy load for that specific column
# Undefer when you need them: loads a column that the mapping marks as deferred up front
users = User.query.options(undefer(User.bio)).all()
# Index columns you filter, sort, or join on frequently
class Product(db.Model):
__tablename__ = 'products'
id = db.Column(db.Integer, primary_key=True)
name = db.Column(db.String(200), index=True) # Searched frequently
category = db.Column(db.String(50), index=True) # Filtered frequently
price = db.Column(db.Numeric(10, 2))
created_at = db.Column(db.DateTime, index=True) # Sorted frequently
# Composite index for queries that filter on both
__table_args__ = (
db.Index('idx_category_price', 'category', 'price'),
db.Index('idx_category_created', 'category', 'created_at'),
)
# Enable SQL logging in development
app.config['SQLALCHEMY_ECHO'] = True
# Or use events for more control
from sqlalchemy import event
from sqlalchemy.engine import Engine
import time
# Listen on the Engine class: db.engine is only available inside an app context
@event.listens_for(Engine, 'before_cursor_execute')
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    conn.info['query_start_time'] = time.perf_counter()
@event.listens_for(Engine, 'after_cursor_execute')
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
    total = time.perf_counter() - conn.info['query_start_time']
    if total > 0.5:  # Log slow queries (over 500ms)
        app.logger.warning(f'Slow query ({total:.2f}s): {statement}')
These are the mistakes that cost real hours in debugging. Know them, avoid them.
# BUG: Changes are never persisted
user = User(username='alice', email='alice@example.com', password_hash='hash')
db.session.add(user)
# Missing: db.session.commit()
# The user exists in the session but NOT in the database
# FIX: Always commit after making changes
db.session.add(user)
db.session.commit()
# BUG: Accessing attributes after the session is closed
def get_user_data():
user = User.query.first()
return user
# Later, outside the request context:
user = get_user_data()
print(user.orders) # DetachedInstanceError! Session is gone.
# FIX 1: Eager load what you need
def get_user_data():
return User.query.options(selectinload(User.orders)).first()
# FIX 2: Convert to dict while session is active
def get_user_data():
user = User.query.first()
return user.to_dict() # Serialize within the session context
# FIX 3: Keep the object attached by using it within the request
@app.route('/users/<int:id>')
def get_user(id):
user = User.query.get_or_404(id)
return jsonify(user.to_dict()) # Serialized within request context
# BUG: Template triggers N+1 queries
@app.route('/orders')
def list_orders():
orders = Order.query.all()
return render_template('orders.html', orders=orders)
# Template: {% for order in orders %} {{ order.user.username }} {% endfor %}
# This fires a SELECT for each order's user!
# FIX: Eager load in the view
@app.route('/orders')
def list_orders():
orders = Order.query.options(joinedload(Order.user)).all()
return render_template('orders.html', orders=orders)
# BUG: Failed operation poisons the session for subsequent requests
try:
    db.session.add(user)
    db.session.commit()
except IntegrityError:
    pass  # Session is now in a broken state!
# FIX: Always rollback on error
try:
    db.session.add(user)
    db.session.commit()
except IntegrityError:
    db.session.rollback()
    # Now the session is clean for the next operation
# BAD: This will NOT update existing tables
# If you add a column to a model, create_all() ignores it
db.create_all()  # Only creates tables that don't exist
# GOOD: Use migrations for all schema changes
# flask db migrate -m "Add new column"
# flask db upgrade
Set up Flask-Migrate from day one, even for small projects. db.create_all() is only acceptable for throwaway prototypes and test fixtures. Every schema change should be a migration file committed to version control.
from sqlalchemy.orm import validates
class User(db.Model):
__tablename__ = 'users'
id = db.Column(db.Integer, primary_key=True)
username = db.Column(db.String(80), unique=True, nullable=False)
email = db.Column(db.String(120), unique=True, nullable=False)
age = db.Column(db.Integer)
@validates('email')
def validate_email(self, key, email):
if '@' not in email:
raise ValueError('Invalid email address')
return email.lower().strip()
@validates('username')
def validate_username(self, key, username):
if len(username) < 3:
raise ValueError('Username must be at least 3 characters')
if not username.isalnum():
raise ValueError('Username must be alphanumeric')
return username.lower().strip()
@validates('age')
def validate_age(self, key, age):
if age is not None and (age < 0 or age > 150):
raise ValueError('Age must be between 0 and 150')
return age
Encapsulate database access in repository classes to keep your route handlers clean and your data access testable.
class UserRepository:
    """Encapsulates all database operations for User."""

    @staticmethod
    def create(username, email, password_hash):
        user = User(username=username, email=email, password_hash=password_hash)
        db.session.add(user)
        db.session.commit()
        return user

    @staticmethod
    def get_by_id(user_id):
        return db.session.get(User, user_id)

    @staticmethod
    def get_by_username(username):
        return User.query.filter_by(username=username).first()

    @staticmethod
    def get_active_users(page=1, per_page=20):
        return (
            User.query.filter_by(is_active=True)
            .order_by(User.created_at.desc())
            .paginate(page=page, per_page=per_page, error_out=False)
        )

    @staticmethod
    def update(user_id, **kwargs):
        user = db.session.get(User, user_id)
        if not user:
            return None
        for key, value in kwargs.items():
            if hasattr(user, key):
                setattr(user, key, value)
        db.session.commit()
        return user

    @staticmethod
    def deactivate(user_id):
        user = db.session.get(User, user_id)
        if user:
            user.is_active = False
            db.session.commit()
        return user
# Usage in routes: clean and testable
from flask import jsonify, request
from sqlalchemy.exc import IntegrityError
from werkzeug.security import generate_password_hash

@app.route('/api/users', methods=['POST'])
def create_user():
    data = request.get_json()
    try:
        user = UserRepository.create(
            username=data['username'],
            email=data['email'],
            password_hash=generate_password_hash(data['password'])
        )
        return jsonify(user.to_dict()), 201
    except IntegrityError:
        # Roll back so the failed INSERT does not poison the session
        db.session.rollback()
        return jsonify({'error': 'Username or email already exists'}), 409
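One way to see the testability benefit: because a handler only calls repository methods, a test can substitute an in-memory stand-in and never touch a database. The sketch below is plain Python with no Flask or SQLAlchemy. InMemoryUserRepository, FakeUser, DuplicateUserError, and register_user are hypothetical names introduced for illustration, and register_user stands in for the body of a route handler.

```python
from dataclasses import dataclass
from itertools import count


class DuplicateUserError(Exception):
    """Hypothetical stand-in for the IntegrityError a real database raises."""


@dataclass
class FakeUser:
    id: int
    username: str
    email: str


class InMemoryUserRepository:
    """Test double with the same create/get_by_username shape as UserRepository."""

    def __init__(self):
        self._users = {}
        self._ids = count(1)

    def create(self, username, email, password_hash):
        if username in self._users:
            raise DuplicateUserError(username)
        user = FakeUser(id=next(self._ids), username=username, email=email)
        self._users[username] = user
        return user

    def get_by_username(self, username):
        return self._users.get(username)


def register_user(repo, data):
    """Handler logic that depends only on the repository interface."""
    try:
        user = repo.create(data["username"], data["email"], data["password_hash"])
    except DuplicateUserError:
        return {"error": "Username or email already exists"}, 409
    return {"id": user.id, "username": user.username}, 201


repo = InMemoryUserRepository()
payload = {"username": "alice", "email": "alice@example.com", "password_hash": "x"}
first = register_user(repo, payload)
second = register_user(repo, payload)
print(first)   # ({'id': 1, 'username': 'alice'}, 201)
print(second)  # ({'error': 'Username or email already exists'}, 409)
```

The duplicate-user branch is exercised here without a database or a request context, which is the main payoff of routing all data access through one class.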
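# Ensure sessions are cleaned up after each request.
# Flask-SQLAlchemy registers an equivalent teardown itself; an explicit
# handler like this is only needed with a plain SQLAlchemy scoped_session.
@app.teardown_appcontext
def shutdown_session(exception=None):
    db.session.remove()

# Use pool_pre_ping to handle stale connections
app.config['SQLALCHEMY_ENGINE_OPTIONS'] = {
    'pool_pre_ping': True,   # test each connection before handing it out
    'pool_recycle': 1800,    # replace connections older than 30 minutes
}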
import os

class Config:
    SQLALCHEMY_TRACK_MODIFICATIONS = False
    SQLALCHEMY_ENGINE_OPTIONS = {
        'pool_pre_ping': True,
    }

class DevelopmentConfig(Config):
    SQLALCHEMY_DATABASE_URI = 'sqlite:///dev.db'
    SQLALCHEMY_ECHO = True

class TestingConfig(Config):
    SQLALCHEMY_DATABASE_URI = 'sqlite:///:memory:'
    TESTING = True

class ProductionConfig(Config):
    SQLALCHEMY_DATABASE_URI = os.environ.get('DATABASE_URL')
    SQLALCHEMY_ENGINE_OPTIONS = {
        'pool_size': 20,
        'max_overflow': 30,
        'pool_recycle': 1800,
        'pool_pre_ping': True,
    }

config = {
    'development': DevelopmentConfig,
    'testing': TestingConfig,
    'production': ProductionConfig,
}
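The config mapping is typically consumed by looking up a class by name at startup. The following is a minimal stdlib-only sketch of that lookup, with trimmed copies of the classes so it runs on its own. The select_config helper and the APP_ENV environment variable are assumptions made for illustration, not part of Flask. It also fails fast when the database URI is missing, since os.environ.get('DATABASE_URL') returns None when the variable is unset.

```python
import os


class Config:
    SQLALCHEMY_TRACK_MODIFICATIONS = False


class DevelopmentConfig(Config):
    SQLALCHEMY_DATABASE_URI = "sqlite:///dev.db"


class ProductionConfig(Config):
    # Read at import time; this is None when DATABASE_URL is unset.
    SQLALCHEMY_DATABASE_URI = os.environ.get("DATABASE_URL")


config = {"development": DevelopmentConfig, "production": ProductionConfig}


def select_config(name=None):
    """Return the config class for `name`, defaulting to APP_ENV or 'development'."""
    name = name or os.environ.get("APP_ENV", "development")
    try:
        cfg = config[name]
    except KeyError:
        raise ValueError(f"Unknown config name: {name!r}") from None
    if not cfg.SQLALCHEMY_DATABASE_URI:
        raise RuntimeError(f"{name} config has no database URI set")
    return cfg


print(select_config("development").SQLALCHEMY_DATABASE_URI)  # sqlite:///dev.db
```

In a Flask application the returned class would usually be passed to app.config.from_object() inside an application factory.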
Define models as Python classes that subclass db.Model. Use column types, constraints, and indexes to enforce data integrity at the database level.
Define relationships using db.relationship() with back_populates for bidirectional access. Choose the right lazy loading strategy to avoid N+1 queries.
Control transactions with explicit commit() calls. Always rollback() on error. Use begin_nested() for savepoints when you need partial rollback.
Configure pool_size and pool_recycle, and enable pool_pre_ping, for production stability.
Eager loading (joinedload, selectinload) is your primary weapon against N+1 performance problems. Profile your queries in development with SQLALCHEMY_ECHO = True.
For raw SQL, use text() with parameterized queries, never string concatenation.
Validate data with @validates decorators and database constraints (CheckConstraint, unique, nullable).