Chapter 7: transactions, isolation, locking, retry. The patterns that keep concurrent writes correct.
Implicit transaction per session
A session begins a transaction lazily on first write or query. commit() ends; new ops start a new one.
async with AsyncSessionLocal() as session:
user = User(...)
session.add(user)
await session.commit() # tx commits
# next op starts a new tx
await session.get(User, 1)
Explicit transaction blocks
async with AsyncSessionLocal() as session:
async with session.begin():
session.add(user1)
session.add(user2)
# auto-commit on context exit; rollback on exception
session.begin() opens a transaction that auto-commits.
Savepoints (nested)
async with session.begin():
session.add(parent)
async with session.begin_nested() as sp:
session.add(child1)
if oh_no:
await sp.rollback() # roll back only the child
# parent still pending
session.add(child2)
# parent + child2 committed; child1 not
Savepoints (SAVEPOINT) are supported in Postgres, MySQL InnoDB, SQLite. Useful for “try this; revert if it fails” within a larger transaction.
Isolation levels
| Level | Postgres | MySQL | SQLite |
|---|---|---|---|
| READ UNCOMMITTED | n/a (= READ COMMITTED) | yes | always serialized |
| READ COMMITTED (default for PG) | yes | yes | n/a |
| REPEATABLE READ (default for MySQL) | yes | yes | n/a |
| SERIALIZABLE | yes | yes | only mode |
Set per session:
session.connection(execution_options={"isolation_level": "REPEATABLE READ"})
Or per engine:
engine = create_engine(URL, isolation_level="REPEATABLE READ")
For most apps: default (READ COMMITTED for PG, REPEATABLE READ for MySQL) is fine.
with_for_update (pessimistic locking)
account = await session.scalar(select(Account).where(Account.id == 1).with_for_update())
account.balance -= 100
await session.commit()
Locks the row until commit/rollback. Other transactions wait.
Postgres: SELECT ... FOR UPDATE.
MySQL: SELECT ... FOR UPDATE.
SQLite: serialization (no per-row lock).
For “no wait if locked”:
.with_for_update(nowait=True)
# raises if locked
For “skip if locked”:
.with_for_update(skip_locked=True)
# returns no row instead of waiting
skip_locked is the basis of Postgres-as-queue
.
Optimistic locking
Use a version column:
class Post(Base):
id: Mapped[int] = mapped_column(primary_key=True)
title: Mapped[str]
version: Mapped[int] = mapped_column(default=0)
__mapper_args__ = {"version_id_col": "version"}
SQLAlchemy adds WHERE version = ? to UPDATE. If 0 rows affected (someone else updated): StaleDataError.
try:
post.title = "new"
await session.commit()
except StaleDataError:
# another writer beat us; retry or surface conflict
...
For high-concurrency, low-contention writes: optimistic is cheap.
Deadlock retry
Two transactions hold locks the other wants:
import asyncio
from sqlalchemy.exc import DBAPIError
async def with_retry(fn, max_attempts=3):
for attempt in range(max_attempts):
try:
return await fn()
except DBAPIError as e:
if "deadlock" not in str(e).lower():
raise
if attempt == max_attempts - 1:
raise
await asyncio.sleep(0.1 * (2 ** attempt) + random.uniform(0, 0.1))
For production: a more robust check based on driver-specific error codes.
Distributed transactions
Two-phase commit across multiple resources:
session1.connection(execution_options={"isolation_level": "READ COMMITTED"})
session2.connection(...)
async with session1.begin_twophase() as tx1:
async with session2.begin_twophase() as tx2:
...
Rarely worth it. Almost always: use sagas / outbox instead. See Saga and Distributed Transactions .
Read-only transactions
session.connection(execution_options={"postgresql_readonly": True})
Hints to Postgres for query planning. Works on read replicas.
Connection isolation per request
For request-scoped isolation level changes:
async def get_db_serializable():
async with AsyncSessionLocal() as session:
await session.connection(execution_options={"isolation_level": "SERIALIZABLE"})
yield session
Use for endpoints that need serializable semantics; default elsewhere.
Auto-commit mode
You can force auto-commit (each statement in its own tx):
session.connection(execution_options={"isolation_level": "AUTOCOMMIT"})
For DDL or maintenance ops where transactions aren’t desired (e.g., VACUUM in Postgres).
SELECT FOR UPDATE OF
select(Account).where(...).with_for_update(of=Account)
Locks specific tables in joins. Postgres-specific; useful when joining read-only with write tables.
Transaction events
@event.listens_for(Session, "after_commit")
def on_commit(session): ...
@event.listens_for(Session, "after_rollback")
def on_rollback(session): ...
Hooks for audit, metrics.
Cross-DB locking notes
| Postgres | MySQL | SQLite | |
|---|---|---|---|
| Row-level lock | yes (FOR UPDATE) | yes (InnoDB) | no (serializes) |
| Skip locked | yes | yes (8.0+) | n/a |
| Advisory locks | yes | yes (GET_LOCK) | no |
| Deadlock detection | yes | yes | n/a |
For Postgres-specific advisory locks, see the Postgres textbook .
Common mistakes
1. Long-held locks
account = await session.scalar(select(Account).with_for_update())
await external_api_call() # row locked while waiting on HTTP
account.balance -= 100
await session.commit()
External calls during a lock = blast radius. Acquire late, release early.
2. No retry on deadlock
Deadlock raises; user sees 500. Add retry.
3. Wrong isolation
Reading-then-writing without FOR UPDATE in READ COMMITTED → race.
4. Optimistic without retry
StaleDataError propagates as 500. Retry or surface as 409 Conflict.
5. Mixing levels in one session
Switching mid-transaction is messy. Set isolation at the start.
What’s next
Chapter 8: Async SQLAlchemy in depth.
Read this next
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .