Almost every real-world application needs to persist data, and relational databases remain the backbone of most production systems. MySQL, the world’s most popular open-source relational database, pairs naturally with Python — one of the world’s most popular programming languages. Whether you are building a web application with Flask or Django, automating data pipelines, or writing microservices, knowing how to talk to MySQL from Python is a non-negotiable skill.
In this tutorial you will learn everything from establishing a basic connection to managing transactions, pooling connections for production workloads, and even mapping your tables to Python objects with SQLAlchemy. Every example is production-minded: parameterized queries, proper error handling, and clean resource management from the start.
The most common MySQL driver for Python is mysql-connector-python, maintained by Oracle. Install it with pip:
pip install mysql-connector-python
A popular alternative is PyMySQL, a pure-Python driver that requires no C extensions:
pip install pymysql
Both libraries follow the Python DB-API 2.0 specification (PEP 249), so the core patterns — connect, cursor, execute, fetch — are nearly identical. This tutorial uses mysql-connector-python for all examples. If you are using PyMySQL, swap the import and connection call and the rest of your code stays the same.
You will also need a running MySQL server. If you do not have one, the quickest path is Docker:
# Pull and run MySQL 8 in a container docker run --name mysql-tutorial \ -e MYSQL_ROOT_PASSWORD=rootpass \ -p 3306:3306 \ -d mysql:8
Every interaction starts with a connection. You provide the host, port, user, password, and optionally a database name:
import mysql.connector
# Establish a connection
conn = mysql.connector.connect(
host="127.0.0.1",
port=3306,
user="root",
password="rootpass"
)
print("Connected:", conn.is_connected()) # True
# Always close when done
conn.close()
If the connection fails — wrong password, server not running, network issue — mysql.connector.Error is raised. Always wrap your connection logic in a try/except block:
import mysql.connector
from mysql.connector import Error
try:
conn = mysql.connector.connect(
host="127.0.0.1",
user="root",
password="rootpass"
)
if conn.is_connected():
info = conn.get_server_info()
print(f"Connected to MySQL Server version {info}")
except Error as e:
print(f"Error connecting to MySQL: {e}")
finally:
if 'conn' in locals() and conn.is_connected():
conn.close()
print("Connection closed")
Once connected, use a cursor to execute SQL statements. Let us create a database and a table:
import mysql.connector
from mysql.connector import Error
conn = mysql.connector.connect(
host="127.0.0.1",
user="root",
password="rootpass"
)
cursor = conn.cursor()
# Create database
cursor.execute("CREATE DATABASE IF NOT EXISTS tutorial_db")
cursor.execute("USE tutorial_db")
# Create table
create_table_sql = """
CREATE TABLE IF NOT EXISTS users (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) NOT NULL UNIQUE,
email VARCHAR(100) NOT NULL,
age INT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
cursor.execute(create_table_sql)
print("Database and table created successfully")
cursor.close()
conn.close()
You can also connect directly to a database by passing the database parameter:
conn = mysql.connector.connect(
host="127.0.0.1",
user="root",
password="rootpass",
database="tutorial_db"
)
CRUD — Create, Read, Update, Delete — covers the four fundamental data operations. Let us walk through each one.
Single insert:
import mysql.connector
conn = mysql.connector.connect(
host="127.0.0.1",
user="root",
password="rootpass",
database="tutorial_db"
)
cursor = conn.cursor()
sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
values = ("alice", "alice@example.com", 30)
cursor.execute(sql, values)
conn.commit() # IMPORTANT: commit the transaction
print(f"Inserted user with ID: {cursor.lastrowid}")
cursor.close()
conn.close()
Batch insert with executemany():
sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
users = [
("bob", "bob@example.com", 25),
("charlie", "charlie@example.com", 35),
("diana", "diana@example.com", 28),
("eve", "eve@example.com", 32),
]
cursor.executemany(sql, users)
conn.commit()
print(f"Inserted {cursor.rowcount} rows")
executemany() is significantly faster than looping with individual execute() calls because the driver can optimize the network round-trips.
The cursor provides three fetch methods:
fetchone() — returns the next row as a tuple, or Nonefetchall() — returns all remaining rows as a list of tuplesfetchmany(size) — returns up to size rows# Fetch all users
cursor.execute("SELECT id, username, email, age FROM users")
rows = cursor.fetchall()
for row in rows:
print(f"ID: {row[0]}, Username: {row[1]}, Email: {row[2]}, Age: {row[3]}")
For more readable code, use a dictionary cursor so each row is a dict instead of a tuple:
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE age > %s", (28,))
for user in cursor.fetchall():
print(f"{user['username']} ({user['email']}) - Age {user['age']}")
Fetching one row at a time is memory-efficient for large result sets:
cursor.execute("SELECT * FROM users ORDER BY created_at DESC")
row = cursor.fetchone()
while row:
print(row)
row = cursor.fetchone()
Fetching in batches balances memory and performance:
cursor.execute("SELECT * FROM users")
while True:
batch = cursor.fetchmany(size=2)
if not batch:
break
for row in batch:
print(row)
sql = "UPDATE users SET email = %s, age = %s WHERE username = %s"
values = ("alice_new@example.com", 31, "alice")
cursor.execute(sql, values)
conn.commit()
print(f"Rows affected: {cursor.rowcount}")
sql = "DELETE FROM users WHERE username = %s"
cursor.execute(sql, ("eve",))
conn.commit()
print(f"Deleted {cursor.rowcount} row(s)")
Always check cursor.rowcount after UPDATE and DELETE to confirm the operation affected the expected number of rows.
This is not optional — it is a hard requirement for any production code. Parameterized queries prevent SQL injection, one of the most dangerous and most common web vulnerabilities.
Never do this:
# DANGEROUS — SQL injection vulnerability!
username = input("Enter username: ")
cursor.execute(f"SELECT * FROM users WHERE username = '{username}'")
If a user enters ' OR '1'='1, that query returns every row in the table. Worse, they could enter '; DROP TABLE users; -- and destroy your data.
Always do this:
# SAFE — parameterized query
username = input("Enter username: ")
cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
user = cursor.fetchone()
The %s placeholder tells the driver to properly escape and quote the value. This works regardless of what the user types — the database sees it as a literal value, not executable SQL.
Key rules:
%s as the placeholder (not ? — that is for SQLite)(value,)f"", .format(), %) to build SQLA transaction groups multiple SQL statements into a single atomic unit. Either all of them succeed, or none of them do. MySQL with InnoDB supports full ACID transactions.
import mysql.connector
from mysql.connector import Error
conn = mysql.connector.connect(
host="127.0.0.1",
user="root",
password="rootpass",
database="tutorial_db"
)
try:
cursor = conn.cursor()
# Transfer "credits" from alice to bob (both must succeed)
cursor.execute(
"UPDATE users SET age = age - 1 WHERE username = %s", ("alice",)
)
cursor.execute(
"UPDATE users SET age = age + 1 WHERE username = %s", ("bob",)
)
conn.commit() # Both updates are saved
print("Transaction committed")
except Error as e:
conn.rollback() # Undo everything if any statement fails
print(f"Transaction rolled back: {e}")
finally:
cursor.close()
conn.close()
By default, mysql-connector-python does not auto-commit. You must call conn.commit() explicitly. If you want auto-commit behavior (not recommended for multi-statement operations), set it at connection time:
# Auto-commit mode — each statement is its own transaction
conn = mysql.connector.connect(
host="127.0.0.1",
user="root",
password="rootpass",
database="tutorial_db",
autocommit=True
)
When to use explicit transactions:
Opening and closing database connections is expensive. In a web application handling hundreds of requests per second, creating a new connection for every request wastes time and resources. Connection pooling solves this by maintaining a pool of reusable connections.
from mysql.connector import pooling
# Create a connection pool
pool = pooling.MySQLConnectionPool(
pool_name="tutorial_pool",
pool_size=5,
pool_reset_session=True,
host="127.0.0.1",
user="root",
password="rootpass",
database="tutorial_db"
)
# Get a connection from the pool
conn = pool.get_connection()
cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users")
for user in cursor.fetchall():
print(user)
cursor.close()
conn.close() # Returns the connection to the pool, does not destroy it
When you call conn.close() on a pooled connection, it goes back to the pool instead of being destroyed. The next call to pool.get_connection() can reuse it immediately.
Pool sizing guidelines:
pool_size=5 and increase based on load testingSHOW STATUS LIKE 'Threads_connected' in MySQLHere is a thread-safe pattern for a web application:
from mysql.connector import pooling, Error
class Database:
"""Thread-safe database access using connection pooling."""
def __init__(self, **kwargs):
self.pool = pooling.MySQLConnectionPool(
pool_name="app_pool",
pool_size=10,
**kwargs
)
def execute_query(self, query, params=None, fetch=False):
conn = self.pool.get_connection()
try:
cursor = conn.cursor(dictionary=True)
cursor.execute(query, params)
if fetch:
result = cursor.fetchall()
else:
conn.commit()
result = cursor.rowcount
return result
except Error as e:
conn.rollback()
raise e
finally:
cursor.close()
conn.close()
# Usage
db = Database(
host="127.0.0.1",
user="root",
password="rootpass",
database="tutorial_db"
)
users = db.execute_query("SELECT * FROM users WHERE age > %s", (25,), fetch=True)
print(users)
Context managers (the with statement) guarantee that resources are cleaned up even if an exception occurs. Let us build a reusable context manager for database operations:
from contextlib import contextmanager
import mysql.connector
from mysql.connector import Error
@contextmanager
def get_db_connection(config):
"""Context manager that provides a database connection."""
conn = mysql.connector.connect(**config)
try:
yield conn
except Error as e:
conn.rollback()
raise e
finally:
conn.close()
@contextmanager
def get_db_cursor(conn, dictionary=True):
"""Context manager that provides a cursor and commits on success."""
cursor = conn.cursor(dictionary=dictionary)
try:
yield cursor
conn.commit()
except Error as e:
conn.rollback()
raise e
finally:
cursor.close()
# Configuration
DB_CONFIG = {
"host": "127.0.0.1",
"user": "root",
"password": "rootpass",
"database": "tutorial_db"
}
# Usage — clean and exception-safe
with get_db_connection(DB_CONFIG) as conn:
with get_db_cursor(conn) as cursor:
cursor.execute(
"INSERT INTO users (username, email, age) VALUES (%s, %s, %s)",
("frank", "frank@example.com", 29)
)
print(f"Inserted row ID: {cursor.lastrowid}")
# Connection and cursor are automatically closed here
This pattern is the recommended way to manage database resources in production Python applications. It eliminates an entire class of bugs — leaked connections, uncommitted transactions, and unclosed cursors.
For pooled connections, combine the two patterns:
from mysql.connector import pooling
from contextlib import contextmanager
pool = pooling.MySQLConnectionPool(
pool_name="app_pool",
pool_size=5,
host="127.0.0.1",
user="root",
password="rootpass",
database="tutorial_db"
)
@contextmanager
def get_connection():
conn = pool.get_connection()
try:
yield conn
finally:
conn.close() # Returns to pool
@contextmanager
def get_cursor(conn):
cursor = conn.cursor(dictionary=True)
try:
yield cursor
conn.commit()
except Exception:
conn.rollback()
raise
finally:
cursor.close()
# Usage
with get_connection() as conn:
with get_cursor(conn) as cursor:
cursor.execute("SELECT COUNT(*) AS total FROM users")
result = cursor.fetchone()
print(f"Total users: {result['total']}")
So far, every example has used raw SQL. That works well for simple applications and gives you full control. But as your application grows — more tables, more relationships, more complex queries — writing raw SQL becomes tedious and error-prone. That is where an ORM (Object-Relational Mapper) shines.
SQLAlchemy is Python’s most powerful and most widely used ORM. Install it alongside the MySQL driver:
pip install sqlalchemy mysql-connector-python
SQLAlchemy needs three things to get started:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase
# Connection URL format: mysql+connector://user:password@host:port/database
engine = create_engine(
"mysql+mysqlconnector://root:rootpass@127.0.0.1:3306/tutorial_db",
echo=False, # Set True to log all SQL statements
pool_size=5,
max_overflow=10
)
# Create a session factory
SessionLocal = sessionmaker(bind=engine)
# Base class for models
class Base(DeclarativeBase):
pass
Each model class maps to a database table. Columns become class attributes:
from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func
class User(Base):
__tablename__ = "orm_users"
id = Column(Integer, primary_key=True, autoincrement=True)
username = Column(String(50), unique=True, nullable=False)
email = Column(String(100), nullable=False)
age = Column(Integer)
created_at = Column(DateTime, server_default=func.now())
# One-to-many relationship
posts = relationship("Post", back_populates="author",
cascade="all, delete-orphan")
def __repr__(self):
return f"<User(id={self.id}, username='{self.username}')>"
class Post(Base):
__tablename__ = "orm_posts"
id = Column(Integer, primary_key=True, autoincrement=True)
title = Column(String(200), nullable=False)
body = Column(String(5000))
user_id = Column(Integer, ForeignKey("orm_users.id"), nullable=False)
created_at = Column(DateTime, server_default=func.now())
author = relationship("User", back_populates="posts")
def __repr__(self):
return f"<Post(id={self.id}, title='{self.title}')>"
# Create all tables
Base.metadata.create_all(engine)
# CREATE
session = SessionLocal()
new_user = User(username="grace", email="grace@example.com", age=27)
session.add(new_user)
session.commit()
print(f"Created: {new_user}")
# Add a post for this user
new_post = Post(title="My First Post", body="Hello from SQLAlchemy!",
user_id=new_user.id)
session.add(new_post)
session.commit()
# READ
user = session.query(User).filter_by(username="grace").first()
print(f"Found: {user}")
print(f"Posts: {user.posts}") # Lazy-loaded relationship
# All users older than 25
young_users = session.query(User).filter(User.age > 25).all()
for u in young_users:
print(u)
# UPDATE
user.email = "grace_updated@example.com"
session.commit()
# DELETE
session.delete(user) # Also deletes posts due to cascade
session.commit()
session.close()
With the ORM, notice how you never write a single line of SQL. SQLAlchemy generates it for you, handles parameterization, and manages the transaction lifecycle.
from contextlib import contextmanager
@contextmanager
def get_session():
session = SessionLocal()
try:
yield session
session.commit()
except Exception:
session.rollback()
raise
finally:
session.close()
# Usage
with get_session() as session:
user = User(username="henry", email="henry@example.com", age=33)
session.add(user)
# Automatically committed when the block exits without error
| Use ORM When | Use Raw SQL When |
|---|---|
| Building a CRUD-heavy application | Running complex analytical queries |
| You need relationship management | You need maximum query performance |
| Rapid prototyping and iteration | Migrating or bulk-loading data |
| Working with multiple database backends | Using database-specific features |
| Team members vary in SQL skill | Debugging performance issues |
Many production applications use both — ORM for standard CRUD and raw SQL (via session.execute()) for complex queries and reporting.
A complete user management module with registration, authentication, and profile updates:
import mysql.connector
from mysql.connector import pooling, Error
import hashlib
import os
from contextlib import contextmanager
# --- Database Setup ---
pool = pooling.MySQLConnectionPool(
pool_name="user_mgmt_pool",
pool_size=5,
host="127.0.0.1",
user="root",
password="rootpass",
database="tutorial_db"
)
@contextmanager
def get_connection():
conn = pool.get_connection()
try:
yield conn
finally:
conn.close()
@contextmanager
def get_cursor(conn, dictionary=True):
cursor = conn.cursor(dictionary=dictionary)
try:
yield cursor
conn.commit()
except Error:
conn.rollback()
raise
finally:
cursor.close()
def init_db():
"""Create the accounts table if it does not exist."""
with get_connection() as conn:
with get_cursor(conn) as cursor:
cursor.execute("""
CREATE TABLE IF NOT EXISTS accounts (
id INT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(50) NOT NULL UNIQUE,
email VARCHAR(100) NOT NULL UNIQUE,
password_hash VARCHAR(128) NOT NULL,
salt VARCHAR(64) NOT NULL,
full_name VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
ON UPDATE CURRENT_TIMESTAMP
)
""")
def hash_password(password, salt=None):
"""Hash a password with a random salt."""
if salt is None:
salt = os.urandom(32).hex()
hashed = hashlib.sha256((salt + password).encode()).hexdigest()
return hashed, salt
def register_user(username, email, password, full_name=None):
"""Register a new user. Returns user ID on success."""
password_hash, salt = hash_password(password)
with get_connection() as conn:
with get_cursor(conn) as cursor:
try:
cursor.execute(
"""INSERT INTO accounts
(username, email, password_hash, salt, full_name)
VALUES (%s, %s, %s, %s, %s)""",
(username, email, password_hash, salt, full_name)
)
print(f"User '{username}' registered with ID {cursor.lastrowid}")
return cursor.lastrowid
except Error as e:
if e.errno == 1062: # Duplicate entry
print("Registration failed: username or email already exists")
return None
raise
def login(username, password):
"""Authenticate a user. Returns user dict or None."""
with get_connection() as conn:
with get_cursor(conn) as cursor:
cursor.execute(
"""SELECT id, username, email, password_hash, salt, full_name
FROM accounts WHERE username = %s""",
(username,)
)
user = cursor.fetchone()
if user is None:
print("Login failed: user not found")
return None
hashed, _ = hash_password(password, user["salt"])
if hashed != user["password_hash"]:
print("Login failed: incorrect password")
return None
print(f"Welcome back, {user['full_name'] or user['username']}!")
return {
"id": user["id"],
"username": user["username"],
"email": user["email"],
"full_name": user["full_name"]
}
def update_profile(user_id, **kwargs):
"""Update user profile fields. Only updates provided fields."""
allowed_fields = {"email", "full_name"}
updates = {k: v for k, v in kwargs.items() if k in allowed_fields}
if not updates:
print("No valid fields to update")
return False
set_clause = ", ".join(f"{field} = %s" for field in updates)
values = list(updates.values()) + [user_id]
with get_connection() as conn:
with get_cursor(conn) as cursor:
cursor.execute(
f"UPDATE accounts SET {set_clause} WHERE id = %s",
tuple(values)
)
if cursor.rowcount > 0:
print(f"Profile updated for user ID {user_id}")
return True
print("User not found")
return False
# --- Demo ---
if __name__ == "__main__":
init_db()
# Register
user_id = register_user(
"johndoe", "john@example.com", "s3cur3P@ss", "John Doe"
)
# Login
user = login("johndoe", "s3cur3P@ss")
# Update profile
if user:
update_profile(
user["id"],
email="john.doe@newmail.com",
full_name="John A. Doe"
)
import mysql.connector
from mysql.connector import pooling, Error
from contextlib import contextmanager
from decimal import Decimal
pool = pooling.MySQLConnectionPool(
pool_name="inventory_pool",
pool_size=5,
host="127.0.0.1",
user="root",
password="rootpass",
database="tutorial_db"
)
@contextmanager
def db_cursor(dictionary=True):
conn = pool.get_connection()
cursor = conn.cursor(dictionary=dictionary)
try:
yield cursor
conn.commit()
except Error:
conn.rollback()
raise
finally:
cursor.close()
conn.close()
def init_inventory():
with db_cursor() as cursor:
cursor.execute("""
CREATE TABLE IF NOT EXISTS products (
id INT AUTO_INCREMENT PRIMARY KEY,
name VARCHAR(100) NOT NULL,
sku VARCHAR(50) NOT NULL UNIQUE,
price DECIMAL(10, 2) NOT NULL,
quantity INT NOT NULL DEFAULT 0,
category VARCHAR(50),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
""")
def add_product(name, sku, price, quantity=0, category=None):
with db_cursor() as cursor:
cursor.execute(
"""INSERT INTO products (name, sku, price, quantity, category)
VALUES (%s, %s, %s, %s, %s)""",
(name, sku, price, quantity, category)
)
return cursor.lastrowid
def restock(sku, amount):
"""Add stock to an existing product."""
with db_cursor() as cursor:
cursor.execute(
"UPDATE products SET quantity = quantity + %s WHERE sku = %s",
(amount, sku)
)
if cursor.rowcount == 0:
raise ValueError(f"Product with SKU '{sku}' not found")
print(f"Restocked {amount} units of {sku}")
def sell(sku, amount):
"""Reduce stock. Raises error if insufficient stock."""
with db_cursor() as cursor:
# Check current stock
cursor.execute(
"SELECT quantity FROM products WHERE sku = %s", (sku,)
)
product = cursor.fetchone()
if product is None:
raise ValueError(f"Product '{sku}' not found")
if product["quantity"] < amount:
raise ValueError(
f"Insufficient stock: {product['quantity']} available, "
f"{amount} requested"
)
cursor.execute(
"UPDATE products SET quantity = quantity - %s WHERE sku = %s",
(amount, sku)
)
print(f"Sold {amount} units of {sku}")
def get_low_stock(threshold=10):
"""Find products that need restocking."""
with db_cursor() as cursor:
cursor.execute(
"""SELECT name, sku, quantity FROM products
WHERE quantity <= %s ORDER BY quantity ASC""",
(threshold,)
)
return cursor.fetchall()
def get_inventory_value():
"""Calculate total inventory value."""
with db_cursor() as cursor:
cursor.execute(
"SELECT SUM(price * quantity) AS total_value FROM products"
)
result = cursor.fetchone()
return result["total_value"] or Decimal("0.00")
def search_products(keyword):
"""Search products by name or category."""
with db_cursor() as cursor:
pattern = f"%{keyword}%"
cursor.execute(
"""SELECT * FROM products
WHERE name LIKE %s OR category LIKE %s""",
(pattern, pattern)
)
return cursor.fetchall()
# --- Demo ---
if __name__ == "__main__":
init_inventory()
# Add products
add_product("Mechanical Keyboard", "KB-001", 89.99, 50, "Electronics")
add_product("USB-C Cable", "CB-001", 12.99, 200, "Accessories")
add_product("Monitor Stand", "MS-001", 45.00, 15, "Furniture")
add_product("Webcam HD", "WC-001", 59.99, 8, "Electronics")
# Sell some items
sell("KB-001", 5)
restock("WC-001", 20)
# Reports
print("\nLow stock items:")
for item in get_low_stock(threshold=20):
print(f" {item['name']} (SKU: {item['sku']}): {item['quantity']} left")
print(f"\nTotal inventory value: ${get_inventory_value():,.2f}")
print("\nElectronics products:")
for p in search_products("Electronics"):
print(f" {p['name']} - ${p['price']} ({p['quantity']} in stock)")
A reusable data access layer that any application can build on — similar to a repository pattern used in web frameworks:
import mysql.connector
from mysql.connector import pooling, Error
from contextlib import contextmanager
class DataAccessLayer:
"""A generic, reusable data access layer for MySQL."""
def __init__(self, host, user, password, database, pool_size=5):
self.pool = pooling.MySQLConnectionPool(
pool_name="dal_pool",
pool_size=pool_size,
host=host,
user=user,
password=password,
database=database
)
@contextmanager
def _get_cursor(self):
conn = self.pool.get_connection()
cursor = conn.cursor(dictionary=True)
try:
yield cursor, conn
finally:
cursor.close()
conn.close()
def fetch_all(self, query, params=None):
"""Execute a SELECT and return all rows."""
with self._get_cursor() as (cursor, conn):
cursor.execute(query, params)
return cursor.fetchall()
def fetch_one(self, query, params=None):
"""Execute a SELECT and return the first row."""
with self._get_cursor() as (cursor, conn):
cursor.execute(query, params)
return cursor.fetchone()
def execute(self, query, params=None):
"""Execute INSERT, UPDATE, or DELETE. Returns affected row count."""
with self._get_cursor() as (cursor, conn):
cursor.execute(query, params)
conn.commit()
return cursor.rowcount
def insert(self, query, params=None):
"""Execute an INSERT and return the new row's ID."""
with self._get_cursor() as (cursor, conn):
cursor.execute(query, params)
conn.commit()
return cursor.lastrowid
def execute_many(self, query, params_list):
"""Execute a batch operation. Returns affected row count."""
with self._get_cursor() as (cursor, conn):
cursor.executemany(query, params_list)
conn.commit()
return cursor.rowcount
def execute_transaction(self, operations):
"""
Execute multiple operations in a single transaction.
operations: list of (query, params) tuples
"""
with self._get_cursor() as (cursor, conn):
try:
for query, params in operations:
cursor.execute(query, params)
conn.commit()
return True
except Error:
conn.rollback()
raise
# --- Usage Example ---
dal = DataAccessLayer(
host="127.0.0.1",
user="root",
password="rootpass",
database="tutorial_db"
)
# Insert
user_id = dal.insert(
"INSERT INTO users (username, email, age) VALUES (%s, %s, %s)",
("ivy", "ivy@example.com", 26)
)
# Read
users = dal.fetch_all("SELECT * FROM users WHERE age > %s", (25,))
for user in users:
print(user)
# Update
affected = dal.execute(
"UPDATE users SET age = %s WHERE username = %s",
(27, "ivy")
)
# Transaction
dal.execute_transaction([
("UPDATE users SET age = age - 1 WHERE username = %s", ("alice",)),
("UPDATE users SET age = age + 1 WHERE username = %s", ("bob",)),
])
These are the mistakes that burn developers most often. Learn them here so you do not learn them in a production outage.
We covered this above, but it bears repeating. Never build SQL strings with user input. Always use parameterized queries. This is the number-one security vulnerability in web applications, and it is completely preventable.
If your INSERTs and UPDATEs seem to work but the data disappears, you forgot to call conn.commit(). The default mode is manual commit — every write must be explicitly committed.
# This does NOTHING to the database without commit()
cursor.execute(
"INSERT INTO users (username, email) VALUES (%s, %s)",
("ghost", "ghost@example.com")
)
# conn.commit() <-- Missing! Data is lost when connection closes.
If you open connections without closing them, your application eventually exhausts the MySQL connection limit (default: 151). Use context managers or try/finally blocks to guarantee cleanup:
# BAD — if an exception occurs, connection is never closed
conn = mysql.connector.connect(**config)
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
# ... exception here means conn.close() never runs
conn.close()
# GOOD — finally block guarantees cleanup
conn = mysql.connector.connect(**config)
try:
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
results = cursor.fetchall()
finally:
conn.close()
This is especially common with ORMs. If you load a list of users, then loop through them loading each user's posts individually, you make 1 + N queries instead of a single JOIN:
# BAD — N+1 queries
users = session.query(User).all() # 1 query
for user in users:
print(user.posts) # N queries (1 per user)
# GOOD — eager loading with joinedload
from sqlalchemy.orm import joinedload
users = (
session.query(User)
.options(joinedload(User.posts))
.all()
) # 1 query
for user in users:
print(user.posts) # No additional queries
Database operations can fail for many reasons: deadlocks, timeouts, constraint violations, server restarts. Always wrap database calls in try/except and handle failures gracefully.
Never store raw passwords. Always hash them with a salt. Use bcrypt or argon2 in production — our example used SHA-256 for simplicity, but dedicated password hashing libraries are much more secure.
mysql.connector.Error, log the details, and fail gracefully. Do not let raw database errors leak to your users.os.environ or a secrets manager.%s placeholders) are mandatory — they prevent SQL injection and should be your default.commit() / rollback()) ensure data consistency for multi-statement operations.With these patterns and practices in your toolkit, you can confidently build Python applications backed by MySQL — from quick scripts to production web services.