Cheatsheet for reliably emitting data changes to downstream consumers (search indexes, caches, event buses).

Outbox pattern (atomic with business write)

class OutboxEvent(Base):
    """One pending/published domain event in the transactional outbox.

    Rows are added in the same transaction as the business change they
    describe (see create_post) and later published by drain_outbox, which
    stamps published_at once the bus accepts the event.
    """
    __tablename__ = "outbox_events"
    id: Mapped[int] = mapped_column(primary_key=True)
    # Kind of entity the event is about, e.g. "Post".
    aggregate_type: Mapped[str]
    # Id of that entity as text (create_post stores str(post.id)).
    aggregate_id: Mapped[str]
    # Event name, e.g. "post.created".
    event_type: Mapped[str]
    # Message body handed to bus.publish.
    payload: Mapped[dict] = mapped_column(JSONB)
    # Filled in by the database clock on INSERT.
    occurred_at: Mapped[datetime] = mapped_column(server_default=func.now())
    # NULL until the event has been published.
    # NOTE(review): this plain index is presumably for retention/cleanup
    # queries on published_at — confirm it is needed alongside the partial
    # index below.
    published_at: Mapped[datetime | None] = mapped_column(index=True, default=None)
    
    __table_args__ = (
        # Partial index matching the drain query (published_at IS NULL
        # ORDER BY occurred_at); published rows drop out of it, keeping it small.
        Index("ix_outbox_unpublished", "occurred_at", postgresql_where=text("published_at IS NULL")),
    )

Emit in the same transaction

async def create_post(session, author_id, title, body):
    """Insert a Post together with its ``post.created`` outbox event.

    The flush sends the Post INSERT without committing, so a database-
    generated ``id`` (assumed for Post) is available for the event. The one
    commit at the end makes the post and its event durable together.
    Returns the new Post.

    NOTE(review): the Post is returned after commit; if the session expires
    objects on commit, reading its attributes in an async session triggers a
    lazy load — confirm the session factory's expire_on_commit setting.
    """
    new_post = Post(author_id=author_id, title=title, body=body)
    session.add(new_post)
    await session.flush()  # populate new_post.id before building the event

    created_event = OutboxEvent(
        aggregate_type="Post",
        aggregate_id=str(new_post.id),
        event_type="post.created",
        payload={"id": new_post.id, "author_id": author_id, "title": title},
    )
    session.add(created_event)

    await session.commit()
    return new_post

Either both rows commit or neither.

Worker drain (SKIP LOCKED)

async def drain_outbox(batch_size=100):
    """Publish up to ``batch_size`` unpublished outbox events, oldest first.

    Rows are locked with ``SELECT ... FOR UPDATE SKIP LOCKED``, so several
    workers can drain at once without picking the same rows. Each event that
    ``bus.publish`` accepts is stamped with ``published_at``. An event whose
    publish raises is logged and left unpublished, so a later pass retries it.

    Delivery is at-least-once. If the process dies after ``bus.publish`` but
    before this transaction commits, those events are published again on the
    next pass, so consumers must deduplicate (see the inbox section).

    NOTE(review): only ``e.payload`` is sent. Inbox dedup needs a stable event
    id such as ``e.id`` — confirm the bus attaches one.

    NOTE(review): after a failed publish, later events in the batch (possibly
    for the same aggregate) are still sent, so per-aggregate order is not
    guaranteed.

    Returns the number of events published in this pass.
    """
    published = 0
    async with AsyncSessionLocal() as session:
        async with session.begin():
            events = (await session.execute(
                select(OutboxEvent)
                .where(OutboxEvent.published_at.is_(None))
                .order_by(OutboxEvent.occurred_at)
                .limit(batch_size)
                .with_for_update(skip_locked=True)
            )).scalars().all()

            for e in events:
                try:
                    await bus.publish(e.event_type, e.payload)
                except Exception:
                    log.exception("publish_failed", event_id=e.id)
                    # published_at stays NULL; the next pass retries it.
                else:
                    # Use the database clock, the same one that sets
                    # occurred_at, instead of the deprecated, naive
                    # datetime.utcnow(). The expression is rendered into the
                    # UPDATE at flush/commit time.
                    e.published_at = func.now()
                    published += 1
    return published

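SKIP LOCKED lets multiple workers drain in parallel without blocking each other or publishing the same row concurrently. Delivery is still at-least-once. If a worker crashes after `bus.publish` but before its transaction commits, the batch is published again on the next pass, so consumers must deduplicate (see the inbox section below).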

Loop with LISTEN/NOTIFY (low-latency wake)

# DDL for a trigger that calls pg_notify on channel 'outbox' (empty payload)
# after every row inserted into outbox_events. A listening worker can then
# wake immediately instead of waiting for its next poll.
# Postgres delivers a NOTIFY only when the inserting transaction commits. It
# also collapses identical notifications within one transaction, so a
# multi-row insert wakes listeners once.
# NOTE: EXECUTE FUNCTION requires PostgreSQL 11+. CREATE TRIGGER without
# OR REPLACE (added in PostgreSQL 14) fails if the trigger already exists,
# so apply this once, e.g. from a migration.
NOTIFY_TRIGGER = """
CREATE OR REPLACE FUNCTION notify_outbox() RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('outbox', '');
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER outbox_notify
AFTER INSERT ON outbox_events
FOR EACH ROW EXECUTE FUNCTION notify_outbox();
"""

# Worker
async def outbox_worker(poll_interval=60.0):
    """Run the outbox drain loop forever.

    Drains in three situations:

    - once at startup, to pick up events written while no worker was
      listening;
    - whenever a NOTIFY arrives on the ``outbox`` channel;
    - at least every ``poll_interval`` seconds, as a safety net for missed
      notifications.

    A notification only sets an ``asyncio.Event``. A burst of inserts
    therefore collapses into one extra drain, rather than one concurrent,
    unreferenced task per notification. The listener connection is closed
    when the worker is cancelled.
    """
    wake = asyncio.Event()
    conn = await asyncpg.connect(DATABASE_URL)
    try:
        # asyncpg calls listeners as (connection, pid, channel, payload);
        # the arguments are irrelevant — any notification means "drain".
        await conn.add_listener("outbox", lambda *_: wake.set())
        while True:
            # Clear before draining. A NOTIFY that lands during the drain
            # re-sets the event, so the wait below returns immediately.
            wake.clear()
            try:
                await drain_outbox()
            except Exception:
                # Keep the worker alive on transient DB/bus errors. The next
                # wake-up or poll retries.
                log.exception("outbox_drain_failed")
            try:
                await asyncio.wait_for(wake.wait(), timeout=poll_interval)
            except asyncio.TimeoutError:
                pass  # no notification: periodic safety drain
    finally:
        await conn.close()

Inbox / dedup at consumer

class InboxEvent(Base):
    """Consumer-side record of event ids that have already been received.

    consume() inserts one row per incoming event. The primary key on
    event_id makes a redelivered event conflict instead of inserting twice.
    """
    __tablename__ = "inbox_events"
    # The incoming event's id (event.id in consume); the PK enables dedup.
    event_id: Mapped[str] = mapped_column(primary_key=True)
    # Filled in by the database clock on INSERT.
    received_at: Mapped[datetime] = mapped_column(server_default=func.now())

async def consume(session, event):
    """Process ``event`` at most once per ``event.id``.

    Records ``event.id`` in inbox_events with INSERT ... ON CONFLICT DO
    NOTHING. If the id was already recorded, no row is inserted and the event
    is skipped. This function does not commit. ``process`` receives the same
    session, so — assuming it writes through that session — the caller's
    commit persists the inbox row and the processing together, and a
    rollback undoes both.

    NOTE(review): ``on_conflict_do_nothing`` exists on the PostgreSQL dialect
    insert (``sqlalchemy.dialects.postgresql.insert``), not on the core
    ``sqlalchemy.insert`` — confirm which ``insert`` is imported.
    """
    dedup_stmt = (
        insert(InboxEvent)
        .values(event_id=event.id)
        .on_conflict_do_nothing()
    )
    outcome = await session.execute(dedup_stmt)
    if outcome.rowcount:  # 0 means this event id was seen before
        await process(session, event)

Event sourcing extensions

For full event sourcing (events are the source of truth, not just a side effect), consider the `eventsourcing` package, or build on the outbox with a strict, versioned event schema.

Logical replication (Postgres)

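For consumers that need every change without the application emitting events. Changing `wal_level` takes effect only after a server restart, and the slot cannot be created before that: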

ALTER SYSTEM SET wal_level = logical;
SELECT * FROM pg_create_logical_replication_slot('my_slot', 'pgoutput');

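The consumer reads the WAL through the replication slot. With the `pgoutput` plugin it also needs a publication (`CREATE PUBLICATION ... FOR TABLE ...`). Tools: Debezium → Kafka, pg_recvlogical.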

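The app doesn’t need an outbox, because CDC (change data capture) captures changes from the WAL. Tradeoffs: consumers become tightly coupled to the DB schema. Also, a slot whose consumer stops retains WAL indefinitely and can fill the disk, so monitor `pg_replication_slots` and drop unused slots.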

SKIP LOCKED queue (job table)

class Job(Base):
    """A row in a Postgres table used as a job queue.

    claim_one() picks runnable rows (locked_at IS NULL and run_at <= now())
    with FOR UPDATE SKIP LOCKED, then stamps locked_at/locked_by and bumps
    attempts. reap() clears locks older than a TTL, so jobs held by stalled
    or crashed workers become claimable again.
    """
    __tablename__ = "jobs"
    id: Mapped[int] = mapped_column(primary_key=True)
    # Logical queue name; claim_one filters on it.
    queue: Mapped[str] = mapped_column(default="default")
    payload: Mapped[dict] = mapped_column(JSONB)
    # Higher values are claimed first (claim_one orders by priority DESC).
    priority: Mapped[int] = mapped_column(default=0)
    # Earliest time the job may be claimed; defaults to the DB clock at insert.
    run_at: Mapped[datetime] = mapped_column(server_default=func.now(), index=True)
    # NULL while the job is available; set when a worker claims it.
    locked_at: Mapped[datetime | None]
    # Id of the worker holding the claim.
    locked_by: Mapped[str | None]
    # Incremented on every claim, including re-claims after reap().
    attempts: Mapped[int] = mapped_column(default=0)
    # NOTE(review): not written by any visible code; presumably the job runner
    # records failures here — confirm.
    last_error: Mapped[str | None]
    # NOTE(review): the only index is on run_at. claim_one also filters on
    # queue and locked_at IS NULL and sorts by priority, so a partial index on
    # (queue, priority DESC, run_at) WHERE locked_at IS NULL may be worth
    # adding — check the query plan.
    # NOTE(review): nothing visible here caps attempts or marks a job done.

async def claim_one(session, worker_id, queue="default"):
    """Claim the next runnable job from ``queue`` for ``worker_id``.

    A job is eligible when it is unlocked and due. Among eligible jobs, the
    highest priority wins, with ties broken by earliest run_at. Rows locked
    by other transactions are skipped. The chosen job is marked as held by
    this worker, its ``attempts`` is incremented, and the transaction
    commits. Returns the claimed Job, or None when nothing is available.

    NOTE(review): the Job is returned after the transaction commits. If the
    session expires objects on commit (SQLAlchemy's default), reading its
    attributes in an async session triggers a lazy load — confirm the
    session factory uses expire_on_commit=False.
    """
    due_and_free = select(Job).where(
        Job.queue == queue,
        Job.locked_at.is_(None),
        Job.run_at <= func.now(),
    )
    next_job_stmt = (
        due_and_free
        .order_by(Job.priority.desc(), Job.run_at)
        .limit(1)
        .with_for_update(skip_locked=True)
    )
    async with session.begin():
        claimed = await session.scalar(next_job_stmt)
        if claimed is None:
            return None
        # App-side UTC clock; reap() compares against the same clock.
        claimed.locked_at = datetime.utcnow()
        claimed.locked_by = worker_id
        claimed.attempts += 1
    return claimed

Reaper for stuck jobs

async def reap(session, ttl_seconds=300):
    """Release job locks older than ``ttl_seconds`` and commit.

    Clears locked_at/locked_by so the jobs can be claimed again. attempts is
    left unchanged. The cutoff uses the app-side ``datetime.utcnow()`` clock,
    the same one claim_one uses for locked_at.

    Choose a TTL longer than the slowest legitimate job. A worker that is
    merely slow has its job reaped and re-run elsewhere, so job handlers
    should be idempotent.
    """
    stale_before = datetime.utcnow() - timedelta(seconds=ttl_seconds)
    release_stale = (
        update(Job)
        .where(Job.locked_at < stale_before)
        .values(locked_at=None, locked_by=None)
    )
    await session.execute(release_stale)
    await session.commit()

Retention / partitioning

Outbox and audit tables grow forever unless pruned. Partition them by month and drop old partitions once their rows have been published and are no longer needed.

See the Postgres-focused cheatsheets.

Common mistakes

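  • Writing the outbox event outside the business write’s transaction — lost events (crash between the two writes) or phantom events (business write rolls back).
  • Worker without SKIP LOCKED — workers block on each other’s locked rows, causing contention.
  • No reaper — jobs claimed by crashed workers stay locked forever.
  • Forgetting idempotency at the consumer — at-least-once delivery causes duplicate processing.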

Read this next

If you want my outbox + LISTEN/NOTIFY + SKIP LOCKED worker template, it’s at rajpoot.dev.


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev.