Chapter 7: transactions, isolation, locking, retry. The patterns that keep concurrent writes correct.

Implicit transaction per session

A session begins a transaction lazily, on the first write or query. commit() ends that transaction; the next operation starts a new one.

async with AsyncSessionLocal() as session:
    user = User(...)
    session.add(user)
    await session.commit()  # tx commits
    
    # next op starts a new tx
    await session.get(User, 1)

Explicit transaction blocks

async with AsyncSessionLocal() as session:
    async with session.begin():
        session.add(user1)
        session.add(user2)
    # auto-commit on context exit; rollback on exception

session.begin() opens a transaction that commits when the block exits cleanly and rolls back if the block raises.
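
The commit-on-exit, rollback-on-exception behaviour can be tried without a database server. As a rough analogy only (this is not SQLAlchemy), the standard library's sqlite3 connection acts the same way when used as a context manager. The users table is made up for the illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (name TEXT)")
conn.commit()

# Clean exit: both inserts are committed together.
with conn:
    conn.execute("INSERT INTO users VALUES ('alice')")
    conn.execute("INSERT INTO users VALUES ('bob')")

# The block raises: the pending insert is rolled back and the error propagates.
try:
    with conn:
        conn.execute("INSERT INTO users VALUES ('carol')")
        raise RuntimeError("simulated failure")
except RuntimeError:
    pass

names = [row[0] for row in conn.execute("SELECT name FROM users ORDER BY name")]
print(names)
```

Only the rows from the block that finished cleanly survive, which is the same contract session.begin() gives you.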

Savepoints (nested)

async with session.begin():
    session.add(parent)
    
    async with session.begin_nested() as sp:
        session.add(child1)
        if oh_no:
            await sp.rollback()  # roll back only the child
            # parent still pending
    
    session.add(child2)
# parent + child2 committed; child1 not

Savepoints (SAVEPOINT) are supported in Postgres, MySQL InnoDB, SQLite. Useful for “try this; revert if it fails” within a larger transaction.
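
To see what a nested transaction does at the SQL level, here is a minimal sketch using the standard library's sqlite3 driver with hand-written SAVEPOINT statements. It mirrors the parent/child1/child2 flow above. The node table and the savepoint name sp1 are assumptions for the example, and SQLAlchemy generates its own savepoint names.

```python
import sqlite3

# isolation_level=None turns off the driver's implicit BEGIN so we control it.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE node (name TEXT)")

conn.execute("BEGIN")
conn.execute("INSERT INTO node VALUES ('parent')")

conn.execute("SAVEPOINT sp1")
conn.execute("INSERT INTO node VALUES ('child1')")
conn.execute("ROLLBACK TO SAVEPOINT sp1")  # undoes child1 only
conn.execute("RELEASE SAVEPOINT sp1")      # discard the savepoint itself

conn.execute("INSERT INTO node VALUES ('child2')")
conn.execute("COMMIT")

names = [row[0] for row in conn.execute("SELECT name FROM node ORDER BY rowid")]
print(names)
```

The outer transaction is untouched by the partial rollback, so parent and child2 are committed and child1 is not.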

Isolation levels

| Level | Postgres | MySQL | SQLite |
|---|---|---|---|
| READ UNCOMMITTED | n/a (= READ COMMITTED) | yes | always serialized |
| READ COMMITTED (default for PG) | yes | yes | n/a |
| REPEATABLE READ (default for MySQL) | yes | yes | n/a |
| SERIALIZABLE | yes | yes | only mode |

Set per session:

# call before the session's first statement; on AsyncSession this is a coroutine
await session.connection(execution_options={"isolation_level": "REPEATABLE READ"})

Or per engine:

engine = create_engine(URL, isolation_level="REPEATABLE READ")

For most apps: default (READ COMMITTED for PG, REPEATABLE READ for MySQL) is fine.

with_for_update (pessimistic locking)

account = await session.scalar(select(Account).where(Account.id == 1).with_for_update())
account.balance -= 100
await session.commit()

Locks the row until commit/rollback. Other transactions wait.

Postgres and MySQL (InnoDB) both emit SELECT ... FOR UPDATE. SQLite has no row-level locks: SQLAlchemy omits the clause there, and SQLite serializes writers at the database level.

For “no wait if locked”:

.with_for_update(nowait=True)
# raises if locked

For “skip if locked”:

.with_for_update(skip_locked=True)
# returns no row instead of waiting

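skip_locked is the basis of the Postgres-as-queue pattern: each worker claims the next unlocked row and never blocks on rows that other workers hold.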

Optimistic locking

Use a version column:

class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str]
    version: Mapped[int] = mapped_column(default=0)

    # pass the column object itself, as in the SQLAlchemy docs
    __mapper_args__ = {"version_id_col": version}

SQLAlchemy adds WHERE version = ? to UPDATE. If 0 rows affected (someone else updated): StaleDataError.

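from sqlalchemy.orm.exc import StaleDataError

try:
    post.title = "new"
    await session.commit()
except StaleDataError:
    # another writer beat us; discard the failed flush, then retry or surface conflict
    await session.rollback()
    ...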

Optimistic locking suits workloads with many concurrent writers that rarely touch the same row: no lock is held, and the only cost is an occasional retry.
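
The version check is just a guarded UPDATE plus a row count. Here is a minimal sketch of that mechanism in plain SQL through the standard library's sqlite3 driver. StaleVersionError and update_title are hypothetical names standing in for what SQLAlchemy does internally, and the exact SQL SQLAlchemy emits may differ.

```python
import sqlite3


class StaleVersionError(Exception):
    """Hypothetical stand-in for SQLAlchemy's StaleDataError."""


def update_title(conn, post_id, new_title, expected_version):
    # The WHERE clause only matches if nobody bumped the version since we read it.
    cur = conn.execute(
        "UPDATE post SET title = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_title, post_id, expected_version),
    )
    if cur.rowcount == 0:
        conn.rollback()
        raise StaleVersionError(
            f"post {post_id} changed since version {expected_version}"
        )
    conn.commit()


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE post (id INTEGER PRIMARY KEY, title TEXT, version INTEGER NOT NULL)"
)
conn.execute("INSERT INTO post VALUES (1, 'draft', 1)")
conn.commit()

# Two writers both read version 1; only the first update can match it.
update_title(conn, 1, "first writer", expected_version=1)
try:
    update_title(conn, 1, "second writer", expected_version=1)
    conflict = False
except StaleVersionError:
    conflict = True

row = conn.execute("SELECT title, version FROM post WHERE id = 1").fetchone()
print(conflict, row)
```

The second writer has to re-read the row and decide whether its change still makes sense, which is exactly the decision StaleDataError forces in the ORM.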

Deadlock retry

A deadlock occurs when two transactions each hold a lock the other needs. The database aborts one of them; retry it with backoff:

import asyncio
import random

from sqlalchemy.exc import DBAPIError

async def with_retry(fn, max_attempts=3):
    for attempt in range(max_attempts):
        try:
            return await fn()
        except DBAPIError as e:
            # crude check: only retry errors whose message mentions a deadlock
            if "deadlock" not in str(e).lower():
                raise
            if attempt == max_attempts - 1:
                raise
            # exponential backoff plus jitter so retries do not collide again
            await asyncio.sleep(0.1 * (2 ** attempt) + random.uniform(0, 0.1))

For production: a more robust check based on driver-specific error codes.
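
One possible shape for that check, offered as a sketch and not a complete list: SQLAlchemy keeps the driver's exception on DBAPIError.orig, and the common drivers expose a machine-readable code there. The helper names is_retryable and with_retry, the base_delay parameter, and the choice of codes are assumptions. Verify the attributes against the driver you actually use.

```python
import asyncio
import random

# Postgres SQLSTATEs: 40P01 = deadlock_detected, 40001 = serialization_failure.
PG_RETRYABLE_SQLSTATES = {"40P01", "40001"}
# MySQL errno 1213 = ER_LOCK_DEADLOCK.
MYSQL_RETRYABLE_ERRNOS = {1213}


def is_retryable(exc: BaseException) -> bool:
    """Inspect the driver-level exception that SQLAlchemy stores on .orig."""
    orig = getattr(exc, "orig", None)
    if orig is None:
        return False
    # psycopg2 exposes .pgcode; psycopg 3 and asyncpg expose .sqlstate.
    sqlstate = getattr(orig, "pgcode", None) or getattr(orig, "sqlstate", None)
    if sqlstate in PG_RETRYABLE_SQLSTATES:
        return True
    # PyMySQL and mysqlclient put the numeric errno first in .args.
    args = getattr(orig, "args", ())
    return bool(args) and args[0] in MYSQL_RETRYABLE_ERRNOS


async def with_retry(fn, max_attempts=3, base_delay=0.1):
    """Call fn() again on retryable errors, with exponential backoff and jitter."""
    for attempt in range(max_attempts):
        try:
            return await fn()
        except Exception as exc:
            if not is_retryable(exc) or attempt == max_attempts - 1:
                raise
            delay = base_delay * 2**attempt + random.uniform(0, base_delay)
            await asyncio.sleep(delay)
```

The callable passed as fn should open its own session and run the whole transaction, because the aborted transaction cannot be resumed.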

Distributed transactions

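Two-phase commit across multiple resources. begin_twophase() exists on the sync Connection, not on AsyncSession. With the ORM, two-phase commit is enabled by the twophase=True flag on a sync Session:

Session = sessionmaker(twophase=True, binds={User: engine1, Account: engine2})

with Session() as session:
    session.add(user)
    session.add(account)
    # commit() prepares every participating connection, then commits them all
    session.commit()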

Rarely worth it. Almost always: use sagas / outbox instead. See Saga and Distributed Transactions.

Read-only transactions

await session.connection(execution_options={"postgresql_readonly": True})

Starts the transaction as READ ONLY, so Postgres rejects any write attempted in it. A good fit for sessions that talk to read replicas.

Connection isolation per request

For request-scoped isolation level changes:

async def get_db_serializable():
    async with AsyncSessionLocal() as session:
        await session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
        yield session

Use for endpoints that need serializable semantics; default elsewhere.

Auto-commit mode

You can force auto-commit (each statement in its own tx):

await session.connection(execution_options={"isolation_level": "AUTOCOMMIT"})

For DDL or maintenance ops where transactions aren’t desired (e.g., VACUUM in Postgres).
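
SQLite enforces the same restriction on VACUUM, which makes it easy to demonstrate locally. This is an analogy using the standard library's sqlite3 driver, not the Postgres or SQLAlchemy code path. The file name and table are made up for the illustration.

```python
import os
import sqlite3
import tempfile

path = os.path.join(tempfile.mkdtemp(), "maintenance.db")

# isolation_level=None puts the driver in autocommit mode.
conn = sqlite3.connect(path, isolation_level=None)
conn.execute("CREATE TABLE t (x INTEGER)")

# Inside an explicit transaction, VACUUM is refused.
conn.execute("BEGIN")
try:
    conn.execute("VACUUM")
    vacuum_in_tx_error = None
except sqlite3.OperationalError as exc:
    vacuum_in_tx_error = str(exc)
conn.execute("ROLLBACK")

# In autocommit mode no transaction is open, so the same statement succeeds.
conn.execute("VACUUM")
vacuum_ok = True
conn.close()

print(vacuum_in_tx_error)
```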

SELECT FOR UPDATE OF

select(Account).where(...).with_for_update(of=Account)

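Locks rows from the named tables only, leaving the other tables in a join unlocked. Renders as FOR UPDATE OF on Postgres (Oracle and some MySQL versions also support it); useful when joining read-only tables with the table you intend to write.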

Transaction events

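from sqlalchemy import event
from sqlalchemy.orm import Session

# listening on the Session class also covers the sync session inside each AsyncSession
@event.listens_for(Session, "after_commit")
def on_commit(session): ...

@event.listens_for(Session, "after_rollback")
def on_rollback(session): ...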

Hooks for audit, metrics.

Cross-DB locking notes

| Feature | Postgres | MySQL | SQLite |
|---|---|---|---|
| Row-level lock | yes (FOR UPDATE) | yes (InnoDB) | no (serializes) |
| Skip locked | yes | yes (8.0+) | n/a |
| Advisory locks | yes | yes (GET_LOCK) | no |
| Deadlock detection | yes | yes | n/a |

For Postgres-specific advisory locks, see the Postgres textbook.

Common mistakes

1. Long-held locks

account = await session.scalar(select(Account).with_for_update())
await external_api_call()  # row locked while waiting on HTTP
account.balance -= 100
await session.commit()

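External calls made while holding a lock widen the blast radius: every transaction that needs the row now waits on the HTTP call too. Acquire locks late, release them early.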

2. No retry on deadlock

Deadlock raises; user sees 500. Add retry.

3. Wrong isolation

Reading-then-writing without FOR UPDATE in READ COMMITTED → race.

4. Optimistic without retry

StaleDataError propagates as 500. Retry or surface as 409 Conflict.

5. Mixing levels in one session

Switching mid-transaction is messy. Set isolation at the start.

What’s next

Chapter 8: Async SQLAlchemy in depth.
