Chapter 5: locking patterns in Postgres via SQLAlchemy. Pessimistic locking, advisory locks, and the SKIP LOCKED pattern that makes Postgres a queue.
SELECT FOR UPDATE
account = await session.scalar(
    select(Account).where(Account.id == 1).with_for_update()
)
account.balance -= 100
await session.commit()
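with_for_update() appends FOR UPDATE to the SELECT. The row stays locked until the transaction commits or rolls back; other transactions that try to update or lock it wait.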
NOWAIT
account = await session.scalar(
    select(Account).where(Account.id == 1).with_for_update(nowait=True)
)
Raises an error immediately (SQLSTATE 55P03, lock_not_available) if the row is already locked, instead of waiting.
Useful for interactive UIs: surface “someone else is editing” rather than hanging.
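SQLAlchemy surfaces the NOWAIT failure as a DBAPIError wrapping the driver exception. A minimal sketch of recognizing it; the helper names `sqlstate_of` and `is_row_locked` are hypothetical, and the functions take the driver-level exception (`e.orig`) so they can be shown without a database:

```python
from typing import Optional

LOCK_NOT_AVAILABLE = "55P03"  # Postgres SQLSTATE for lock_not_available


def sqlstate_of(driver_exc: BaseException) -> Optional[str]:
    """Return the SQLSTATE carried by a driver exception, if any.

    asyncpg and psycopg 3 expose `sqlstate`; psycopg2 exposes `pgcode`.
    """
    return getattr(driver_exc, "sqlstate", None) or getattr(driver_exc, "pgcode", None)


def is_row_locked(driver_exc: BaseException) -> bool:
    """True when the error means NOWAIT could not take the lock."""
    return sqlstate_of(driver_exc) == LOCK_NOT_AVAILABLE
```

In a request handler this would be called as `is_row_locked(e.orig)` inside `except DBAPIError as e:`, returning a conflict response when it is true and re-raising otherwise.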
SKIP LOCKED
job = await session.scalar(
    select(Job)
    .where(Job.status == "pending")
    .order_by(Job.created_at)
    .limit(1)
    .with_for_update(skip_locked=True)
)
if job:
    # Mark the job as taken, then commit to release the row lock.
    job.status = "processing"
    await session.commit()
    await process(job)
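Multiple workers can poll at once without blocking each other: rows locked by another worker are skipped, so each worker grabs a different unlocked row.
This is the basis of Postgres-as-queue.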
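The claim step above is usually wrapped in a polling loop. A rough sketch, with the database left out: `claim` stands in for the SKIP LOCKED query and `handle` for the processing step, and both names, like `worker_loop` itself, are hypothetical:

```python
import asyncio
from typing import Awaitable, Callable, Optional, TypeVar

T = TypeVar("T")


async def worker_loop(
    claim: Callable[[], Awaitable[Optional[T]]],
    handle: Callable[[T], Awaitable[None]],
    stop: asyncio.Event,
    idle_sleep: float = 1.0,
) -> int:
    """Claim and handle jobs until `stop` is set; return how many were handled."""
    handled = 0
    while not stop.is_set():
        job = await claim()
        if job is None:
            # Nothing claimable (queue empty or every pending row is locked):
            # wait a little, but wake up early if asked to stop.
            try:
                await asyncio.wait_for(stop.wait(), timeout=idle_sleep)
            except asyncio.TimeoutError:
                pass
            continue
        await handle(job)
        handled += 1
    return handled
```

Sleeping only when `claim` returns nothing keeps a busy queue draining at full speed while an empty one costs one query per `idle_sleep`.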
OF (specific tables)
.with_for_update(of=Account)
In a join, this locks only the Account rows and leaves rows from the other joined tables unlocked.
FOR SHARE
.with_for_update(read=True) # FOR SHARE
Several transactions can hold FOR SHARE on the same row at once; it blocks anyone trying to update, delete, or FOR UPDATE that row. Less common.
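For reference, Postgres has four row-level lock modes, and the "readers share, writers block" behavior falls out of their conflict table. The sketch below encodes that table as documented by Postgres; the constant and function names are illustrative only:

```python
# Row-level lock modes, as named in the Postgres documentation.
FOR_UPDATE = "FOR UPDATE"
FOR_NO_KEY_UPDATE = "FOR NO KEY UPDATE"
FOR_SHARE = "FOR SHARE"
FOR_KEY_SHARE = "FOR KEY SHARE"

# For each held mode, the set of requested modes that must wait.
CONFLICTS = {
    FOR_KEY_SHARE: {FOR_UPDATE},
    FOR_SHARE: {FOR_NO_KEY_UPDATE, FOR_UPDATE},
    FOR_NO_KEY_UPDATE: {FOR_SHARE, FOR_NO_KEY_UPDATE, FOR_UPDATE},
    FOR_UPDATE: {FOR_KEY_SHARE, FOR_SHARE, FOR_NO_KEY_UPDATE, FOR_UPDATE},
}


def blocks(held: str, requested: str) -> bool:
    """True if a transaction requesting `requested` must wait for `held`."""
    return requested in CONFLICTS[held]
```

An ordinary UPDATE that leaves key columns alone takes FOR NO KEY UPDATE, while DELETE and key-changing UPDATEs take FOR UPDATE, which is why a FOR SHARE holder blocks both.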
Advisory locks
Application-level locks that are not tied to any row:
# Acquire (transaction-scoped)
await session.execute(text("SELECT pg_advisory_xact_lock(:key)"), {"key": 12345})
# ... critical section ...
await session.commit() # auto-releases
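For session-scoped locks, held across transactions until explicitly unlocked or the connection closes. Both calls must run on the same connection: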
await session.execute(text("SELECT pg_advisory_lock(:key)"), {"key": 12345})
# ...
await session.execute(text("SELECT pg_advisory_unlock(:key)"), {"key": 12345})
Use cases:
- Cron jobs: only one runs at a time.
- Initialization: only one instance bootstraps.
- Per-entity locking without row state.
Two-int advisory locks
await session.execute(text("SELECT pg_advisory_xact_lock(:k1, :k2)"), {"k1": 1, "k2": user_id})
Takes two 32-bit integers instead of one 64-bit key; useful for namespaced locks (k1 = lock type, k2 = entity id).
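Lock keys are plain integers, so named locks need a stable mapping from name to number. One possible approach, sketched here with hypothetical helper names, is to hash the name; Python's built-in `hash()` is unsuitable because it is salted per process:

```python
import hashlib


def advisory_key(name: str) -> int:
    """Map a lock name to a signed 64-bit integer (the range of Postgres bigint)."""
    digest = hashlib.blake2b(name.encode("utf-8"), digest_size=8).digest()
    return int.from_bytes(digest, byteorder="big", signed=True)


def advisory_key_pair(namespace: str, entity_id: int) -> tuple[int, int]:
    """Build a (k1, k2) pair of signed 32-bit integers for the two-argument form."""
    if not -(2**31) <= entity_id < 2**31:
        raise ValueError("entity_id does not fit in a 32-bit integer")
    digest = hashlib.blake2b(namespace.encode("utf-8"), digest_size=4).digest()
    return int.from_bytes(digest, byteorder="big", signed=True), entity_id
```

Hashing can collide, especially in the 32-bit namespace slot. A collision only makes two unrelated locks contend with each other, but a small hand-maintained table of constants avoids the issue entirely.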
try_advisory_lock
acquired = await session.scalar(text("SELECT pg_try_advisory_xact_lock(:key)"), {"key": 12345})
if not acquired:
    raise BusyError
Non-blocking. Returns true if acquired.
Common patterns
Singleton job
async def run_singleton():
    async with AsyncSessionLocal() as session:
        async with session.begin():
            await session.execute(text("SELECT pg_advisory_xact_lock(42)"))
            await do_work()
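Only one instance holds the lock at a time; the others block until it commits and then run in turn. To skip the run instead of queueing behind it, use pg_try_advisory_xact_lock.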
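For cron-style jobs, skipping is often preferable to waiting. A small sketch of that shape: `try_lock` is a stand-in for running `SELECT pg_try_advisory_xact_lock(42)` in the open transaction, and both it and `run_or_skip` are hypothetical names:

```python
from typing import Awaitable, Callable


async def run_or_skip(
    try_lock: Callable[[], Awaitable[bool]],
    work: Callable[[], Awaitable[None]],
) -> bool:
    """Run `work` only if `try_lock` reports the lock was acquired.

    Returns True if the work ran, False if another holder had the lock.
    """
    if not await try_lock():
        return False
    await work()
    return True
```

The return value lets the caller log "skipped, already running" rather than treating a busy lock as a failure.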
Per-user serialization
async def update_user_state(user_id: int, change: dict):
    async with AsyncSessionLocal() as session:
        async with session.begin():
            await session.execute(text("SELECT pg_advisory_xact_lock(1, :uid)"), {"uid": user_id})
            user = await session.get(User, user_id)
            apply_change(user, change)
Two updates to the same user serialize; updates to different users parallelize.
Deadlock detection
Postgres detects deadlocks automatically and aborts one of the transactions with SQLSTATE 40P01 (deadlock_detected). The aborted transaction can be retried:
import asyncio

from sqlalchemy.exc import DBAPIError

async def with_retry(fn, max_attempts=3):
    for attempt in range(max_attempts):
        try:
            return await fn()
        except DBAPIError as e:
            # asyncpg and psycopg 3 expose sqlstate; psycopg2 exposes pgcode
            sqlstate = getattr(e.orig, "sqlstate", None) or getattr(e.orig, "pgcode", None)
            if sqlstate != "40P01" or attempt == max_attempts - 1:
                raise
            # exponential backoff: 0.1s, 0.2s, 0.4s, ...
            await asyncio.sleep(0.1 * (2 ** attempt))
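40P01 = deadlock_detected. Serialization failures use a separate code, 40001.
For asyncpg and psycopg 3 the code is on e.orig.sqlstate; for psycopg2 it is e.orig.pgcode. fn should run the whole transaction, so each retry starts from a fresh one.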
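Two refinements are common in retry helpers like this: also retrying serialization failures, and adding jitter so that transactions which just collided do not retry in lockstep. A sketch of both as pure functions, with hypothetical names and default values:

```python
import random

# SQLSTATEs that are usually safe to retry by rerunning the whole transaction.
RETRYABLE_SQLSTATES = frozenset({
    "40P01",  # deadlock_detected
    "40001",  # serialization_failure
})


def is_retryable(sqlstate):
    """True if the transaction can reasonably be attempted again."""
    return sqlstate in RETRYABLE_SQLSTATES


def backoff_delay(attempt, base=0.1, cap=2.0):
    """Full-jitter backoff: uniform in [0, min(cap, base * 2**attempt)] seconds."""
    return random.uniform(0.0, min(cap, base * (2 ** attempt)))
```

In the retry loop, `is_retryable(sqlstate)` would replace the `!= "40P01"` comparison and `backoff_delay(attempt)` the fixed sleep.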
Avoiding deadlocks
Acquire locks in a consistent order:
# BAD
def transfer(a, b, amt):
    lock(a); lock(b)  # may conflict with reverse-order callers

# GOOD
def transfer(a, b, amt):
    first, second = sorted([a, b], key=lambda x: x.id)
    lock(first); lock(second)
Same global ordering = no deadlock.
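The ordering rule can be exercised without a database. In this sketch `asyncio.Lock` objects stand in for row locks and two transfers run in opposite directions at once, the classic deadlock setup. It assumes `src != dst`, and all names are illustrative:

```python
import asyncio


async def transfer(locks, balances, src, dst, amount):
    """Move `amount` from src to dst, taking locks in ascending id order."""
    first, second = sorted((src, dst))
    async with locks[first]:
        # Yield to the event loop so the other transfer can interleave here.
        await asyncio.sleep(0)
        async with locks[second]:
            balances[src] -= amount
            balances[dst] += amount


async def demo():
    balances = {1: 100, 2: 100}
    locks = {account_id: asyncio.Lock() for account_id in balances}
    # Opposite directions at the same time; the timeout would fire on a deadlock.
    await asyncio.wait_for(
        asyncio.gather(
            transfer(locks, balances, 1, 2, 30),
            transfer(locks, balances, 2, 1, 10),
        ),
        timeout=1.0,
    )
    return balances
```

Replacing `sorted((src, dst))` with `(src, dst)` makes each task hold one lock while waiting for the other, and the `wait_for` timeout fires.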
Visibility timeout for queue
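from datetime import datetime, timedelta
from sqlalchemy import or_, select

async def claim_job(session, worker_id):
    now = datetime.utcnow()
    cutoff = now - timedelta(minutes=5)
    async with session.begin():
        job = await session.scalar(
            select(Job)
            # never claimed (NULL), or claimed more than 5 minutes ago
            .where(or_(Job.locked_at.is_(None), Job.locked_at < cutoff))
            .order_by(Job.created_at)
            .limit(1)
            .with_for_update(skip_locked=True)
        )
        if job:
            job.locked_at = now
            job.locked_by = worker_id
        return job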
A claim older than five minutes becomes claimable again, which acts as a reaper for jobs stuck on crashed workers. See Postgres as a Queue.
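The claim rule is easier to check as a plain function. This sketch mirrors the filter in Python and treats a job that was never claimed (`locked_at` is None) as claimable, which is an assumption about the intended behavior; the names are hypothetical:

```python
from datetime import datetime, timedelta
from typing import Optional

VISIBILITY_TIMEOUT = timedelta(minutes=5)


def is_claimable(
    locked_at: Optional[datetime],
    now: datetime,
    timeout: timedelta = VISIBILITY_TIMEOUT,
) -> bool:
    """Free jobs and jobs whose claim has expired are claimable."""
    if locked_at is None:
        # Never claimed. In SQL, `locked_at < cutoff` is not true for NULL,
        # so the query needs an explicit `locked_at IS NULL` branch to match this.
        return True
    return locked_at < now - timeout
```

The timeout should exceed the longest expected job duration, otherwise a slow but healthy worker gets its job claimed a second time.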
Row-level vs page-level
Postgres locks individual rows and never escalates row locks to page or table locks, unlike SQL Server's lock escalation. Behavior stays predictable under load.
Lock timeout
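await session.execute(text("SET LOCAL lock_timeout = '5s'"))
account = await session.scalar(
    select(Account).where(Account.id == 1).with_for_update()
)
# raises after 5s if the row is still locked
Like NOWAIT, but waits up to the timeout before failing with the same error (SQLSTATE 55P03). Use SET LOCAL inside a transaction: it resets at commit or rollback, whereas a plain SET can persist on the pooled connection.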
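SET statements generally cannot take server-side bind parameters, so a configurable timeout is easier to pass through Postgres's `set_config(name, value, is_local)` function, where `is_local = true` behaves like SET LOCAL. A small sketch; the helper name is hypothetical:

```python
def lock_timeout_statement(milliseconds: int) -> tuple[str, dict[str, str]]:
    """Return SQL text and bind parameters that set a transaction-local lock_timeout.

    A value of 0 disables the timeout, matching Postgres's own convention.
    """
    if milliseconds < 0:
        raise ValueError("lock_timeout cannot be negative")
    sql = "SELECT set_config('lock_timeout', :value, true)"
    return sql, {"value": f"{milliseconds}ms"}
```

Usage would look like `sql, params = lock_timeout_statement(5000)` followed by `await session.execute(text(sql), params)` at the start of the transaction.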
Common mistakes
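1. Long-held locks
Making external calls (HTTP, email, payment APIs) while holding a lock widens the blast radius: every waiter stalls for as long as the call takes. Acquire late, release early.
2. No deadlock retry
Without a retry, 40P01 propagates to the client as a 500. Retry the transaction.
3. Mixing up NOWAIT and SKIP LOCKED
They have different semantics: NOWAIT raises an error; SKIP LOCKED silently skips locked rows. Pick by intent.
4. Using row locks as an app-level mutex
Locking a row only to prevent a concurrent operation ties the mutex to table data. An advisory lock is a better fit.
5. Forgetting to release session-scoped advisory locks
When the connection returns to the pool, the lock stays held and outlives the code that took it. Use transaction-scoped locks (pg_advisory_xact_lock) when possible.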
What’s next
Chapter 6: LISTEN / NOTIFY.