Cheatsheet for production. Long-form: Textbook Ch 12 .
PgBouncer setup
engine = create_async_engine(
"postgresql+asyncpg://app@pgbouncer:6432/db",
pool_size=10,
max_overflow=10,
pool_pre_ping=True,
pool_recycle=300,
connect_args={
"statement_cache_size": 0, # transaction-pool requires this
"prepared_statement_cache_size": 0,
"server_settings": {"jit": "off"},
},
)
Read replicas
write_engine = create_async_engine(WRITER_URL, pool_size=20)
read_engine = create_async_engine(
READER_URL,
pool_size=40,
connect_args={"server_settings": {"default_transaction_read_only": "on"}},
)
WriteSession = async_sessionmaker(write_engine, expire_on_commit=False)
ReadSession = async_sessionmaker(read_engine, expire_on_commit=False)
# Routing via Depends
async def get_write_db():
async with WriteSession() as s: yield s
async def get_read_db():
async with ReadSession() as s: yield s
Read-after-write
For a user who just wrote, route their reads to primary for ~5s:
RECENT_WRITER_TTL = 5
async def get_db(request: Request):
user_id = request.state.user.id if hasattr(request.state, "user") else None
if user_id and await redis.exists(f"recent_writer:{user_id}"):
SessionLocal = WriteSession
else:
SessionLocal = ReadSession
async with SessionLocal() as s:
yield s
# When a write happens
await redis.set(f"recent_writer:{user_id}", "1", ex=RECENT_WRITER_TTL)
OTEL instrumentation
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
Every query becomes a trace span.
Slow query log
@event.listens_for(engine.sync_engine, "before_cursor_execute")
def before(conn, cur, stmt, params, ctx, executemany):
ctx._t = time.time()
@event.listens_for(engine.sync_engine, "after_cursor_execute")
def after(conn, cur, stmt, params, ctx, executemany):
dur = time.time() - ctx._t
if dur > 0.5:
log.warning("slow_query", sql=stmt[:500], ms=dur*1000)
Pool metrics → Prometheus
from prometheus_client import Gauge
pool_size_g = Gauge("db_pool_size", "")
pool_in_use_g = Gauge("db_pool_checked_out", "")
async def report():
while True:
pool_size_g.set(engine.pool.size())
pool_in_use_g.set(engine.pool.checked_out())
await asyncio.sleep(10)
Multi-tenancy (Postgres RLS)
@event.listens_for(engine.sync_engine, "before_cursor_execute")
def set_tenant(conn, cur, stmt, params, ctx, executemany):
tid = current_tenant_id_var.get()
if tid:
cur.execute(f"SET LOCAL app.tenant_id = '{tid}'")
# In FastAPI middleware
@app.middleware("http")
async def tenant_mw(request, call_next):
tid = await resolve_tenant(request)
token = current_tenant_id_var.set(tid)
try:
return await call_next(request)
finally:
current_tenant_id_var.reset(token)
See Postgres-focused cheatsheets for full RLS policies.
Graceful failure / retry
from sqlalchemy.exc import OperationalError, DisconnectionError
async def with_retry(fn, max_attempts=3):
for attempt in range(max_attempts):
try:
return await fn()
except (OperationalError, DisconnectionError) as e:
if attempt == max_attempts - 1: raise
await asyncio.sleep(0.1 * (2 ** attempt))
Don’t retry constraint violations.
Health / readiness
@app.get("/healthz")
async def healthz(): return {"status": "ok"}
@app.get("/ready")
async def ready(db = Depends(get_db)):
try: await db.execute(text("SELECT 1"))
except Exception:
return JSONResponse({"status": "not ready"}, status_code=503)
return {"status": "ready"}
Lifespan integration
@asynccontextmanager
async def lifespan(app):
engine = create_async_engine(URL, ...)
app.state.engine = engine
app.state.sm = async_sessionmaker(engine, expire_on_commit=False)
yield
await engine.dispose()
Failover
# Multi-host asyncpg URL
"postgresql+asyncpg://app@primary:5432,replica:5432/db?target_session_attrs=read-write"
Connects to whichever is writable; handles failover.
Connection limits formula
total_app_connections = workers × replicas × (pool_size + max_overflow)
≤ db_max_connections
(or behind PgBouncer that multiplexes)
Operational checklist
- PgBouncer in front (or sized small directly).
- Pool stats exported.
- Slow query log + alerts.
- OTEL spans on queries.
- Read replicas for read-heavy paths.
- Migrations as K8s Job (not in-app at startup with many replicas).
- Backups + PITR tested.
- expire_on_commit=False everywhere.
- N+1 audited (selectinload).
- Drift test in CI.
Read this next
If you want my full production engine + tenant context + observability template, it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .