Cheatsheet for Postgres pub/sub via LISTEN/NOTIFY.
Basic NOTIFY
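async with session.begin():
    session.add(user)
    await session.flush()  # populate user.id before building the payload
    # NOTIFY does not accept server-side bind parameters; pg_notify() does.
    await session.execute(text("SELECT pg_notify('user_events', :p)"), {"p": json.dumps({"id": user.id})})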
NOTIFY delivers on transaction commit. Rollback = nothing sent.
Listen via asyncpg (dedicated connection)
import asyncio
import json
import asyncpg

async def listen():
    conn = await asyncpg.connect(DATABASE_URL)
    await conn.add_listener("user_events", on_notify)
    try:
        while True:
            await asyncio.sleep(60)  # park the task; the callback fires on its own
    finally:
        await conn.close()  # runs when the task is cancelled on shutdown

async def on_notify(connection, pid, channel, payload):
    data = json.loads(payload)
    log.info("got_event", channel=channel, data=data)
Listen connection is separate from the SQLAlchemy pool. Long-lived.
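The callback runs on the event loop, so slow work inside it delays every other notification. One way to keep it cheap is to push each payload onto an asyncio.Queue and process it in a separate task. A minimal sketch, assuming JSON payloads; `make_enqueuer`, `consume`, and `handle` are hypothetical names, and the callback takes the same four arguments as `on_notify` above:

```python
import asyncio
import json


def make_enqueuer(queue: asyncio.Queue):
    """Return a listener callback that only enqueues (hypothetical helper)."""
    def on_notify(connection, pid, channel, payload):
        # Keep the callback tiny: no parsing, no I/O, just hand off.
        queue.put_nowait((channel, payload))
    return on_notify


async def consume(queue: asyncio.Queue, handle) -> None:
    """Pull notifications off the queue and process them one at a time."""
    while True:
        channel, payload = await queue.get()
        try:
            await handle(channel, json.loads(payload))
        except Exception as exc:  # a bad payload must not kill the consumer
            print(f"failed to handle {channel!r}: {exc}")
        finally:
            queue.task_done()
```

Wire it up with `await conn.add_listener("user_events", make_enqueuer(queue))` and `asyncio.create_task(consume(queue, handle))`. The queue here is unbounded; a bounded one would need a policy for what to do when it fills up.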
Channels (namespaced)
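# Per-tenant (add_listener issues LISTEN and routes notifications to the callback)
await conn.add_listener(f"tenant_{tenant_id}", on_notify)
# By type
await conn.add_listener("orders", on_notify)
await conn.add_listener("invoices", on_notify)
Channel names are SQL identifiers: at most 63 bytes by default, and folded to lowercase unless quoted. Stick to lowercase so LISTEN and NOTIFY always agree. Postgres handles thousands fine.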
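Since tenant IDs often come from outside the app, it can help to build channel names in one place and reject anything that would not survive as a plain identifier. A minimal sketch; `tenant_channel` is a hypothetical helper, and the 63-byte limit assumes a default Postgres build:

```python
import re

MAX_IDENTIFIER_BYTES = 63  # default Postgres identifier limit (NAMEDATALEN - 1)
_SAFE_CHANNEL = re.compile(r"[a-z_][a-z0-9_]*")


def tenant_channel(tenant_id: object, prefix: str = "tenant_") -> str:
    """Build a per-tenant channel name that is safe to use unquoted."""
    # Lowercase and replace anything outside [a-z0-9_], e.g. the dashes in a UUID.
    suffix = re.sub(r"[^a-z0-9_]", "_", str(tenant_id).lower())
    name = f"{prefix}{suffix}"
    if not _SAFE_CHANNEL.fullmatch(name):
        raise ValueError(f"invalid channel name: {name!r}")
    if len(name.encode("utf-8")) > MAX_IDENTIFIER_BYTES:
        raise ValueError(f"channel name exceeds {MAX_IDENTIFIER_BYTES} bytes: {name!r}")
    return name
```

For example, `tenant_channel("3F2A-9c")` gives `tenant_3f2a_9c`. Replacing characters means two different IDs can map to the same channel (`a-b` and `a_b`), so this only suits ID formats where that cannot happen.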
Outbox + LISTEN wake-up
Trigger emits a notify on insert:
CREATE OR REPLACE FUNCTION notify_outbox() RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('outbox', '');
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER outbox_notify
    AFTER INSERT ON outbox_events
    FOR EACH ROW EXECUTE FUNCTION notify_outbox();
Worker:
async def outbox_worker():
    conn = await asyncpg.connect(DATABASE_URL)
    # The callback only schedules a drain; the loop below is the periodic fallback.
    await conn.add_listener("outbox", lambda *_: asyncio.create_task(drain()))
    while True:
        await drain()  # periodic safety drain
        await asyncio.sleep(60)

async def drain():
    async with AsyncSessionLocal() as s:
        async with s.begin():
            events = (await s.execute(
                select(OutboxEvent)
                .where(OutboxEvent.published_at.is_(None))
                .limit(100)
                .with_for_update(skip_locked=True)
            )).scalars().all()
            for e in events:
                await bus.publish(e.event_type, e.payload)
                e.published_at = datetime.utcnow()
Wake-up typically arrives within milliseconds of the commit; the periodic drain covers notifications missed while disconnected.
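A burst of inserts fires one callback per notification, and each callback starting its own drain task can pile up concurrent drains. One possible refinement is to have the callback set an asyncio.Event and let a single loop do the draining. A minimal sketch under that assumption; `DrainSignal` is a hypothetical helper and `drain` is any coroutine function, such as the one above:

```python
import asyncio


class DrainSignal:
    """Coalesce bursts of notifications into single drain runs (hypothetical helper)."""

    def __init__(self) -> None:
        self._event = asyncio.Event()

    def notify(self, *_args) -> None:
        # Usable directly as a listener callback; extra arguments are ignored.
        self._event.set()

    async def run(self, drain, interval: float = 60.0) -> None:
        while True:
            try:
                # Wake on a notification, or after `interval` seconds as a safety net.
                await asyncio.wait_for(self._event.wait(), timeout=interval)
            except asyncio.TimeoutError:
                pass
            # Clear before draining so a notification that arrives mid-drain
            # triggers one more pass instead of being lost.
            self._event.clear()
            await drain()
```

Usage would look like `signal = DrainSignal()`, `await conn.add_listener("outbox", signal.notify)`, then `await signal.run(drain)`. Create the `DrainSignal` inside the running event loop.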
Payload limits
Payloads must be shorter than 8000 bytes in the default configuration. For anything larger, send only IDs:
await session.execute(text("SELECT pg_notify('user_events', :id)"), {"id": str(user.id)})
Consumer fetches from DB.
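If some events are small and others are not, the size check can be automated: send the whole event when it fits and fall back to the ID otherwise. A minimal sketch; `build_payload` and the `truncated` flag are hypothetical, and the limit assumes the default configuration:

```python
import json

MAX_NOTIFY_BYTES = 7999  # payload must be shorter than 8000 bytes by default


def build_payload(event: dict, id_key: str = "id") -> str:
    """Return the full event as JSON if it fits, otherwise only its id."""
    full = json.dumps(event, separators=(",", ":"))
    if len(full.encode("utf-8")) <= MAX_NOTIFY_BYTES:
        return full
    # Too large: send a pointer and let the consumer load the row itself.
    return json.dumps({id_key: event[id_key], "truncated": True}, separators=(",", ":"))
```

The consumer checks for `truncated` and fetches the row from the DB when it is set.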
NOTIFY without payload (just a signal)
NOTIFY my_channel;
await conn.execute("NOTIFY my_channel")
Useful when “something changed; go check” suffices.
Auto-NOTIFY via trigger
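CREATE OR REPLACE FUNCTION notify_user_change() RETURNS TRIGGER AS $$
DECLARE
    changed_id users.id%TYPE;
BEGIN
    -- NEW is not populated for DELETE, so take the id from OLD there.
    IF TG_OP = 'DELETE' THEN
        changed_id := OLD.id;
    ELSE
        changed_id := NEW.id;
    END IF;
    PERFORM pg_notify('user_changes', json_build_object('op', TG_OP, 'id', changed_id)::text);
    RETURN NULL;  -- the return value of an AFTER trigger is ignored
END;
$$ LANGUAGE plpgsql;

CREATE TRIGGER user_change_trigger
    AFTER INSERT OR UPDATE OR DELETE ON users
    FOR EACH ROW EXECUTE FUNCTION notify_user_change();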
App code doesn’t need to NOTIFY; the DB does. Subscribers that are connected at commit time get every change.
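On the consumer side, the trigger's JSON payload can be parsed into a small typed value before acting on it. A minimal sketch, assuming the `op` and `id` keys built by the trigger above; `UserChange` and `parse_user_change` are hypothetical names:

```python
import json
from dataclasses import dataclass
from typing import Any

_KNOWN_OPS = {"INSERT", "UPDATE", "DELETE"}


@dataclass(frozen=True)
class UserChange:
    op: str       # TG_OP value: "INSERT", "UPDATE" or "DELETE"
    user_id: Any  # whatever type users.id serialises to in JSON


def parse_user_change(payload: str) -> UserChange:
    """Turn a user_changes payload into a UserChange, rejecting unknown ops."""
    data = json.loads(payload)
    op = data["op"]
    if op not in _KNOWN_OPS:
        raise ValueError(f"unexpected operation: {op!r}")
    return UserChange(op=op, user_id=data.get("id"))
```

A listener callback can then call `parse_user_change(payload)` and branch on `change.op`, for instance to evict a cache entry on UPDATE or DELETE.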
Cleanup on shutdown
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app):
    listen_task = asyncio.create_task(listen())
    yield
    listen_task.cancel()
    try:
        await listen_task
    except asyncio.CancelledError:
        pass
Connection keepalive
Long-idle LISTEN connections die behind firewalls. Send a no-op:
async def keepalive(conn):
    # Run as a background task next to the listener, on the same connection.
    while True:
        await conn.execute("SELECT 1")
        await asyncio.sleep(60)
You can also ask the server to send TCP keepalives on this session through asyncpg's server_settings:
conn = await asyncpg.connect(
    DATABASE_URL,
    server_settings={"tcp_keepalives_idle": "60"},
)
Compared to Redis Pub/Sub and Kafka
| | LISTEN/NOTIFY | Redis Pub/Sub | Kafka |
|---|---|---|---|
| Setup | Already there | Add Redis | Add cluster |
| Latency | Sub-ms | Sub-ms | ms |
| Throughput | Thousands/s | 100k+/s | Millions/s |
| Persistence | None | None | Durable |
| Replay | None | None | Yes |
For low-latency wake-up of background workers: LISTEN/NOTIFY is ideal. For durable streams: Kafka.
Reliability
NOTIFY is best-effort:
- Disconnected subscriber misses events while away.
- No replay.
For at-least-once: outbox table + polling + LISTEN as wake-up signal.
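In practice this means the listener should reconnect after a drop and poll the table once it is listening again, since nothing committed in the meantime was delivered. A minimal sketch of that loop with exponential backoff; `listen_forever` and its three callables are hypothetical and supplied by the caller, not part of asyncpg:

```python
import asyncio


async def listen_forever(open_listener, catch_up, wait_closed,
                         base_delay: float = 1.0, max_delay: float = 30.0) -> None:
    """Re-establish LISTEN after every disconnect and catch up from the table.

    Hypothetical callables supplied by the caller:
      open_listener() -> conn   opens a connection and registers the listener
      catch_up()                drains unpublished rows (the polling path)
      wait_closed(conn)         returns once the connection is no longer usable
    """
    delay = base_delay
    while True:
        try:
            conn = await open_listener()
            # LISTEN first, then catch up: rows committed while we were away
            # were never notified, and nothing can slip in between the two steps.
            await catch_up()
            delay = base_delay  # connected successfully; reset the backoff
            await wait_closed(conn)
        except OSError as exc:  # ConnectionError is a subclass of OSError
            print(f"listener error: {exc}")
        await asyncio.sleep(delay)
        delay = min(delay * 2, max_delay)
```

With asyncpg, `wait_closed` could probably be built on the connection's termination listener, but that wiring is left out here as an assumption.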
Common mistakes
- Listening on the SQLAlchemy pool: connections rotate, so the LISTEN is lost.
- Big payloads: they exceed the 8000-byte limit; send IDs instead.
- No keepalive: idle connections die behind NAT.
- Treating NOTIFY as durable: it is not, so pair it with a persistent log such as the outbox table.
Read this next
If you want my outbox + LISTEN drain template, it’s at rajpoot.dev.