Chapter 5: locking patterns in Postgres via SQLAlchemy. Pessimistic locking, advisory locks, and the SKIP LOCKED pattern that makes Postgres a queue.

SELECT FOR UPDATE

account = await session.scalar(
    select(Account).where(Account.id == 1).with_for_update()
)
account.balance -= 100
await session.commit()

with_for_update() adds FOR UPDATE. Locks the row until commit.

NOWAIT

account = await session.scalar(
    select(Account).where(Account.id == 1).with_for_update(nowait=True)
)

Errors immediately if locked, instead of waiting.

For interactive UIs: surface “someone else is editing” rather than hanging.

SKIP LOCKED

job = await session.scalar(
    select(Job)
    .where(Job.status == "pending")
    .order_by(Job.created_at)
    .limit(1)
    .with_for_update(skip_locked=True)
)
if job:
    job.status = "processing"
    await session.commit()
    await process(job)

Multiple workers, no blocking. Each grabs an unlocked row.

The basis of Postgres-as-queue .

OF (specific tables)

.with_for_update(of=Account)

In a join, lock only the Account rows; not joined rows.

FOR SHARE

.with_for_update(read=True)  # FOR SHARE

Multiple readers can hold; blocks writers. Less common.

Advisory locks

Application-level locking that doesn’t tie to a row:

# Acquire (transaction-scoped)
await session.execute(text("SELECT pg_advisory_xact_lock(:key)"), {"key": 12345})

# ... critical section ...

await session.commit()  # auto-releases

For session-scoped (across transactions):

await session.execute(text("SELECT pg_advisory_lock(:key)"), {"key": 12345})
# ...
await session.execute(text("SELECT pg_advisory_unlock(:key)"), {"key": 12345})

Use cases:

  • Cron jobs: only one runs at a time.
  • Initialization: only one instance bootstraps.
  • Per-entity locking without row state.

Two-int advisory locks

await session.execute(text("SELECT pg_advisory_xact_lock(:k1, :k2)"), {"k1": 1, "k2": user_id})

Two 32-bit ints; useful for namespaced locks (k1 = lock type, k2 = entity id).

try_advisory_lock

acquired = await session.scalar(text("SELECT pg_try_advisory_xact_lock(:key)"), {"key": 12345})
if not acquired:
    raise BusyError

Non-blocking. Returns true if acquired.

Common patterns

Singleton job

async def run_singleton():
    async with AsyncSessionLocal() as session:
        async with session.begin():
            await session.execute(text("SELECT pg_advisory_xact_lock(42)"))
            await do_work()

Only one instance acquires; others wait.

Per-user serialization

async def update_user_state(user_id: int, change: dict):
    async with AsyncSessionLocal() as session:
        async with session.begin():
            await session.execute(text("SELECT pg_advisory_xact_lock(1, :uid)"), {"uid": user_id})
            user = await session.get(User, user_id)
            apply_change(user, change)

Two updates to the same user serialize; updates to different users parallelize.

Deadlock detection

Postgres detects deadlocks automatically; aborts one transaction with 40P01.

from psycopg2.errors import DeadlockDetected as Pg2Deadlock
from sqlalchemy.exc import DBAPIError

async def with_retry(fn, max_attempts=3):
    for attempt in range(max_attempts):
        try:
            return await fn()
        except DBAPIError as e:
            sqlstate = getattr(e.orig, "sqlstate", None) or getattr(e.orig, "pgcode", None)
            if sqlstate != "40P01" or attempt == max_attempts - 1:
                raise
            await asyncio.sleep(0.1 * (2 ** attempt))

40P01 = serialization failure / deadlock.

For asyncpg: e.orig.sqlstate.

Avoiding deadlocks

Acquire locks in a consistent order:

# BAD
def transfer(a, b, amt):
    lock(a); lock(b)  # may conflict with reverse-order callers

# GOOD
def transfer(a, b, amt):
    first, second = sorted([a, b], key=lambda x: x.id)
    lock(first); lock(second)

Same global ordering = no deadlock.

Visibility timeout for queue

async def claim_job(worker_id):
    async with session.begin():
        job = await session.scalar(
            select(Job)
            .where(Job.locked_at < datetime.utcnow() - timedelta(minutes=5))
            .order_by(Job.created_at)
            .limit(1)
            .with_for_update(skip_locked=True)
        )
        if job:
            job.locked_at = datetime.utcnow()
            job.locked_by = worker_id
        return job

Reaper for stuck jobs. See Postgres as a Queue .

ROW-level vs page-level

Postgres locks at row level (no page-level upgrade like SQL Server). Predictable.

Lock timeout

await session.execute(text("SET lock_timeout = '5s'"))
account = await session.scalar(select(Account).with_for_update())
# raises after 5s if locked

Like NOWAIT but with a timeout. Use SET LOCAL inside transactions.

Common mistakes

1. Long-held locks

External calls during a lock — blast radius. Acquire late, release early.

2. No deadlock retry

40P01 propagates as 500. Add retry.

3. Mixing NOWAIT and SKIP LOCKED

Different semantics. NOWAIT errors; SKIP LOCKED silently skips. Pick by intent.

4. Using row locks for app-level mutex

Locking a row to prevent concurrent operation. Better: advisory lock.

5. Forgetting to release session-scoped advisory locks

Connection returns to pool; lock outlives. Use transaction-scoped (pg_advisory_xact_lock) when possible.

What’s next

Chapter 6: LISTEN / NOTIFY.

Read this next


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .