Cheatsheet for transactions. Long-form: Textbook Ch 7.

Implicit transaction

async with AsyncSessionLocal() as session:
    user = User(...)
    session.add(user)
    await session.commit()    # tx commits
    
    await session.get(User, 1)  # new tx starts

Explicit begin

async with AsyncSessionLocal() as session:
    async with session.begin():
        session.add(user1)
        session.add(user2)
    # auto-commit on exit; rollback on exception

Savepoints

async with session.begin():
    session.add(parent)
    
    async with session.begin_nested() as sp:
        session.add(child1)
        if oh_no:
            await sp.rollback()
    
    session.add(child2)
# if oh_no: parent + child2 committed, child1 rolled back
# otherwise all three are committed
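What begin_nested() does at the SQL level can be sketched with the standard-library sqlite3 module. This is only an illustration of SAVEPOINT semantics, not the statements SQLAlchemy emits verbatim; the function name savepoint_demo, the table node and the savepoint name sp1 are made up for the example.

```python
import sqlite3


def savepoint_demo(fail_child1: bool) -> list[str]:
    """Insert parent, child1 (inside a savepoint) and child2; return what survives."""
    # isolation_level=None turns off sqlite3's implicit BEGIN, so every
    # transaction statement below is issued explicitly.
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE node (name TEXT)")

    conn.execute("BEGIN")
    conn.execute("INSERT INTO node VALUES ('parent')")

    conn.execute("SAVEPOINT sp1")                    # inner scope starts here
    conn.execute("INSERT INTO node VALUES ('child1')")
    if fail_child1:
        conn.execute("ROLLBACK TO SAVEPOINT sp1")    # undoes child1 only
    conn.execute("RELEASE SAVEPOINT sp1")            # close the inner scope

    conn.execute("INSERT INTO node VALUES ('child2')")
    conn.execute("COMMIT")

    names = [row[0] for row in conn.execute("SELECT name FROM node ORDER BY rowid")]
    conn.close()
    return names


print(savepoint_demo(fail_child1=True))    # ['parent', 'child2']
print(savepoint_demo(fail_child1=False))   # ['parent', 'child1', 'child2']
```

The outer transaction stays open after the savepoint rollback, which is why child2 can still be added and committed.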

Isolation levels

# Per engine
engine = create_async_engine(URL, isolation_level="REPEATABLE READ")

# Per transaction (call before the session runs any SQL; afterwards the options are ignored)
await session.connection(execution_options={"isolation_level": "SERIALIZABLE"})

Level            PG       MySQL    SQLite
READ COMMITTED   default  yes      n/a
REPEATABLE READ  yes      default  n/a
SERIALIZABLE     yes      yes      always

with_for_update (pessimistic locking)

account = await session.scalar(
    select(Account).where(Account.id == 1).with_for_update()
)
account.balance -= 100
await session.commit()
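Why the lock matters: without FOR UPDATE, two sessions can both read the same balance and the second write silently overwrites the first. A minimal sketch of that lost update, using the standard-library sqlite3 module with two connections standing in for two sessions (the function names and the account table are invented for the example):

```python
import os
import sqlite3
import tempfile


def read_balance(conn: sqlite3.Connection) -> int:
    return conn.execute("SELECT balance FROM account WHERE id = 1").fetchone()[0]


def withdraw_twice_without_lock(amount: int) -> int:
    """Two sessions each withdraw `amount` from a balance of 100; return the result."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, "bank.db")
        # isolation_level=None: each statement commits on its own, no locks held.
        a = sqlite3.connect(path, isolation_level=None)
        b = sqlite3.connect(path, isolation_level=None)
        try:
            a.execute("CREATE TABLE account (id INTEGER PRIMARY KEY, balance INTEGER)")
            a.execute("INSERT INTO account VALUES (1, 100)")

            # Both sessions read before either one writes.
            seen_a = read_balance(a)
            seen_b = read_balance(b)

            # Each writes a value computed from its own stale read.
            a.execute("UPDATE account SET balance = ? WHERE id = 1", (seen_a - amount,))
            b.execute("UPDATE account SET balance = ? WHERE id = 1", (seen_b - amount,))

            return read_balance(a)
        finally:
            a.close()
            b.close()


print(withdraw_twice_without_lock(30))   # 70, although 100 - 30 - 30 = 40
```

With with_for_update() on a database that supports row locks, the second reader would block until the first transaction commits and would then see 70.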

NOWAIT / SKIP LOCKED

# Error if locked
.with_for_update(nowait=True)

# Skip locked rows (queue pattern)
job = await session.scalar(
    select(Job)
    .where(Job.status == "pending")
    .order_by(Job.created_at)
    .limit(1)
    .with_for_update(skip_locked=True)
)

FOR SHARE

.with_for_update(read=True)   # FOR SHARE; blocks writers only

Lock specific tables in joins

.with_for_update(of=Account)   # only lock Account rows

Optimistic locking (version column)

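class Post(Base):
    __tablename__ = "posts"

    id: Mapped[int] = mapped_column(primary_key=True)
    title: Mapped[str]
    version: Mapped[int] = mapped_column(default=0)

    # version_id_col is documented as taking the column itself, not its name
    __mapper_args__ = {"version_id_col": version}

# SQLAlchemy adds WHERE version = ? to UPDATE and DELETE, and bumps version on each UPDATE
# sqlalchemy.orm.exc.StaleDataError if no rows match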
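The version check above boils down to a guarded UPDATE plus a row-count test. A rough sketch with the standard-library sqlite3 module, outside SQLAlchemy; StaleRow is a hypothetical stand-in for StaleDataError, and update_title and the post table are illustration names only:

```python
import sqlite3


class StaleRow(Exception):
    """Hypothetical stand-in for SQLAlchemy's StaleDataError."""


def update_title(conn: sqlite3.Connection, post_id: int, new_title: str, seen_version: int) -> None:
    # The UPDATE only matches if nobody bumped the version since we read it.
    cur = conn.execute(
        "UPDATE post SET title = ?, version = version + 1 "
        "WHERE id = ? AND version = ?",
        (new_title, post_id, seen_version),
    )
    if cur.rowcount == 0:
        raise StaleRow(f"post {post_id} changed since version {seen_version}")


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE post (id INTEGER PRIMARY KEY, title TEXT, version INTEGER)")
conn.execute("INSERT INTO post VALUES (1, 'draft', 1)")

# Writers A and B both loaded the row at version 1.
update_title(conn, 1, "from writer A", seen_version=1)       # matches, version becomes 2
try:
    update_title(conn, 1, "from writer B", seen_version=1)   # no row has version 1 now
    outcome = "updated"
except StaleRow:
    outcome = "stale"

print(outcome)   # stale
```

No lock is held between read and write; the conflict is only detected at write time, so the caller has to reload and retry.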

Deadlock retry

import asyncio
from sqlalchemy.exc import DBAPIError

async def with_retry(fn, max_attempts=3):
    for attempt in range(max_attempts):
        try:
            return await fn()
        except DBAPIError as e:
            sqlstate = getattr(e.orig, "sqlstate", None) or getattr(e.orig, "pgcode", None)
            if sqlstate != "40P01" or attempt == max_attempts - 1:   # 40P01 = deadlock (PG)
                raise
            await asyncio.sleep(0.1 * (2 ** attempt))

Read-only transactions (Postgres)

await session.connection(execution_options={"postgresql_readonly": True})

Auto-commit (for DDL)

await session.connection(execution_options={"isolation_level": "AUTOCOMMIT"})

Transaction events

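from sqlalchemy import event
from sqlalchemy.orm import Session

# AsyncSession wraps a sync Session, so listeners go on Session and must be plain (non-async) functions
@event.listens_for(Session, "after_commit")
def on_commit(session): ...

@event.listens_for(Session, "after_rollback")
def on_rollback(session): ...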

Cross-DB locking

Feature              PG                   MySQL InnoDB  SQLite
FOR UPDATE row-lock  yes                  yes           no (serialized)
SKIP LOCKED          yes                  8.0+          n/a
Advisory locks       yes (pg_advisory_*)  GET_LOCK      no
Deadlock detection   yes                  yes           n/a

For Postgres advisory locks, see Postgres cheatsheets.

Common mistakes

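  • Holding a FOR UPDATE lock during external API calls: every other writer to those rows waits on the slow call.
  • Reading without a lock, then writing: two transactions can read the same value and one update is lost.
  • No deadlock retry: contention surfaces as 500s.
  • Optimistic locking without retry: StaleDataError propagates as a 500.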
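The last two mistakes share a fix: retry the whole unit of work when the error is one that a fresh attempt can resolve. A minimal sketch of a retry helper that takes a predicate, so the same loop can cover deadlocks and stale versions. The names retry_async, is_retryable and base_delay are hypothetical, and FakeStaleDataError stands in for the real exception so the example runs without SQLAlchemy installed.

```python
import asyncio


async def retry_async(fn, is_retryable, max_attempts=3, base_delay=0.1):
    """Await fn() until it succeeds, retrying only errors that is_retryable accepts."""
    for attempt in range(max_attempts):
        try:
            return await fn()
        except Exception as exc:
            # Give up on unrelated errors and on the final attempt.
            if not is_retryable(exc) or attempt == max_attempts - 1:
                raise
            # Exponential backoff: base_delay, 2 * base_delay, 4 * base_delay, ...
            await asyncio.sleep(base_delay * (2 ** attempt))


class FakeStaleDataError(Exception):
    """Stand-in for sqlalchemy.orm.exc.StaleDataError."""


calls = {"n": 0}


async def flaky_update():
    # Fails twice, then succeeds, to simulate two lost optimistic races.
    calls["n"] += 1
    if calls["n"] < 3:
        raise FakeStaleDataError("row changed underneath us")
    return "saved"


result = asyncio.run(
    retry_async(
        flaky_update,
        lambda exc: isinstance(exc, FakeStaleDataError),
        base_delay=0.001,
    )
)
print(result, calls["n"])   # saved 3
```

For this to be safe with a real session, fn should open its own session and re-read the rows on every attempt; retrying with objects loaded before the failure would just replay the stale state.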

Read this next

If you want my deadlock-retry + optimistic-lock decorator, it’s at rajpoot.dev.

