Cheatsheet for transactions. Long-form: Textbook Ch 7 .
Implicit transaction
async with AsyncSessionLocal() as session:
user = User(...)
session.add(user)
await session.commit() # tx commits
await session.get(User, 1) # new tx starts
Explicit begin
async with AsyncSessionLocal() as session:
async with session.begin():
session.add(user1)
session.add(user2)
# auto-commit on exit; rollback on exception
Savepoints
async with session.begin():
session.add(parent)
async with session.begin_nested() as sp:
session.add(child1)
if oh_no:
await sp.rollback()
session.add(child2)
# parent + child2 committed; child1 not
Isolation levels
# Per engine
engine = create_async_engine(URL, isolation_level="REPEATABLE READ")
# Per connection
await session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
| Level | PG | MySQL | SQLite |
|---|---|---|---|
| READ COMMITTED | default | yes | n/a |
| REPEATABLE READ | yes | default | n/a |
| SERIALIZABLE | yes | yes | always |
with_for_update (pessimistic locking)
account = await session.scalar(
select(Account).where(Account.id == 1).with_for_update()
)
account.balance -= 100
await session.commit()
NOWAIT / SKIP LOCKED
# Error if locked
.with_for_update(nowait=True)
# Skip locked rows (queue pattern)
job = await session.scalar(
select(Job)
.where(Job.status == "pending")
.order_by(Job.created_at)
.limit(1)
.with_for_update(skip_locked=True)
)
FOR SHARE
.with_for_update(read=True) # FOR SHARE; blocks writers only
Lock specific tables in joins
.with_for_update(of=Account) # only lock Account rows
Optimistic locking (version column)
class Post(Base):
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str]
version: Mapped[int] = mapped_column(default=0)
__mapper_args__ = {"version_id_col": "version"}
# SQLAlchemy adds WHERE version = ? to UPDATE
# StaleDataError if no rows match
Deadlock retry
import asyncio
from sqlalchemy.exc import DBAPIError
async def with_retry(fn, max_attempts=3):
for attempt in range(max_attempts):
try:
return await fn()
except DBAPIError as e:
sqlstate = getattr(e.orig, "sqlstate", None) or getattr(e.orig, "pgcode", None)
if sqlstate != "40P01" or attempt == max_attempts - 1: # 40P01 = deadlock (PG)
raise
await asyncio.sleep(0.1 * (2 ** attempt))
Read-only transactions (Postgres)
await session.connection(execution_options={"postgresql_readonly": True})
Auto-commit (for DDL)
await session.connection(execution_options={"isolation_level": "AUTOCOMMIT"})
Transaction events
@event.listens_for(Session, "after_commit")
def on_commit(session): ...
@event.listens_for(Session, "after_rollback")
def on_rollback(session): ...
Cross-DB locking
| Feature | PG | MySQL InnoDB | SQLite |
|---|---|---|---|
| FOR UPDATE row-lock | yes | yes | no (serialized) |
| SKIP LOCKED | yes | 8.0+ | n/a |
| Advisory locks | yes (pg_advisory_*) | GET_LOCK | no |
| Deadlock detection | yes | yes | n/a |
For Postgres advisory locks, see Postgres cheatsheets .
Common mistakes
- Holding
FOR UPDATElock during external API calls — blast radius. - Reading without lock then writing — race.
- No deadlock retry — 500s on contention.
- Optimistic without retry —
StaleDataErrorpropagates as 500.
Read this next
If you want my deadlock-retry + optimistic-lock decorator, it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .