Cheatsheet for production. Long-form: Textbook Ch 12.

PgBouncer setup

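from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://app@pgbouncer:6432/db",
    pool_size=10,
    max_overflow=10,
    pool_pre_ping=True,
    pool_recycle=300,
    connect_args={
        # Transaction pooling hands each transaction a different server connection,
        # so cached prepared statements break. Disable both caches.
        "statement_cache_size": 0,             # asyncpg's own statement cache
        "prepared_statement_cache_size": 0,    # SQLAlchemy asyncpg dialect cache
        # Sent as a startup parameter; PgBouncer may need it in ignore_startup_parameters.
        "server_settings": {"jit": "off"},
    },
)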

Read replicas

write_engine = create_async_engine(WRITER_URL, pool_size=20)
read_engine = create_async_engine(
    READER_URL,
    pool_size=40,
    connect_args={"server_settings": {"default_transaction_read_only": "on"}},
)

WriteSession = async_sessionmaker(write_engine, expire_on_commit=False)
ReadSession = async_sessionmaker(read_engine, expire_on_commit=False)

# Routing via Depends
async def get_write_db():
    async with WriteSession() as s:
        yield s

async def get_read_db():
    async with ReadSession() as s:
        yield s

Read-after-write

Replicas lag the primary, so a user who just wrote may not see their own change on a replica. Route that user's reads to the primary for ~5s:

RECENT_WRITER_TTL = 5

async def get_db(request: Request):
    user_id = request.state.user.id if hasattr(request.state, "user") else None
    if user_id and await redis.exists(f"recent_writer:{user_id}"):
        SessionLocal = WriteSession
    else:
        SessionLocal = ReadSession
    async with SessionLocal() as s:
        yield s

# When a write happens
await redis.set(f"recent_writer:{user_id}", "1", ex=RECENT_WRITER_TTL)
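
The routing decision can be exercised without Redis. The following is a minimal sketch, assuming a hypothetical in-memory `RecentWriters` class that stands in for the `recent_writer:{user_id}` keys; `RecentWriters`, `mark_write`, and `pick_target` are illustrative names, not part of the setup above.

```python
import time
from typing import Optional


class RecentWriters:
    """In-memory stand-in for the Redis recent_writer:{user_id} keys (hypothetical)."""

    def __init__(self, ttl_seconds, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock        # injectable so tests can control time
        self._expires_at = {}      # user_id -> expiry timestamp

    def mark_write(self, user_id):
        # Plays the role of: SET recent_writer:{user_id} 1 EX ttl
        self._expires_at[user_id] = self._clock() + self._ttl

    def is_recent_writer(self, user_id):
        # Plays the role of: EXISTS recent_writer:{user_id}
        deadline = self._expires_at.get(user_id)
        if deadline is None:
            return False
        if self._clock() >= deadline:
            del self._expires_at[user_id]   # expired, like a Redis TTL eviction
            return False
        return True


def pick_target(recent: RecentWriters, user_id: Optional[str]) -> str:
    """Return "primary" inside the read-after-write window, else "replica"."""
    if user_id is not None and recent.is_recent_writer(user_id):
        return "primary"
    return "replica"
```

Anonymous requests (no `user_id`) always go to the replica here, which matches the `get_db` dependency above.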

OTEL instrumentation

from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)

Every query becomes a trace span.

Slow query log

import time
from sqlalchemy import event

@event.listens_for(engine.sync_engine, "before_cursor_execute")
def before(conn, cur, stmt, params, ctx, executemany):
    # perf_counter is monotonic; time.time() can jump when the system clock is adjusted
    ctx._t = time.perf_counter()

@event.listens_for(engine.sync_engine, "after_cursor_execute")
def after(conn, cur, stmt, params, ctx, executemany):
    dur = time.perf_counter() - ctx._t
    if dur > 0.5:  # threshold in seconds
        log.warning("slow_query", sql=stmt[:500], ms=dur * 1000)
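
The same threshold idea works outside SQLAlchemy events, for example around a block that issues several statements. This is a small sketch using only the standard library; `warn_if_slow` and its `clock` parameter are hypothetical names introduced for illustration, and it uses stdlib `logging` rather than the structured logger above.

```python
import logging
import time
from contextlib import contextmanager

log = logging.getLogger("slow_query")


@contextmanager
def warn_if_slow(sql, threshold_s=0.5, clock=time.perf_counter):
    """Log a warning if the wrapped block takes longer than threshold_s seconds."""
    start = clock()
    try:
        yield
    finally:
        elapsed = clock() - start
        if elapsed > threshold_s:
            # Truncate the SQL so one huge statement cannot flood the log
            log.warning("slow_query sql=%s ms=%.1f", sql[:500], elapsed * 1000)
```

Usage would look like `with warn_if_slow("SELECT ..."): ...` around the code being measured.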

Pool metrics → Prometheus

import asyncio
from prometheus_client import Gauge

pool_size_g = Gauge("db_pool_size", "Configured pool size")
pool_in_use_g = Gauge("db_pool_checked_out", "Connections currently checked out")

async def report():
    while True:
        pool_size_g.set(engine.pool.size())
        pool_in_use_g.set(engine.pool.checkedout())  # the pool method is checkedout()
        await asyncio.sleep(10)

Multi-tenancy (Postgres RLS)

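@event.listens_for(engine.sync_engine, "before_cursor_execute")
def set_tenant(conn, cur, stmt, params, ctx, executemany):
    tid = current_tenant_id_var.get()
    if tid:
        # SET cannot take bind parameters, so escape quotes before interpolating.
        safe_tid = str(tid).replace("'", "''")
        # SET LOCAL lasts only until the current transaction ends.
        cur.execute(f"SET LOCAL app.tenant_id = '{safe_tid}'")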

# In FastAPI middleware
@app.middleware("http")
async def tenant_mw(request, call_next):
    tid = await resolve_tenant(request)
    token = current_tenant_id_var.set(tid)
    try:
        return await call_next(request)
    finally:
        current_tenant_id_var.reset(token)

See Postgres-focused cheatsheets for full RLS policies.
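
The middleware relies on `ContextVar` values being isolated per asyncio task, so concurrent requests never see each other's tenant. The sketch below shows that behavior using only the standard library; `handle_request` is a hypothetical stand-in for a request handler, and the `default=None` is an assumption (without a default, `.get()` raises `LookupError` outside a request).

```python
import asyncio
from contextvars import ContextVar

# default=None is an assumption; it keeps .get() from raising outside a request
current_tenant_id_var = ContextVar("current_tenant_id", default=None)


async def handle_request(tenant_id):
    """Set the tenant, yield to other tasks, then read the tenant back."""
    token = current_tenant_id_var.set(tenant_id)
    try:
        await asyncio.sleep(0)   # let the other request run in between
        return current_tenant_id_var.get()
    finally:
        current_tenant_id_var.reset(token)


async def main():
    # gather() wraps each coroutine in its own task with its own context copy
    return await asyncio.gather(
        handle_request("acme"),
        handle_request("globex"),
    )
```

Each call gets back the tenant it set, even though the two handlers interleave.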

Graceful failure / retry

import asyncio
from sqlalchemy.exc import OperationalError, DisconnectionError

async def with_retry(fn, max_attempts=3):
    for attempt in range(max_attempts):
        try:
            return await fn()
        except (OperationalError, DisconnectionError):
            if attempt == max_attempts - 1:
                raise  # out of attempts, surface the last error
            await asyncio.sleep(0.1 * (2 ** attempt))  # 0.1s, 0.2s, ...

Don’t retry constraint violations (IntegrityError): they fail the same way on every attempt.
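
When many workers lose their connections at once, fixed exponential delays make them all retry at the same instants. One common mitigation is to randomize the delay ("full jitter") and cap it. This is a sketch of that variant, not the wrapper above: `TransientError` is a hypothetical stand-in for the SQLAlchemy exceptions, and `backoff_delay`, `cap`, and the injectable `sleep` are illustrative.

```python
import asyncio
import random


class TransientError(Exception):
    """Stand-in for OperationalError / DisconnectionError (hypothetical)."""


def backoff_delay(attempt, base=0.1, cap=2.0, rng=random.random):
    """Full-jitter delay: uniform in [0, min(cap, base * 2**attempt))."""
    return rng() * min(cap, base * (2 ** attempt))


async def with_retry(fn, max_attempts=3, sleep=asyncio.sleep):
    """Retry fn() on TransientError only; anything else propagates at once."""
    for attempt in range(max_attempts):
        try:
            return await fn()
        except TransientError:
            if attempt == max_attempts - 1:
                raise
            await sleep(backoff_delay(attempt))
```

Because only `TransientError` is caught, a constraint violation or any other exception leaves the loop on the first attempt.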

Health / readiness

@app.get("/healthz")
async def healthz():
    return {"status": "ok"}

@app.get("/ready")
async def ready(db = Depends(get_db)):
    try:
        await db.execute(text("SELECT 1"))
    except Exception:
        return JSONResponse({"status": "not ready"}, status_code=503)
    return {"status": "ready"}
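
A database that hangs instead of failing would also hang the readiness probe. One way to guard against that is a timeout around the ping. The sketch below is framework-free so it can be run on its own; `check_ready`, `ping`, and `timeout_s` are hypothetical names, and it returns a `(status_code, body)` tuple rather than a FastAPI response.

```python
import asyncio


async def check_ready(ping, timeout_s=1.0):
    """Run ping() with a deadline; report 503 on error or timeout."""
    try:
        await asyncio.wait_for(ping(), timeout=timeout_s)
    except Exception:
        # Covers driver errors and the timeout raised by wait_for
        return 503, {"status": "not ready"}
    return 200, {"status": "ready"}
```

In the endpoint above, `ping` would be a small coroutine function that runs `SELECT 1` on the session.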

Lifespan integration

from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app):
    engine = create_async_engine(URL, ...)
    app.state.engine = engine
    app.state.sm = async_sessionmaker(engine, expire_on_commit=False)
    yield
    await engine.dispose()

Failover

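# Multi-host asyncpg URL (SQLAlchemy takes the hosts as repeated query parameters;
# host:port pairs separated by commas in the netloc do not parse)
"postgresql+asyncpg://app@/db?host=primary:5432&host=replica:5432&target_session_attrs=read-write"

Each new connection goes to whichever host currently accepts writes, so the pool reconnects to the promoted node after a failover. This needs an asyncpg release that supports target_session_attrs. Connections that were open during the switch still fail; pool_pre_ping and the retry wrapper cover those.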

Connection limits formula

total_app_connections = workers × replicas × (pool_size + max_overflow)
≤ db_max_connections
(replicas = app instances; if the total does not fit, put PgBouncer in front to multiplex)
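
As a worked example of the formula, the sketch below plugs in the pool settings from the PgBouncer section (pool_size=10, max_overflow=10). The worker and replica counts are made-up numbers, and `reserved` is a hypothetical parameter for connections you want to keep free for admin or migration use.

```python
def total_app_connections(workers, replicas, pool_size, max_overflow):
    """Worst-case connections opened by the whole app fleet."""
    return workers * replicas * (pool_size + max_overflow)


def fits(total, db_max_connections, reserved=0):
    """True if the worst case stays within the database limit."""
    return total <= db_max_connections - reserved


# 4 workers per instance, 3 app instances, pool_size=10, max_overflow=10
total = total_app_connections(workers=4, replicas=3, pool_size=10, max_overflow=10)
print(total)                                  # 240
print(fits(total, db_max_connections=100))    # False
```

With Postgres's default `max_connections` of 100, this fleet oversubscribes the database, which is the case where smaller pools or PgBouncer become necessary.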

Operational checklist

  • PgBouncer in front (or small pools when connecting directly).
  • Pool stats exported.
  • Slow query log + alerts.
  • OTEL spans on queries.
  • Read replicas for read-heavy paths.
  • Migrations as K8s Job (not in-app at startup with many replicas).
  • Backups + PITR tested.
  • expire_on_commit=False everywhere.
  • N+1 audited (selectinload).
  • Drift test in CI.

Read this next

If you want my full production engine + tenant context + observability template, it’s at rajpoot.dev.

