Python Advanced – MySQL

Introduction

Almost every real-world application needs to persist data, and relational databases remain the backbone of most production systems. MySQL, the world’s most popular open-source relational database, pairs naturally with Python — one of the world’s most popular programming languages. Whether you are building a web application with Flask or Django, automating data pipelines, or writing microservices, knowing how to talk to MySQL from Python is a non-negotiable skill.

In this tutorial you will learn everything from establishing a basic connection to managing transactions, pooling connections for production workloads, and even mapping your tables to Python objects with SQLAlchemy. Every example is production-minded: parameterized queries, proper error handling, and clean resource management from the start.

Setup & Installation

The most common MySQL driver for Python is mysql-connector-python, maintained by Oracle. Install it with pip:

pip install mysql-connector-python

A popular alternative is PyMySQL, a pure-Python driver that requires no C extensions:

pip install pymysql

Both libraries follow the Python DB-API 2.0 specification (PEP 249), so the core patterns — connect, cursor, execute, fetch — are nearly identical. This tutorial uses mysql-connector-python for all examples. If you are using PyMySQL, swap the import and connection call and the rest of your code stays the same.

You will also need a running MySQL server. If you do not have one, the quickest path is Docker:

# Pull and run MySQL 8 in a container
docker run --name mysql-tutorial \
  -e MYSQL_ROOT_PASSWORD=rootpass \
  -p 3306:3306 \
  -d mysql:8

Connecting to MySQL

Every interaction starts with a connection. You provide the host, port, user, password, and optionally a database name:

import mysql.connector

# Establish a connection
conn = mysql.connector.connect(
    host="127.0.0.1",
    port=3306,
    user="root",
    password="rootpass"
)

print("Connected:", conn.is_connected())  # True

# Always close when done
conn.close()

If the connection fails — wrong password, server not running, network issue — mysql.connector.Error is raised. Always wrap your connection logic in a try/except block:

import mysql.connector
from mysql.connector import Error

conn = None
try:
    conn = mysql.connector.connect(
        host="127.0.0.1",
        user="root",
        password="rootpass"
    )
    if conn.is_connected():
        info = conn.get_server_info()
        print(f"Connected to MySQL Server version {info}")
except Error as e:
    print(f"Error connecting to MySQL: {e}")
finally:
    if conn is not None and conn.is_connected():
        conn.close()
        print("Connection closed")

Creating a Database and Tables

Once connected, use a cursor to execute SQL statements. Let us create a database and a table:

import mysql.connector
from mysql.connector import Error

conn = mysql.connector.connect(
    host="127.0.0.1",
    user="root",
    password="rootpass"
)

cursor = conn.cursor()

# Create database
cursor.execute("CREATE DATABASE IF NOT EXISTS tutorial_db")
cursor.execute("USE tutorial_db")

# Create table
create_table_sql = """
CREATE TABLE IF NOT EXISTS users (
    id INT AUTO_INCREMENT PRIMARY KEY,
    username VARCHAR(50) NOT NULL UNIQUE,
    email VARCHAR(100) NOT NULL,
    age INT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
"""
cursor.execute(create_table_sql)

print("Database and table created successfully")

cursor.close()
conn.close()

You can also connect directly to a database by passing the database parameter:

conn = mysql.connector.connect(
    host="127.0.0.1",
    user="root",
    password="rootpass",
    database="tutorial_db"
)

CRUD Operations

CRUD — Create, Read, Update, Delete — covers the four fundamental data operations. Let us walk through each one.

INSERT — Creating Records

Single insert:

import mysql.connector

conn = mysql.connector.connect(
    host="127.0.0.1",
    user="root",
    password="rootpass",
    database="tutorial_db"
)
cursor = conn.cursor()

sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
values = ("alice", "alice@example.com", 30)

cursor.execute(sql, values)
conn.commit()  # IMPORTANT: commit the transaction

print(f"Inserted user with ID: {cursor.lastrowid}")

cursor.close()
conn.close()

Batch insert with executemany():

sql = "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)"
users = [
    ("bob", "bob@example.com", 25),
    ("charlie", "charlie@example.com", 35),
    ("diana", "diana@example.com", 28),
    ("eve", "eve@example.com", 32),
]

cursor.executemany(sql, users)
conn.commit()

print(f"Inserted {cursor.rowcount} rows")

executemany() is significantly faster than looping with individual execute() calls because the driver can optimize the network round-trips.
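
For very large datasets you may not want one giant executemany() call; splitting the parameter list into fixed-size batches keeps memory and transaction size bounded. A minimal sketch (the helper name chunked and the batch size are our own choices):

```python
# Sketch: split a large parameter list into fixed-size batches so each
# executemany() call stays a reasonable size. Helper name is our own.
def chunked(items, size):
    """Yield successive lists of at most `size` items."""
    for i in range(0, len(items), size):
        yield items[i:i + size]

# Usage sketch (assumes `cursor`, `conn`, `sql`, and `users` as above):
# for batch in chunked(users, 1000):
#     cursor.executemany(sql, batch)
#     conn.commit()
```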

SELECT — Reading Records

The cursor provides three fetch methods:

  • fetchone() — returns the next row as a tuple, or None
  • fetchall() — returns all remaining rows as a list of tuples
  • fetchmany(size) — returns up to size rows

# Fetch all users
cursor.execute("SELECT id, username, email, age FROM users")
rows = cursor.fetchall()

for row in rows:
    print(f"ID: {row[0]}, Username: {row[1]}, Email: {row[2]}, Age: {row[3]}")

For more readable code, use a dictionary cursor so each row is a dict instead of a tuple:

cursor = conn.cursor(dictionary=True)
cursor.execute("SELECT * FROM users WHERE age > %s", (28,))

for user in cursor.fetchall():
    print(f"{user['username']} ({user['email']}) - Age {user['age']}")

Fetching one row at a time is memory-efficient for large result sets:

cursor.execute("SELECT * FROM users ORDER BY created_at DESC")

row = cursor.fetchone()
while row:
    print(row)
    row = cursor.fetchone()

Fetching in batches balances memory and performance:

cursor.execute("SELECT * FROM users")

while True:
    batch = cursor.fetchmany(size=2)
    if not batch:
        break
    for row in batch:
        print(row)
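
The fetchmany() loop above can be wrapped in a generator that streams rows from any DB-API cursor. A small sketch (the name iter_rows is our own):

```python
# Sketch: stream rows from any DB-API cursor in fixed-size batches.
# The helper name iter_rows is our own.
def iter_rows(cursor, size=1000):
    """Yield rows one at a time, fetching `size` rows per round-trip."""
    while True:
        batch = cursor.fetchmany(size)
        if not batch:
            return
        yield from batch

# Usage sketch:
# cursor.execute("SELECT * FROM users")
# for row in iter_rows(cursor, size=500):
#     process(row)
```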

UPDATE — Modifying Records

sql = "UPDATE users SET email = %s, age = %s WHERE username = %s"
values = ("alice_new@example.com", 31, "alice")

cursor.execute(sql, values)
conn.commit()

print(f"Rows affected: {cursor.rowcount}")

DELETE — Removing Records

sql = "DELETE FROM users WHERE username = %s"
cursor.execute(sql, ("eve",))
conn.commit()

print(f"Deleted {cursor.rowcount} row(s)")

Always check cursor.rowcount after UPDATE and DELETE to confirm the operation affected the expected number of rows.

Parameterized Queries

This is not optional — it is a hard requirement for any production code. Parameterized queries prevent SQL injection, one of the most dangerous and most common web vulnerabilities.

Never do this:

# DANGEROUS — SQL injection vulnerability!
username = input("Enter username: ")
cursor.execute(f"SELECT * FROM users WHERE username = '{username}'")

If a user enters ' OR '1'='1, that query returns every row in the table. Worse, with a driver or configuration that permits multiple statements per call, an input like '; DROP TABLE users; -- could destroy your data.
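
You can see the problem without touching a database by building the string exactly the way the dangerous code does:

```python
# Demonstration only: what the f-string approach actually sends to the server.
username = "' OR '1'='1"
query = f"SELECT * FROM users WHERE username = '{username}'"
print(query)
# The WHERE clause now reads: username = '' OR '1'='1' — true for every row.
```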

Always do this:

# SAFE — parameterized query
username = input("Enter username: ")
cursor.execute("SELECT * FROM users WHERE username = %s", (username,))
user = cursor.fetchone()

The %s placeholder tells the driver to properly escape and quote the value. This works regardless of what the user types — the database sees it as a literal value, not executable SQL.

Key rules:

  • Always use %s as the placeholder (not ? — that is for SQLite)
  • Pass parameters as a tuple, even for a single value: (value,)
  • Never use Python string formatting (f"", .format(), %) to build SQL
  • Column and table names cannot be parameterized — validate them manually if they come from user input
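
Since identifiers cannot be bound as parameters, the standard defense is an explicit allow-list check before interpolating them. A minimal sketch (the function name and column set are our own):

```python
# Sketch: validate a column name against an explicit allow-list before
# interpolating it into SQL (identifiers cannot be bound as parameters).
ALLOWED_SORT_COLUMNS = {"id", "username", "email", "age", "created_at"}

def safe_sort_column(name):
    """Return `name` if it is a known column, otherwise raise ValueError."""
    if name not in ALLOWED_SORT_COLUMNS:
        raise ValueError(f"Invalid sort column: {name!r}")
    return name

# Usage sketch:
# column = safe_sort_column(user_supplied_column)
# cursor.execute(f"SELECT * FROM users ORDER BY {column}")
```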

Transaction Management

A transaction groups multiple SQL statements into a single atomic unit. Either all of them succeed, or none of them do. MySQL with InnoDB supports full ACID transactions.

import mysql.connector
from mysql.connector import Error

conn = mysql.connector.connect(
    host="127.0.0.1",
    user="root",
    password="rootpass",
    database="tutorial_db"
)

try:
    cursor = conn.cursor()

    # Transfer "credits" from alice to bob (both must succeed)
    cursor.execute(
        "UPDATE users SET age = age - 1 WHERE username = %s", ("alice",)
    )
    cursor.execute(
        "UPDATE users SET age = age + 1 WHERE username = %s", ("bob",)
    )

    conn.commit()  # Both updates are saved
    print("Transaction committed")

except Error as e:
    conn.rollback()  # Undo everything if any statement fails
    print(f"Transaction rolled back: {e}")

finally:
    cursor.close()
    conn.close()

By default, mysql-connector-python does not auto-commit. You must call conn.commit() explicitly. If you want auto-commit behavior (not recommended for multi-statement operations), set it at connection time:

# Auto-commit mode — each statement is its own transaction
conn = mysql.connector.connect(
    host="127.0.0.1",
    user="root",
    password="rootpass",
    database="tutorial_db",
    autocommit=True
)

When to use explicit transactions:

  • Multi-step operations that must succeed or fail together (transfers, order processing)
  • Batch inserts where partial completion is unacceptable
  • Any operation where data consistency matters (which is almost always)

Connection Pooling

Opening and closing database connections is expensive. In a web application handling hundreds of requests per second, creating a new connection for every request wastes time and resources. Connection pooling solves this by maintaining a pool of reusable connections.

from mysql.connector import pooling

# Create a connection pool
pool = pooling.MySQLConnectionPool(
    pool_name="tutorial_pool",
    pool_size=5,
    pool_reset_session=True,
    host="127.0.0.1",
    user="root",
    password="rootpass",
    database="tutorial_db"
)

# Get a connection from the pool
conn = pool.get_connection()
cursor = conn.cursor(dictionary=True)

cursor.execute("SELECT * FROM users")
for user in cursor.fetchall():
    print(user)

cursor.close()
conn.close()  # Returns the connection to the pool, does not destroy it

When you call conn.close() on a pooled connection, it goes back to the pool instead of being destroyed. The next call to pool.get_connection() can reuse it immediately.

Pool sizing guidelines:

  • Start with pool_size=5 and increase based on load testing
  • A good rule of thumb: pool size = (number of CPU cores * 2) + number of disk spindles
  • Too many connections waste server memory; too few cause request queuing
  • Monitor with SHOW STATUS LIKE 'Threads_connected' in MySQL
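
The rule of thumb above can be turned into a starting value. On SSD-backed systems a spindle count of 1 is a common stand-in; treat the result only as a first guess to refine with load testing (the helper name is our own):

```python
import os

# Sketch: compute a starting pool size from the rule of thumb above:
# pool size = (CPU cores * 2) + disk spindles. Name is our own; the
# result is a starting point, not a tuned value.
def starting_pool_size(spindles=1):
    cores = os.cpu_count() or 1
    return cores * 2 + spindles

print(starting_pool_size())
```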

Here is a thread-safe pattern for a web application:

from mysql.connector import pooling, Error

class Database:
    """Thread-safe database access using connection pooling."""

    def __init__(self, **kwargs):
        self.pool = pooling.MySQLConnectionPool(
            pool_name="app_pool",
            pool_size=10,
            **kwargs
        )

    def execute_query(self, query, params=None, fetch=False):
        conn = self.pool.get_connection()
        try:
            cursor = conn.cursor(dictionary=True)
            cursor.execute(query, params)
            if fetch:
                result = cursor.fetchall()
            else:
                conn.commit()
                result = cursor.rowcount
            return result
        except Error as e:
            conn.rollback()
            raise e
        finally:
            cursor.close()
            conn.close()


# Usage
db = Database(
    host="127.0.0.1",
    user="root",
    password="rootpass",
    database="tutorial_db"
)

users = db.execute_query("SELECT * FROM users WHERE age > %s", (25,), fetch=True)
print(users)

Using Context Managers

Context managers (the with statement) guarantee that resources are cleaned up even if an exception occurs. Let us build a reusable context manager for database operations:

from contextlib import contextmanager
import mysql.connector
from mysql.connector import Error

@contextmanager
def get_db_connection(config):
    """Context manager that provides a database connection."""
    conn = mysql.connector.connect(**config)
    try:
        yield conn
    except Error as e:
        conn.rollback()
        raise e
    finally:
        conn.close()


@contextmanager
def get_db_cursor(conn, dictionary=True):
    """Context manager that provides a cursor and commits on success."""
    cursor = conn.cursor(dictionary=dictionary)
    try:
        yield cursor
        conn.commit()
    except Error as e:
        conn.rollback()
        raise e
    finally:
        cursor.close()


# Configuration
DB_CONFIG = {
    "host": "127.0.0.1",
    "user": "root",
    "password": "rootpass",
    "database": "tutorial_db"
}

# Usage — clean and exception-safe
with get_db_connection(DB_CONFIG) as conn:
    with get_db_cursor(conn) as cursor:
        cursor.execute(
            "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)",
            ("frank", "frank@example.com", 29)
        )
        print(f"Inserted row ID: {cursor.lastrowid}")

# Connection and cursor are automatically closed here

This pattern is the recommended way to manage database resources in production Python applications. It eliminates an entire class of bugs — leaked connections, uncommitted transactions, and unclosed cursors.

For pooled connections, combine the two patterns:

from mysql.connector import pooling
from contextlib import contextmanager

pool = pooling.MySQLConnectionPool(
    pool_name="app_pool",
    pool_size=5,
    host="127.0.0.1",
    user="root",
    password="rootpass",
    database="tutorial_db"
)

@contextmanager
def get_connection():
    conn = pool.get_connection()
    try:
        yield conn
    finally:
        conn.close()  # Returns to pool

@contextmanager
def get_cursor(conn):
    cursor = conn.cursor(dictionary=True)
    try:
        yield cursor
        conn.commit()
    except Exception:
        conn.rollback()
        raise
    finally:
        cursor.close()

# Usage
with get_connection() as conn:
    with get_cursor(conn) as cursor:
        cursor.execute("SELECT COUNT(*) AS total FROM users")
        result = cursor.fetchone()
        print(f"Total users: {result['total']}")

Working with SQLAlchemy ORM

So far, every example has used raw SQL. That works well for simple applications and gives you full control. But as your application grows — more tables, more relationships, more complex queries — writing raw SQL becomes tedious and error-prone. That is where an ORM (Object-Relational Mapper) shines.

SQLAlchemy is Python’s most powerful and most widely used ORM. Install it alongside the MySQL driver:

pip install sqlalchemy mysql-connector-python

Engine, Session, and Base

SQLAlchemy needs three things to get started:

  • Engine — manages the connection pool and dialect (MySQL, PostgreSQL, etc.)
  • Session — the workspace where you load, create, and modify objects
  • Base — the parent class for all your ORM models

from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, DeclarativeBase

# Connection URL format: mysql+mysqlconnector://user:password@host:port/database
engine = create_engine(
    "mysql+mysqlconnector://root:rootpass@127.0.0.1:3306/tutorial_db",
    echo=False,       # Set True to log all SQL statements
    pool_size=5,
    max_overflow=10
)

# Create a session factory
SessionLocal = sessionmaker(bind=engine)

# Base class for models
class Base(DeclarativeBase):
    pass

Defining Models

Each model class maps to a database table. Columns become class attributes:

from sqlalchemy import Column, Integer, String, DateTime, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.sql import func

class User(Base):
    __tablename__ = "orm_users"

    id = Column(Integer, primary_key=True, autoincrement=True)
    username = Column(String(50), unique=True, nullable=False)
    email = Column(String(100), nullable=False)
    age = Column(Integer)
    created_at = Column(DateTime, server_default=func.now())

    # One-to-many relationship
    posts = relationship("Post", back_populates="author",
                         cascade="all, delete-orphan")

    def __repr__(self):
        return f"<User(id={self.id}, username='{self.username}')>"


class Post(Base):
    __tablename__ = "orm_posts"

    id = Column(Integer, primary_key=True, autoincrement=True)
    title = Column(String(200), nullable=False)
    body = Column(String(5000))
    user_id = Column(Integer, ForeignKey("orm_users.id"), nullable=False)
    created_at = Column(DateTime, server_default=func.now())

    author = relationship("User", back_populates="posts")

    def __repr__(self):
        return f"<Post(id={self.id}, title='{self.title}')>"


# Create all tables
Base.metadata.create_all(engine)

CRUD with the ORM

# CREATE
session = SessionLocal()

new_user = User(username="grace", email="grace@example.com", age=27)
session.add(new_user)
session.commit()
print(f"Created: {new_user}")

# Add a post for this user
new_post = Post(title="My First Post", body="Hello from SQLAlchemy!",
                user_id=new_user.id)
session.add(new_post)
session.commit()

# READ
user = session.query(User).filter_by(username="grace").first()
print(f"Found: {user}")
print(f"Posts: {user.posts}")  # Lazy-loaded relationship

# All users older than 25
older_users = session.query(User).filter(User.age > 25).all()
for u in older_users:
    print(u)

# UPDATE
user.email = "grace_updated@example.com"
session.commit()

# DELETE
session.delete(user)  # Also deletes posts due to cascade
session.commit()

session.close()

With the ORM, notice how you never write a single line of SQL. SQLAlchemy generates it for you, handles parameterization, and manages the transaction lifecycle.

Using Sessions as Context Managers

from contextlib import contextmanager

@contextmanager
def get_session():
    session = SessionLocal()
    try:
        yield session
        session.commit()
    except Exception:
        session.rollback()
        raise
    finally:
        session.close()

# Usage
with get_session() as session:
    user = User(username="henry", email="henry@example.com", age=33)
    session.add(user)
    # Automatically committed when the block exits without error

When to Use ORM vs Raw SQL

Use the ORM when:

  • Building a CRUD-heavy application
  • You need relationship management
  • Rapid prototyping and iteration is a priority
  • Working with multiple database backends
  • Team members vary in SQL skill

Use raw SQL when:

  • Running complex analytical queries
  • You need maximum query performance
  • Migrating or bulk-loading data
  • Using database-specific features
  • Debugging performance issues

Many production applications use both — ORM for standard CRUD and raw SQL (via session.execute()) for complex queries and reporting.

Practical Examples

Example 1: User Management System

A complete user management module with registration, authentication, and profile updates:

import mysql.connector
from mysql.connector import pooling, Error
import hashlib
import os
from contextlib import contextmanager

# --- Database Setup ---
pool = pooling.MySQLConnectionPool(
    pool_name="user_mgmt_pool",
    pool_size=5,
    host="127.0.0.1",
    user="root",
    password="rootpass",
    database="tutorial_db"
)

@contextmanager
def get_connection():
    conn = pool.get_connection()
    try:
        yield conn
    finally:
        conn.close()

@contextmanager
def get_cursor(conn, dictionary=True):
    cursor = conn.cursor(dictionary=dictionary)
    try:
        yield cursor
        conn.commit()
    except Error:
        conn.rollback()
        raise
    finally:
        cursor.close()


def init_db():
    """Create the accounts table if it does not exist."""
    with get_connection() as conn:
        with get_cursor(conn) as cursor:
            cursor.execute("""
                CREATE TABLE IF NOT EXISTS accounts (
                    id INT AUTO_INCREMENT PRIMARY KEY,
                    username VARCHAR(50) NOT NULL UNIQUE,
                    email VARCHAR(100) NOT NULL UNIQUE,
                    password_hash VARCHAR(128) NOT NULL,
                    salt VARCHAR(64) NOT NULL,
                    full_name VARCHAR(100),
                    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
                        ON UPDATE CURRENT_TIMESTAMP
                )
            """)


def hash_password(password, salt=None):
    """Hash a password with a random salt."""
    if salt is None:
        salt = os.urandom(32).hex()
    hashed = hashlib.sha256((salt + password).encode()).hexdigest()
    return hashed, salt


def register_user(username, email, password, full_name=None):
    """Register a new user. Returns user ID on success."""
    password_hash, salt = hash_password(password)

    with get_connection() as conn:
        with get_cursor(conn) as cursor:
            try:
                cursor.execute(
                    """INSERT INTO accounts
                       (username, email, password_hash, salt, full_name)
                       VALUES (%s, %s, %s, %s, %s)""",
                    (username, email, password_hash, salt, full_name)
                )
                print(f"User '{username}' registered with ID {cursor.lastrowid}")
                return cursor.lastrowid
            except Error as e:
                if e.errno == 1062:  # Duplicate entry
                    print("Registration failed: username or email already exists")
                    return None
                raise


def login(username, password):
    """Authenticate a user. Returns user dict or None."""
    with get_connection() as conn:
        with get_cursor(conn) as cursor:
            cursor.execute(
                """SELECT id, username, email, password_hash, salt, full_name
                   FROM accounts WHERE username = %s""",
                (username,)
            )
            user = cursor.fetchone()

            if user is None:
                print("Login failed: user not found")
                return None

            hashed, _ = hash_password(password, user["salt"])
            if hashed != user["password_hash"]:
                print("Login failed: incorrect password")
                return None

            print(f"Welcome back, {user['full_name'] or user['username']}!")
            return {
                "id": user["id"],
                "username": user["username"],
                "email": user["email"],
                "full_name": user["full_name"]
            }


def update_profile(user_id, **kwargs):
    """Update user profile fields. Only updates provided fields."""
    allowed_fields = {"email", "full_name"}
    updates = {k: v for k, v in kwargs.items() if k in allowed_fields}

    if not updates:
        print("No valid fields to update")
        return False

    set_clause = ", ".join(f"{field} = %s" for field in updates)
    values = list(updates.values()) + [user_id]

    with get_connection() as conn:
        with get_cursor(conn) as cursor:
            cursor.execute(
                f"UPDATE accounts SET {set_clause} WHERE id = %s",
                tuple(values)
            )
            if cursor.rowcount > 0:
                print(f"Profile updated for user ID {user_id}")
                return True
            print("User not found")
            return False


# --- Demo ---
if __name__ == "__main__":
    init_db()

    # Register
    user_id = register_user(
        "johndoe", "john@example.com", "s3cur3P@ss", "John Doe"
    )

    # Login
    user = login("johndoe", "s3cur3P@ss")

    # Update profile
    if user:
        update_profile(
            user["id"],
            email="john.doe@newmail.com",
            full_name="John A. Doe"
        )
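
The dynamic SET clause built inside update_profile generalizes to a small helper that is easy to test in isolation. A sketch (the name build_update is our own; table and column names must still come from trusted code or an allow-list, never raw user input):

```python
# Sketch: build a parameterized UPDATE statement from a dict of fields.
# Only allow-listed field names are interpolated -- identifiers cannot be
# bound as parameters. Helper name build_update is our own.
def build_update(table, updates, allowed_fields, key_column, key_value):
    fields = {k: v for k, v in updates.items() if k in allowed_fields}
    if not fields:
        raise ValueError("No valid fields to update")
    set_clause = ", ".join(f"{name} = %s" for name in fields)
    sql = f"UPDATE {table} SET {set_clause} WHERE {key_column} = %s"
    return sql, tuple(fields.values()) + (key_value,)

# Usage sketch:
# sql, params = build_update("accounts", {"email": "x@y.com"},
#                            {"email", "full_name"}, "id", user_id)
# cursor.execute(sql, params)
```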

Example 2: Product Inventory Tracker

import mysql.connector
from mysql.connector import pooling, Error
from contextlib import contextmanager
from decimal import Decimal

pool = pooling.MySQLConnectionPool(
    pool_name="inventory_pool",
    pool_size=5,
    host="127.0.0.1",
    user="root",
    password="rootpass",
    database="tutorial_db"
)

@contextmanager
def db_cursor(dictionary=True):
    conn = pool.get_connection()
    cursor = conn.cursor(dictionary=dictionary)
    try:
        yield cursor
        conn.commit()
    except Error:
        conn.rollback()
        raise
    finally:
        cursor.close()
        conn.close()


def init_inventory():
    with db_cursor() as cursor:
        cursor.execute("""
            CREATE TABLE IF NOT EXISTS products (
                id INT AUTO_INCREMENT PRIMARY KEY,
                name VARCHAR(100) NOT NULL,
                sku VARCHAR(50) NOT NULL UNIQUE,
                price DECIMAL(10, 2) NOT NULL,
                quantity INT NOT NULL DEFAULT 0,
                category VARCHAR(50),
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            )
        """)


def add_product(name, sku, price, quantity=0, category=None):
    with db_cursor() as cursor:
        cursor.execute(
            """INSERT INTO products (name, sku, price, quantity, category)
               VALUES (%s, %s, %s, %s, %s)""",
            (name, sku, price, quantity, category)
        )
        return cursor.lastrowid


def restock(sku, amount):
    """Add stock to an existing product."""
    with db_cursor() as cursor:
        cursor.execute(
            "UPDATE products SET quantity = quantity + %s WHERE sku = %s",
            (amount, sku)
        )
        if cursor.rowcount == 0:
            raise ValueError(f"Product with SKU '{sku}' not found")
        print(f"Restocked {amount} units of {sku}")


def sell(sku, amount):
    """Reduce stock. Raises error if insufficient stock."""
    with db_cursor() as cursor:
        # Check current stock
        cursor.execute(
            "SELECT quantity FROM products WHERE sku = %s", (sku,)
        )
        product = cursor.fetchone()

        if product is None:
            raise ValueError(f"Product '{sku}' not found")
        if product["quantity"] < amount:
            raise ValueError(
                f"Insufficient stock: {product['quantity']} available, "
                f"{amount} requested"
            )

        cursor.execute(
            "UPDATE products SET quantity = quantity - %s WHERE sku = %s",
            (amount, sku)
        )
        print(f"Sold {amount} units of {sku}")


def get_low_stock(threshold=10):
    """Find products that need restocking."""
    with db_cursor() as cursor:
        cursor.execute(
            """SELECT name, sku, quantity FROM products
               WHERE quantity <= %s ORDER BY quantity ASC""",
            (threshold,)
        )
        return cursor.fetchall()


def get_inventory_value():
    """Calculate total inventory value."""
    with db_cursor() as cursor:
        cursor.execute(
            "SELECT SUM(price * quantity) AS total_value FROM products"
        )
        result = cursor.fetchone()
        return result["total_value"] or Decimal("0.00")


def search_products(keyword):
    """Search products by name or category."""
    with db_cursor() as cursor:
        pattern = f"%{keyword}%"
        cursor.execute(
            """SELECT * FROM products
               WHERE name LIKE %s OR category LIKE %s""",
            (pattern, pattern)
        )
        return cursor.fetchall()


# --- Demo ---
if __name__ == "__main__":
    init_inventory()

    # Add products
    add_product("Mechanical Keyboard", "KB-001", 89.99, 50, "Electronics")
    add_product("USB-C Cable", "CB-001", 12.99, 200, "Accessories")
    add_product("Monitor Stand", "MS-001", 45.00, 15, "Furniture")
    add_product("Webcam HD", "WC-001", 59.99, 8, "Electronics")

    # Sell some items
    sell("KB-001", 5)
    restock("WC-001", 20)

    # Reports
    print("\nLow stock items:")
    for item in get_low_stock(threshold=20):
        print(f"  {item['name']} (SKU: {item['sku']}): {item['quantity']} left")

    print(f"\nTotal inventory value: ${get_inventory_value():,.2f}")

    print("\nElectronics products:")
    for p in search_products("Electronics"):
        print(f"  {p['name']} - ${p['price']} ({p['quantity']} in stock)")
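
One caveat in search_products: % and _ inside the keyword are LIKE wildcards, so a search for "100%" matches more than intended. A small escaping sketch (the name escape_like is our own; MySQL's default LIKE escape character is the backslash):

```python
# Sketch: escape LIKE wildcards in user input so they match literally.
# MySQL's default escape character for LIKE is the backslash.
def escape_like(text):
    return (
        text.replace("\\", "\\\\")
            .replace("%", "\\%")
            .replace("_", "\\_")
    )

# Usage sketch inside search_products:
# pattern = f"%{escape_like(keyword)}%"
# cursor.execute("SELECT * FROM products WHERE name LIKE %s", (pattern,))
```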

Example 3: Simple Data Access Layer

A reusable data access layer that any application can build on — similar to a repository pattern used in web frameworks:

import mysql.connector
from mysql.connector import pooling, Error
from contextlib import contextmanager


class DataAccessLayer:
    """A generic, reusable data access layer for MySQL."""

    def __init__(self, host, user, password, database, pool_size=5):
        self.pool = pooling.MySQLConnectionPool(
            pool_name="dal_pool",
            pool_size=pool_size,
            host=host,
            user=user,
            password=password,
            database=database
        )

    @contextmanager
    def _get_cursor(self):
        conn = self.pool.get_connection()
        cursor = conn.cursor(dictionary=True)
        try:
            yield cursor, conn
        finally:
            cursor.close()
            conn.close()

    def fetch_all(self, query, params=None):
        """Execute a SELECT and return all rows."""
        with self._get_cursor() as (cursor, conn):
            cursor.execute(query, params)
            return cursor.fetchall()

    def fetch_one(self, query, params=None):
        """Execute a SELECT and return the first row."""
        with self._get_cursor() as (cursor, conn):
            cursor.execute(query, params)
            return cursor.fetchone()

    def execute(self, query, params=None):
        """Execute INSERT, UPDATE, or DELETE. Returns affected row count."""
        with self._get_cursor() as (cursor, conn):
            cursor.execute(query, params)
            conn.commit()
            return cursor.rowcount

    def insert(self, query, params=None):
        """Execute an INSERT and return the new row's ID."""
        with self._get_cursor() as (cursor, conn):
            cursor.execute(query, params)
            conn.commit()
            return cursor.lastrowid

    def execute_many(self, query, params_list):
        """Execute a batch operation. Returns affected row count."""
        with self._get_cursor() as (cursor, conn):
            cursor.executemany(query, params_list)
            conn.commit()
            return cursor.rowcount

    def execute_transaction(self, operations):
        """
        Execute multiple operations in a single transaction.
        operations: list of (query, params) tuples
        """
        with self._get_cursor() as (cursor, conn):
            try:
                for query, params in operations:
                    cursor.execute(query, params)
                conn.commit()
                return True
            except Error:
                conn.rollback()
                raise


# --- Usage Example ---
dal = DataAccessLayer(
    host="127.0.0.1",
    user="root",
    password="rootpass",
    database="tutorial_db"
)

# Insert
user_id = dal.insert(
    "INSERT INTO users (username, email, age) VALUES (%s, %s, %s)",
    ("ivy", "ivy@example.com", 26)
)

# Read
users = dal.fetch_all("SELECT * FROM users WHERE age > %s", (25,))
for user in users:
    print(user)

# Update
affected = dal.execute(
    "UPDATE users SET age = %s WHERE username = %s",
    (27, "ivy")
)

# Transaction
dal.execute_transaction([
    ("UPDATE users SET age = age - 1 WHERE username = %s", ("alice",)),
    ("UPDATE users SET age = age + 1 WHERE username = %s", ("bob",)),
])

Common Pitfalls

These are the mistakes that burn developers most often. Learn them here so you do not learn them in a production outage.

1. SQL Injection

We covered this above, but it bears repeating. Never build SQL strings with user input. Always use parameterized queries. This is the number-one security vulnerability in web applications, and it is completely preventable.

2. Forgetting to Commit

If your INSERTs and UPDATEs seem to work but the data disappears, you forgot to call conn.commit(). mysql-connector-python disables autocommit by default, so every write must be committed explicitly.

# This does NOTHING to the database without commit()
cursor.execute(
    "INSERT INTO users (username, email) VALUES (%s, %s)",
    ("ghost", "ghost@example.com")
)
# conn.commit()  <-- Missing! Data is lost when connection closes.

3. Connection Leaks

If you open connections without closing them, your application eventually exhausts the MySQL connection limit (default: 151). Use context managers or try/finally blocks to guarantee cleanup:

# BAD — if an exception occurs, connection is never closed
conn = mysql.connector.connect(**config)
cursor = conn.cursor()
cursor.execute("SELECT * FROM users")
# ... exception here means conn.close() never runs
conn.close()

# GOOD — finally block guarantees cleanup
conn = mysql.connector.connect(**config)
try:
    cursor = conn.cursor()
    cursor.execute("SELECT * FROM users")
    results = cursor.fetchall()
finally:
    conn.close()
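The try/finally pattern can be packaged once as a small context manager so cleanup can never be forgotten. A minimal sketch — `open_connection` and its `connect` factory parameter are names invented here, not part of the driver:

```python
from contextlib import contextmanager

@contextmanager
def open_connection(connect, **config):
    """Open a connection via the given factory (e.g.
    mysql.connector.connect) and guarantee close() runs,
    even if the body raises."""
    conn = connect(**config)
    try:
        yield conn
    finally:
        conn.close()

# Usage:
# with open_connection(mysql.connector.connect, **config) as conn:
#     cursor = conn.cursor()
#     cursor.execute("SELECT * FROM users")
```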

4. N+1 Query Problem

This is especially common with ORMs. If you load a list of users, then loop through them loading each user's posts individually, you make 1 + N queries instead of a single JOIN:

# BAD — N+1 queries
users = session.query(User).all()         # 1 query
for user in users:
    print(user.posts)                      # N queries (1 per user)

# GOOD — eager loading with joinedload
from sqlalchemy.orm import joinedload
users = (
    session.query(User)
    .options(joinedload(User.posts))
    .all()
)  # 1 query
for user in users:
    print(user.posts)                      # No additional queries

5. Not Handling Exceptions

Database operations can fail for many reasons: deadlocks, timeouts, constraint violations, server restarts. Always wrap database calls in try/except and handle failures gracefully.
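One graceful-failure pattern worth having is a retry wrapper for transient errors such as deadlocks and lock-wait timeouts. A sketch — `run_with_retry` is a name invented here; in real code you would pass `retriable=(mysql.connector.Error,)` or a narrower exception class:

```python
import time

def run_with_retry(operation, retries=3, delay=0.5, retriable=(Exception,)):
    """Call operation(); on a retriable error, wait and try again,
    re-raising only after the final attempt."""
    for attempt in range(1, retries + 1):
        try:
            return operation()
        except retriable:
            if attempt == retries:
                raise
            time.sleep(delay)
```

In production you would also log each failure before retrying, so transient errors leave a trace even when the retry succeeds.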

6. Storing Passwords in Plain Text

Never store raw passwords. Always hash them with a salt. Use bcrypt or argon2 in production — our example used SHA-256 for simplicity, but dedicated password hashing libraries are much more secure.
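If you cannot add a third-party dependency, the standard library's PBKDF2 is a reasonable middle ground — far better than bare SHA-256, though bcrypt or argon2 remain the recommended choice. A sketch; the function names and iteration count are choices made here, not a standard:

```python
import hashlib
import hmac
import os

ITERATIONS = 200_000  # tune upward as hardware allows

def hash_password(password: str) -> tuple[bytes, bytes]:
    """Return (salt, digest) for storage; a fresh random salt per user."""
    salt = os.urandom(16)
    digest = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return salt, digest

def verify_password(password: str, salt: bytes, digest: bytes) -> bool:
    """Recompute the hash and compare in constant time."""
    candidate = hashlib.pbkdf2_hmac("sha256", password.encode(), salt, ITERATIONS)
    return hmac.compare_digest(candidate, digest)
```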

Best Practices

  1. Always use parameterized queries — No exceptions. Not even for "internal" tools. Build the habit so strong that string-concatenated SQL feels physically wrong.
  2. Use connection pooling — If your application handles more than a handful of requests, pool your connections. It is a one-time setup that pays dividends forever.
  3. Handle exceptions properly — Catch mysql.connector.Error, log the details, and fail gracefully. Do not let raw database errors leak to your users.
  4. Close connections and cursors — Use context managers. Every connection and cursor should have a guaranteed cleanup path.
  5. Use transactions for related operations — If two or more statements must succeed together, wrap them in a transaction. Partial updates corrupt data.
  6. Validate and sanitize inputs — Parameterized queries prevent injection, but you should still validate data types, lengths, and formats before they hit the database.
  7. Index your columns — If you query a column in WHERE, JOIN, or ORDER BY clauses, make sure it has an index. Unindexed queries on large tables are the most common performance problem.
  8. Log slow queries — Enable MySQL's slow query log and review it regularly. Most performance issues are fixable with proper indexing or query restructuring.
  9. Use environment variables for credentials — Never hard-code database passwords in source code. Use os.environ or a secrets manager.
  10. Test with realistic data volumes — A query that runs in 1ms on 100 rows might take 30 seconds on 1 million rows. Test with production-scale data before deploying.
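Practice 9 is worth a concrete sketch: read credentials from the environment at startup instead of hard-coding them. The variable names (DB_HOST, DB_PASSWORD, and so on) and the defaults are conventions chosen here, not anything the driver requires:

```python
import os

def load_db_config() -> dict:
    """Build connection settings from environment variables.
    Raises KeyError if DB_PASSWORD is unset, so a missing secret
    fails fast instead of silently falling back to a default."""
    return {
        "host": os.environ.get("DB_HOST", "127.0.0.1"),
        "port": int(os.environ.get("DB_PORT", "3306")),
        "user": os.environ.get("DB_USER", "root"),
        "password": os.environ["DB_PASSWORD"],
        "database": os.environ.get("DB_NAME", "tutorial_db"),
    }

# conn = mysql.connector.connect(**load_db_config())
```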

Key Takeaways

  • mysql-connector-python is the standard driver for Python-MySQL integration, following the DB-API 2.0 spec.
  • The core workflow is: connect, cursor, execute, fetch/commit, close.
  • Parameterized queries (%s placeholders) are mandatory — they prevent SQL injection and should be your default.
  • Transactions (commit() / rollback()) ensure data consistency for multi-statement operations.
  • Connection pooling is essential for any application that handles concurrent requests.
  • Context managers eliminate resource leaks and make your code cleaner and safer.
  • SQLAlchemy ORM provides a higher-level abstraction for complex applications — use it for CRUD-heavy code, raw SQL for analytics.
  • The most common mistakes — SQL injection, forgotten commits, connection leaks — are all preventable with disciplined patterns.
  • Start with the basics, use context managers from day one, add connection pooling when you scale, and reach for SQLAlchemy when your data model gets complex.

With these patterns and practices in your toolkit, you can confidently build Python applications backed by MySQL — from quick scripts to production web services.



