Cheatsheet for reliably emitting changes to downstream consumers (search indexes, caches, event buses).
Outbox pattern (atomic with business write)
class OutboxEvent(Base):
    """A change event written in the same transaction as the business row it describes.

    Rows with ``published_at IS NULL`` are pending. A worker publishes them to
    the bus and then stamps ``published_at``.
    """

    __tablename__ = "outbox_events"

    id: Mapped[int] = mapped_column(primary_key=True)
    aggregate_type: Mapped[str]  # e.g. "Post"
    aggregate_id: Mapped[str]  # stringified primary key of the aggregate
    event_type: Mapped[str]  # e.g. "post.created"
    payload: Mapped[dict] = mapped_column(JSONB)
    occurred_at: Mapped[datetime] = mapped_column(server_default=func.now())
    # No standalone index on this column. The partial index below already
    # serves the "pending events" query, and a full index here would keep
    # growing with every published row for no benefit.
    published_at: Mapped[datetime | None] = mapped_column(default=None)

    __table_args__ = (
        # Serves "WHERE published_at IS NULL ORDER BY occurred_at". It stays
        # small because rows drop out of it once they are published.
        Index("ix_outbox_unpublished", "occurred_at", postgresql_where=text("published_at IS NULL")),
    )
Emit in the same transaction
async def create_post(session, author_id, title, body):
    """Insert a Post plus its ``post.created`` outbox event, then commit both.

    The post row and the event row share one transaction, so the event exists
    exactly when the post does.
    """
    post = Post(author_id=author_id, title=title, body=body)
    session.add(post)
    # Flush so the database assigns post.id; the event below needs it.
    await session.flush()

    new_post_id = post.id
    event = OutboxEvent(
        aggregate_type="Post",
        aggregate_id=str(new_post_id),
        event_type="post.created",
        payload={"id": new_post_id, "author_id": author_id, "title": title},
    )
    session.add(event)

    await session.commit()
    return post
Either both rows commit or neither.
Worker drain (SKIP LOCKED)
async def drain_outbox(batch_size: int = 100) -> int:
    """Publish up to ``batch_size`` pending outbox events, oldest first.

    Rows are claimed with ``FOR UPDATE SKIP LOCKED``, so several workers can
    drain concurrently and each one only gets rows nobody else has locked.
    An event that fails to publish keeps ``published_at = NULL`` and is retried
    on a later pass.

    Delivery is at-least-once. If the commit fails after a successful publish,
    the event is sent again, so consumers must deduplicate.

    Returns the number of events successfully published in this pass.
    """
    published = 0
    async with AsyncSessionLocal() as session:
        async with session.begin():
            events = (
                await session.execute(
                    select(OutboxEvent)
                    .where(OutboxEvent.published_at.is_(None))
                    .order_by(OutboxEvent.occurred_at)
                    .limit(batch_size)
                    .with_for_update(skip_locked=True)
                )
            ).scalars().all()
            for e in events:
                try:
                    await bus.publish(e.event_type, e.payload)
                except Exception:
                    log.exception("publish_failed", event_id=e.id)
                    # Leave published_at NULL so the next pass retries it.
                    continue
                # Stamp with the database clock, which is the same clock that
                # fills occurred_at. A naive datetime.utcnow() disagrees with it
                # whenever the session TimeZone isn't UTC. Postgres now() is the
                # transaction start time; the UPDATE is emitted at commit.
                e.published_at = func.now()
                published += 1
    return published
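SKIP LOCKED lets multiple workers drain in parallel: each worker skips rows another worker has already locked instead of waiting on them. The trade-off is that strict global ordering across workers is not guaranteed.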
Loop with LISTEN/NOTIFY (low-latency wake)
# DDL that wakes LISTEN-ing outbox workers whenever rows are inserted into
# outbox_events.
# Safe to run repeatedly: the function uses CREATE OR REPLACE, and the trigger is
# dropped first because a plain CREATE TRIGGER fails if the trigger already exists.
# The trigger is statement-level, so a bulk INSERT runs pg_notify once rather than
# once per row. The payload is empty: it is only a wake-up signal, and workers
# query the table for the actual events.
NOTIFY_TRIGGER = """
CREATE OR REPLACE FUNCTION notify_outbox() RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('outbox', '');
    RETURN NULL;
END;
$$ LANGUAGE plpgsql;
DROP TRIGGER IF EXISTS outbox_notify ON outbox_events;
CREATE TRIGGER outbox_notify
AFTER INSERT ON outbox_events
FOR EACH STATEMENT EXECUTE FUNCTION notify_outbox();
"""
# Worker
async def outbox_worker(poll_interval: float = 60.0):
    """Drain the outbox forever: on every NOTIFY, and at least every ``poll_interval`` seconds.

    The listener callback only sets an asyncio.Event. Draining happens in this
    loop, so at most one drain runs at a time per worker and no unreferenced
    fire-and-forget tasks are created. A notification that arrives mid-drain
    leaves the event set, which triggers another pass right away.

    The first pass drains any backlog that built up while no worker was
    listening. The periodic pass is the safety net for missed notifications.
    NOTE(review): if the LISTEN connection drops, only the periodic pass
    remains; consider reconnecting.
    """
    wake = asyncio.Event()
    conn = await asyncpg.connect(DATABASE_URL)
    try:
        # asyncpg calls listeners as (connection, pid, channel, payload). The
        # payload carries no data, so the callback is just a wake-up.
        await conn.add_listener("outbox", lambda *_: wake.set())
        while True:
            # Clear before draining so a NOTIFY that arrives during the drain
            # is not lost.
            wake.clear()
            try:
                await drain_outbox()
            except Exception:
                # Keep the worker alive; the next pass retries.
                log.exception("outbox_drain_failed")
            try:
                await asyncio.wait_for(wake.wait(), timeout=poll_interval)
            except asyncio.TimeoutError:
                pass  # no notification: fall through to the periodic safety drain
    finally:
        await conn.close()
Inbox / dedup at consumer
class InboxEvent(Base):
    """Consumer-side dedup ledger: one row per event id that has been received.

    ``event_id`` is the primary key, so inserting an id that is already present
    conflicts. ``consume`` relies on that conflict to skip duplicates.
    """
    __tablename__ = "inbox_events"
    event_id: Mapped[str] = mapped_column(primary_key=True)
    # Filled by the database clock at insert time.
    received_at: Mapped[datetime] = mapped_column(server_default=func.now())
async def consume(session, event):
    """Process ``event`` at most once per ``event.id``; silently ignore redeliveries.

    The inbox row is inserted through ``session``, and ``process`` receives the
    same session. Presumably its writes share the transaction, so the dedup
    record and the side effects commit or roll back together. This function
    does not commit; the caller must.
    """
    # NOTE(review): on_conflict_do_nothing() exists only on the PostgreSQL
    # dialect insert (sqlalchemy.dialects.postgresql.insert). Confirm that is
    # the `insert` in scope.
    outcome = await session.execute(
        insert(InboxEvent).values(event_id=event.id).on_conflict_do_nothing()
    )
    # A rowcount of 0 means the id was already recorded, i.e. a duplicate delivery.
    if outcome.rowcount != 0:
        await process(session, event)
Event sourcing extensions
For full event sourcing (events are the source of truth, not just a side effect), consider the `eventsourcing` package, or build on the outbox with a strongly typed, versioned event schema.
Logical replication (Postgres)
For consumers that need every change without the app emitting events:
ALTER SYSTEM SET wal_level = logical;
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');
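Consumers read the WAL through the replication slot. Tools: Debezium → Kafka, pg_recvlogical. Note that changing wal_level takes effect only after a server restart. Also, a slot that no consumer reads keeps WAL from being recycled, which can fill the disk.
The app doesn’t need an outbox; CDC captures the changes. Trade-off: consumers become tightly coupled to the DB schema.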
SKIP LOCKED queue (job table)
class Job(Base):
    """A row in a Postgres-backed job queue, claimed by workers with SKIP LOCKED.

    A job is claimable when ``locked_at`` is NULL and ``run_at`` has passed
    (see ``claim_one``). ``reap`` clears locks older than a TTL.
    """
    __tablename__ = "jobs"
    id: Mapped[int] = mapped_column(primary_key=True)
    queue: Mapped[str] = mapped_column(default="default")
    payload: Mapped[dict] = mapped_column(JSONB)
    # Higher values are claimed first (claim_one orders by priority DESC).
    priority: Mapped[int] = mapped_column(default=0)
    # Earliest time the job may run; defaults to the DB clock at insert.
    run_at: Mapped[datetime] = mapped_column(server_default=func.now(), index=True)
    # Set when claimed, cleared by the reaper.
    locked_at: Mapped[datetime | None]
    locked_by: Mapped[str | None]
    # Incremented on every claim, including re-claims after a reap.
    attempts: Mapped[int] = mapped_column(default=0)
    last_error: Mapped[str | None]
    # NOTE(review): the claim query filters on queue, locked_at IS NULL and run_at,
    # and orders by priority DESC, run_at. A partial index on
    # (queue, priority DESC, run_at) WHERE locked_at IS NULL would likely match
    # it better than the lone run_at index. Neither claim_one nor reap caps
    # attempts, so a job that always fails is retried forever.
async def claim_one(session, worker_id, queue="default"):
    """Claim the next runnable job in ``queue`` for ``worker_id``.

    Picks the unlocked, due job with the highest priority and then the earliest
    ``run_at``, skipping rows that other workers hold locks on. It marks the job
    locked and bumps ``attempts``; the claim commits when the
    ``session.begin()`` block exits. Returns the Job, or None when nothing is
    available.
    """
    due_and_free = (
        select(Job)
        .where(Job.queue == queue, Job.locked_at.is_(None), Job.run_at <= func.now())
        .order_by(Job.priority.desc(), Job.run_at)
        .limit(1)
        .with_for_update(skip_locked=True)
    )
    async with session.begin():
        job = await session.scalar(due_and_free)
        if job is None:
            return None
        # Naive UTC timestamp from the application host; reap compares against
        # the same clock.
        job.locked_at = datetime.utcnow()
        job.locked_by = worker_id
        job.attempts += 1
    # NOTE(review): attributes are expired at commit unless the session was built
    # with expire_on_commit=False. With an AsyncSession, reading job.payload
    # afterwards would then need an awaited refresh. Confirm the session config.
    return job
Reaper for stuck jobs
async def reap(session, ttl_seconds=300):
    """Release jobs whose lock is older than ``ttl_seconds`` so they can be claimed again.

    This recovers jobs whose worker stopped (e.g. crashed) after claiming them.
    ``attempts`` is left untouched, so the next claim counts as another attempt.
    Commits before returning.

    NOTE(review): a job that legitimately runs longer than ``ttl_seconds`` is
    also released and may run twice. Size the TTL above the longest job, or
    refresh ``locked_at`` while the job is working.
    """
    # Same clock as claim_one: naive UTC from the application host.
    stale_before = datetime.utcnow() - timedelta(seconds=ttl_seconds)
    release_stale = (
        update(Job)
        .where(Job.locked_at < stale_before)
        .values(locked_at=None, locked_by=None)
    )
    await session.execute(release_stale)
    await session.commit()
Retention / partitioning
Outbox and audit tables grow without bound. Partition them by month and drop old partitions.
See the Postgres-focused cheatsheets.
Common mistakes
- Writing the outbox row outside the business write's transaction — lost messages.
- Workers without SKIP LOCKED — contention.
- No reaper — stuck rows accumulate.
- Forgetting idempotency at the consumer — duplicate processing.
Read this next
If you want my outbox + LISTEN/NOTIFY + SKIP LOCKED worker template, it’s at rajpoot.dev.
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev.