Chapter 12, the final chapter of the DB-agnostic SQLAlchemy textbook: production patterns. Pool sizing, replicas, observability, multi-tenancy, failure modes.
Pool sizing with PgBouncer
For many workers / replicas:
[App pods (N)] → [PgBouncer] → [Postgres]
App connects to PgBouncer; PgBouncer multiplexes to Postgres.
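from sqlalchemy.ext.asyncio import create_async_engine

engine = create_async_engine(
    "postgresql+asyncpg://app@pgbouncer:6432/db",
    pool_size=10,
    max_overflow=5,
    pool_pre_ping=True,  # test each connection on checkout
    pool_recycle=300,    # replace connections older than 5 minutes
)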
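In transaction-pooling mode, PgBouncer may hand each transaction a different server connection, so a prepared statement cached on one connection can be missing on the next. With asyncpg, disable both caches by passing this to create_async_engine:
connect_args={"prepared_statement_cache_size": 0, "statement_cache_size": 0}
The first setting is the SQLAlchemy asyncpg dialect’s cache; the second is asyncpg’s own.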
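As a minimal sketch, the pool settings and the cache switches can be kept in one place. The helper name pgbouncer_engine_kwargs is hypothetical, and the pool numbers simply repeat the example above; only the two connect_args keys come from the text.

```python
from typing import Any


def pgbouncer_engine_kwargs(transaction_pooling: bool) -> dict[str, Any]:
    """Build keyword arguments for create_async_engine() behind PgBouncer.

    Hypothetical helper: the pool numbers repeat the chapter's example.
    """
    kwargs: dict[str, Any] = {
        "pool_size": 10,
        "max_overflow": 5,
        "pool_pre_ping": True,
        "pool_recycle": 300,
    }
    if transaction_pooling:
        # Consecutive transactions may land on different server connections,
        # so cached prepared statements can vanish; turn both caches off.
        kwargs["connect_args"] = {
            "prepared_statement_cache_size": 0,  # SQLAlchemy asyncpg dialect cache
            "statement_cache_size": 0,           # asyncpg's own cache
        }
    return kwargs
```

It would be used as create_async_engine(url, **pgbouncer_engine_kwargs(True)), so switching PgBouncer modes touches a single flag.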
Read replicas
Two engines:
write_engine = create_async_engine(WRITER_URL, pool_size=20)
read_engine = create_async_engine(READER_URL, pool_size=40)
WriteSession = async_sessionmaker(write_engine, expire_on_commit=False)
ReadSession = async_sessionmaker(read_engine, expire_on_commit=False)
Per-request:
from typing import AsyncIterator

async def get_write_db() -> AsyncIterator[AsyncSession]:
    async with WriteSession() as session:
        yield session

async def get_read_db() -> AsyncIterator[AsyncSession]:
    async with ReadSession() as session:
        yield session

@app.get("/users/{id}")
async def get_user(id: int, db: AsyncSession = Depends(get_read_db)):
    return await db.get(User, id)

@app.post("/users")
async def create_user(data: UserCreate, db: AsyncSession = Depends(get_write_db)):
    user = User(**data.model_dump())
    db.add(user)
    await db.commit()
    return user  # still readable after commit because expire_on_commit=False
Read-after-write: a client that has just written should read from the primary, not a replica, because replication lag can hide its own write. See Postgres Replication.
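One common way to handle read-after-write is to pin a user’s reads to the primary for a short window after they write. The sketch below is an illustration only: ReadAfterWriteRouter, its method names, and the 5-second window are all assumptions, and the in-memory dict works for a single process (several workers would need a shared store or a cookie).

```python
import time
from typing import Callable, Dict


class ReadAfterWriteRouter:
    """Send a user's reads to the primary for a short window after a write.

    Hypothetical helper: the 5-second default is an assumption and should
    exceed the replica lag you actually observe.
    """

    def __init__(
        self,
        window_seconds: float = 5.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._window = window_seconds
        self._clock = clock  # injectable so tests can control time
        self._last_write: Dict[str, float] = {}

    def record_write(self, user_key: str) -> None:
        """Remember when this user last wrote."""
        self._last_write[user_key] = self._clock()

    def target_for_read(self, user_key: str) -> str:
        """Return "primary" inside the window, otherwise "replica"."""
        last = self._last_write.get(user_key)
        if last is not None and self._clock() - last < self._window:
            return "primary"
        return "replica"
```

A dependency could call target_for_read() and then yield a session from WriteSession or ReadSession accordingly.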
OTEL tracing
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
Every query becomes a trace span carrying the SQL, duration, and any error. See Distributed Tracing.
For async engines (asyncpg and others), instrument the underlying sync_engine.
Slow query logging
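import time
from sqlalchemy import event

SLOW_QUERY_SECONDS = 0.5

@event.listens_for(engine.sync_engine, "before_cursor_execute")
def before(conn, cursor, statement, parameters, context, executemany):
    # perf_counter is monotonic, so clock adjustments cannot skew the result
    context._t = time.perf_counter()

@event.listens_for(engine.sync_engine, "after_cursor_execute")
def after(conn, cursor, statement, parameters, context, executemany):
    duration = time.perf_counter() - context._t
    if duration > SLOW_QUERY_SECONDS:
        # keyword arguments assume a structlog-style logger
        log.warning("slow_query", sql=statement[:500], duration_ms=duration * 1000)
Log every statement over the threshold, then investigate the statement and its query plan.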
Pool metrics
import asyncio
from prometheus_client import Gauge

pool_size = Gauge("db_pool_size", "Pool size")
pool_in_use = Gauge("db_pool_checked_out", "Checked out connections")

async def report_pool():
    # Sample the pool every 10 seconds; run this as a background task.
    while True:
        pool_size.set(engine.pool.size())
        pool_in_use.set(engine.pool.checkedout())  # QueuePool method is checkedout()
        await asyncio.sleep(10)
Export these gauges to Prometheus and alert when checked-out connections approach pool_size + max_overflow.
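To make "pool exhaustion" concrete, the check behind such an alert can be sketched as a utilization ratio against the pool’s hard limit. The function names and the 80% threshold are assumptions for illustration, and the sketch assumes a bounded max_overflow.

```python
def pool_utilization(checked_out: int, pool_size: int, max_overflow: int) -> float:
    """Fraction of the pool's hard limit (pool_size + max_overflow) in use."""
    if max_overflow < 0:
        # A negative max_overflow means "unbounded", so no ratio exists.
        raise ValueError("utilization is undefined for an unbounded pool")
    capacity = pool_size + max_overflow
    if capacity <= 0:
        raise ValueError("pool capacity must be positive")
    return checked_out / capacity


def should_alert(
    checked_out: int,
    pool_size: int,
    max_overflow: int,
    threshold: float = 0.8,  # assumed alert threshold
) -> bool:
    """True when utilization reaches the alert threshold."""
    return pool_utilization(checked_out, pool_size, max_overflow) >= threshold
```

With pool_size=10 and max_overflow=5, the alert fires once 12 of the 15 possible connections are checked out.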
Multi-tenancy
For tenant_id-per-row with RLS:
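@event.listens_for(engine.sync_engine, "before_cursor_execute")
def set_tenant(conn, cursor, statement, parameters, context, executemany):
    tenant_id = current_tenant_id_var.get()
    if tenant_id:
        # SET LOCAL lasts until the end of the current transaction.
        # The value is interpolated into SQL, so tenant_id must be a
        # validated value (e.g. a parsed UUID or int), never raw input.
        cursor.execute(f"SET LOCAL app.tenant_id = '{tenant_id}'")
Per request, bind the contextvar at the start; the event listener applies it at query time.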
For schema-per-tenant, switch search_path per request. For DB-per-tenant, keep a separate engine per tenant.
See Multi-Tenancy Patterns.
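Binding the contextvar per request can be sketched as follows. The variable reuses the name current_tenant_id_var from the listener above; run_as_tenant, handler, and demo are hypothetical names, and in a real app the binding would typically sit in middleware or a dependency.

```python
import asyncio
from contextvars import ContextVar
from typing import Awaitable, Callable, List, Optional, TypeVar

T = TypeVar("T")

# Same role as current_tenant_id_var above; None means "no tenant bound".
current_tenant_id_var: ContextVar[Optional[str]] = ContextVar(
    "current_tenant_id", default=None
)


async def run_as_tenant(tenant_id: str, handler: Callable[[], Awaitable[T]]) -> T:
    """Bind the tenant for the duration of one request, then restore."""
    token = current_tenant_id_var.set(tenant_id)
    try:
        return await handler()
    finally:
        # Reset even if the handler raises, so the value never leaks
        # into later work running in the same context.
        current_tenant_id_var.reset(token)


async def handler() -> Optional[str]:
    # Stand-in for request code; a real handler would run queries here.
    await asyncio.sleep(0)
    return current_tenant_id_var.get()


async def demo() -> List[Optional[str]]:
    # gather() runs each coroutine in its own task with its own context
    # copy, so concurrent requests see only their own tenant.
    return await asyncio.gather(
        run_as_tenant("acme", handler),
        run_as_tenant("globex", handler),
    )
```

Because each asyncio task gets its own copy of the context, two requests interleaving on the same event loop cannot read each other’s tenant.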
Graceful failure
import asyncio
from sqlalchemy.exc import OperationalError, DisconnectionError

async def with_retry(fn, max_attempts=3):
    for attempt in range(max_attempts):
        try:
            return await fn()
        except (OperationalError, DisconnectionError):
            if attempt == max_attempts - 1:
                raise
            # exponential backoff: 0.1s, 0.2s, 0.4s, ...
            await asyncio.sleep(0.1 * (2 ** attempt))
Use this for network blips, and have fn open a fresh session on each attempt. Don’t retry constraint violations: they fail the same way every time.
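The retry-only-transient-errors rule can be exercised without a database. The sketch below is a variant of the helper above, not the same function: it takes the retryable exception types as a parameter and adds random jitter to the backoff, which the original does not do. ConnectionError and ValueError are stand-ins for a network blip and a constraint violation.

```python
import asyncio
import random
from typing import Awaitable, Callable, Tuple, Type, TypeVar

T = TypeVar("T")


async def with_retry(
    fn: Callable[[], Awaitable[T]],
    retry_on: Tuple[Type[BaseException], ...],
    max_attempts: int = 3,
    base_delay: float = 0.1,
) -> T:
    """Retry fn on the listed exception types only, with jittered backoff."""
    for attempt in range(max_attempts):
        try:
            return await fn()
        except retry_on:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            # Exponential backoff with full jitter, so many clients that
            # failed together do not all retry at the same instant.
            await asyncio.sleep(random.uniform(0, base_delay * (2 ** attempt)))
    raise RuntimeError("max_attempts must be at least 1")
```

Any exception type not listed in retry_on propagates on the first failure, which is the behavior wanted for constraint violations.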
Health and readiness
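from fastapi.responses import JSONResponse
from sqlalchemy import text

@app.get("/healthz")
async def health():
    # Liveness: the process is up; no dependencies are checked.
    return {"status": "ok"}

@app.get("/ready")
async def ready(db: AsyncSession = Depends(get_db)):
    # Readiness: the database answers a trivial query.
    try:
        await db.execute(text("SELECT 1"))
    except Exception:
        return JSONResponse({"status": "not ready"}, status_code=503)
    return {"status": "ready"}
Liveness: the process is up. Readiness: the database is reachable.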
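A readiness probe that hangs on an unresponsive database is nearly as bad as one that lies, so it can help to bound the check with a timeout. This is a minimal sketch: is_ready and its ping parameter are hypothetical, and the 1-second default is an assumption.

```python
import asyncio
from typing import Awaitable, Callable


async def is_ready(
    ping: Callable[[], Awaitable[object]],
    timeout: float = 1.0,  # assumed probe budget
) -> bool:
    """Return True only if ping() completes without error inside the timeout."""
    try:
        await asyncio.wait_for(ping(), timeout=timeout)
    except Exception:
        # Timeouts, connection errors, and anything else mean "not ready".
        return False
    return True
```

In the /ready handler, ping could be a small coroutine function that runs SELECT 1 on the session.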
DB connection limits
Postgres defaults to max_connections = 100. Across all your services, the worst case must fit:
total = sum over services of (replicas × workers × (pool_size + max_overflow)) ≤ max_connections
If it does not fit, put PgBouncer in front to multiplex.
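The budget formula is easy to get wrong by hand, so here is a worked example. The Service dataclass and the two-service fleet are hypothetical; only the formula comes from the text.

```python
from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class Service:
    name: str
    replicas: int      # pods
    workers: int       # processes per pod, each with its own pool
    pool_size: int
    max_overflow: int

    @property
    def max_connections(self) -> int:
        """Worst-case connections this service can open."""
        return self.replicas * self.workers * (self.pool_size + self.max_overflow)


def total_connections(services: Iterable[Service]) -> int:
    """Sum the worst case over every service sharing the database."""
    return sum(s.max_connections for s in services)


# Hypothetical fleet, for illustration only.
fleet = [
    Service("api", replicas=4, workers=2, pool_size=5, max_overflow=2),      # 4*2*7 = 56
    Service("worker", replicas=2, workers=1, pool_size=10, max_overflow=5),  # 2*1*15 = 30
]
total = total_connections(fleet)  # 56 + 30 = 86
fits = total <= 100               # True against the default max_connections
```

Here the fleet fits with 14 connections to spare, but doubling the api replicas to 8 would push the total to 142 and past the limit. Postgres also reserves a few slots for superusers (superuser_reserved_connections, 3 by default), so the usable budget is slightly below max_connections.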
Lifespan
from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app):
    engine = create_async_engine(DATABASE_URL, ...)
    async_sessionmaker_obj = async_sessionmaker(engine, expire_on_commit=False)
    app.state.engine = engine
    app.state.sessionmaker = async_sessionmaker_obj
    try:
        yield
    finally:
        # Close pooled connections on shutdown, even after an error.
        await engine.dispose()
The engine lives for the whole application lifespan; sessions are created per request via Depends.
Migrations on deploy
A Kubernetes Job runs Alembic before the app rolls out:
apiVersion: batch/v1
kind: Job
metadata: { name: migrate-{{ .Values.image.tag }} }
spec:
  template:
    spec:
      containers:
        - name: migrate
          image: myapp:{{ .Values.image.tag }}
          command: ["alembic", "upgrade", "head"]
      restartPolicy: OnFailure
App pods don’t run migrations themselves; the Job runs them once per deploy.
See Database Migrations and the Alembic textbook.
Backups
Not SQLAlchemy’s domain, but critical. Postgres: pg_dump plus WAL archiving (or pgBackRest, Barman). MySQL: mysqldump or Percona XtraBackup. SQLite: file copy, taking care with WAL files.
Test restores quarterly.
Disaster recovery
- Cross-region replica.
- Periodic snapshot copy to other region.
- Documented restore procedure with RTO / RPO.
Rehearse the restore for real; a plan that has never been run is not a plan.
Operational checklist
- PgBouncer (or equivalent) in front.
- Pool sizing reviewed across all replicas.
- Pool metrics in Prometheus.
- Slow query log + alerts.
- OTEL tracing enabled.
- Read replicas for read-heavy paths.
- Migrations as K8s Job.
- Backups tested.
- DR plan documented.
- expire_on_commit=False set.
- Eager loading audited (no N+1).
- Indexes match query patterns.
Common mistakes
1. Pool exhaustion
Every worker waits on a slow query and the pool runs dry. Catch it with the slow query log and pool metrics.
2. Long-held sessions
A per-request session stays open across an external HTTP call. Acquire late, release early.
3. Missing pool_pre_ping
Connections die after an idle timeout and cause spurious errors. Always pre-ping.
4. ORM in hot read path
A 10k QPS endpoint loads ORM objects with relationships. Profile, then switch hot paths to Core.
5. Schema drift
Models change without a matching migration. Add a drift test to CI.
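For mistake 2, "acquire late, release early" means doing slow external work before the session is opened. The sketch below uses stand-ins only: session_scope mimics an async session context manager, call_external_api mimics an HTTP call, and the events list exists just to show the ordering.

```python
import asyncio
from contextlib import asynccontextmanager
from typing import AsyncIterator, List

events: List[str] = []


@asynccontextmanager
async def session_scope() -> AsyncIterator[None]:
    """Stand-in for an async session context manager; records hold time."""
    events.append("session opened")
    try:
        yield
    finally:
        events.append("session closed")


async def call_external_api() -> str:
    """Stand-in for a slow HTTP call."""
    events.append("http call")
    await asyncio.sleep(0)
    return "payload"


async def handle_request() -> None:
    # 1. Slow external work first, with no session held.
    payload = await call_external_api()
    # 2. Open the session only around the database work.
    async with session_scope():
        events.append(f"db write: {payload}")


asyncio.run(handle_request())
# events is now:
# ["http call", "session opened", "db write: payload", "session closed"]
```

The HTTP call finishes before the session opens, so a slow upstream service cannot pin a pooled connection.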
You’ve finished
That’s the DB-agnostic SQLAlchemy textbook. For a Postgres-specific deep dive: the companion volume.
For migrations: the Alembic textbook.
For Pydantic: the Pydantic v2 textbook.
For FastAPI: the FastAPI textbook.
If you want my full SQLAlchemy + FastAPI production starter (this whole stack wired up), it’s at rajpoot.dev.