Cheatsheet for Postgres pub/sub via LISTEN/NOTIFY.

Basic NOTIFY

async with session.begin():
    session.add(user)
    await session.execute(text("NOTIFY user_events, :p"), {"p": json.dumps({"id": user.id})})

NOTIFY delivers on transaction commit. Rollback = nothing sent.

Listen via asyncpg (dedicated connection)

import asyncpg

async def listen():
    conn = await asyncpg.connect(DATABASE_URL)
    await conn.add_listener("user_events", on_notify)
    while True:
        await asyncio.sleep(60)        # keep connection alive

async def on_notify(connection, pid, channel, payload):
    data = json.loads(payload)
    log.info("got_event", channel=channel, data=data)

Listen connection is separate from the SQLAlchemy pool. Long-lived.

Channels (namespaced)

# Per-tenant
await conn.execute(f"LISTEN tenant_{tenant_id}")

# By type
await conn.execute("LISTEN orders")
await conn.execute("LISTEN invoices")

Channel name: any string. Postgres handles thousands fine.

Outbox + LISTEN wake-up

Trigger emits a notify on insert:

CREATE OR REPLACE FUNCTION notify_outbox() RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('outbox', '');
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER outbox_notify
AFTER INSERT ON outbox_events
FOR EACH ROW EXECUTE FUNCTION notify_outbox();

Worker:

async def outbox_worker():
    conn = await asyncpg.connect(DATABASE_URL)
    await conn.add_listener("outbox", lambda *_: asyncio.create_task(drain()))
    while True:
        await drain()                  # periodic safety drain
        await asyncio.sleep(60)

async def drain():
    async with AsyncSessionLocal() as s:
        async with s.begin():
            events = (await s.execute(
                select(OutboxEvent)
                .where(OutboxEvent.published_at == None)
                .limit(100)
                .with_for_update(skip_locked=True)
            )).scalars().all()
            for e in events:
                await bus.publish(e.event_type, e.payload)
                e.published_at = datetime.utcnow()

Sub-millisecond wake-up; safety periodic drain.

Payload limits

8000 bytes per NOTIFY by default. For larger: send IDs only.

await session.execute(text("NOTIFY user_events, :id"), {"id": str(user.id)})

Consumer fetches from DB.

NOTIFY without payload (just a signal)

NOTIFY my_channel;
await conn.execute("NOTIFY my_channel")

Useful when “something changed; go check” suffices.

Auto-NOTIFY via trigger

CREATE OR REPLACE FUNCTION notify_user_change() RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('user_changes', json_build_object('op', TG_OP, 'id', NEW.id)::text);
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER user_change_trigger
AFTER INSERT OR UPDATE OR DELETE ON users
FOR EACH ROW EXECUTE FUNCTION notify_user_change();

App code doesn’t need to NOTIFY; DB does. Subscribers get every change.

Cleanup on shutdown

@asynccontextmanager
async def lifespan(app):
    listen_task = asyncio.create_task(listen())
    yield
    listen_task.cancel()
    try: await listen_task
    except asyncio.CancelledError: pass

Connection keepalive

Long-idle LISTEN connections die behind firewalls. Send a no-op:

async def keepalive():
    while True:
        await conn.execute("SELECT 1")
        await asyncio.sleep(60)

asyncpg has keepalives setting too:

conn = await asyncpg.connect(
    DATABASE_URL,
    server_settings={"tcp_keepalives_idle": "60"},
)

Compared to Redis Pub/Sub

LISTEN/NOTIFYRedis Pub/SubKafka
SetupAlready thereAdd RedisAdd cluster
LatencySub-msSub-msms
ThroughputThousands/s100k+/sMillions/s
PersistenceNoneNoneDurable
ReplayNoneNoneYes

For low-latency wake-up of background workers: LISTEN/NOTIFY is ideal. For durable streams: Kafka.

Reliability

NOTIFY is best-effort:

  • Disconnected subscriber misses events while away.
  • No replay.

For at-least-once: outbox table + polling + LISTEN as wake-up signal.

Common mistakes

  • Listening on the SQLAlchemy pool — connections rotate; LISTEN lost.
  • Big payloads — exceeds 8KB limit; use IDs.
  • No keepalive — connections die behind NAT.
  • Treating NOTIFY as durable — paired with persistent log.

Read this next

If you want my outbox + LISTEN drain template, it’s at rajpoot.dev .


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .