Chapter 12, the final chapter of the DB-agnostic SQLAlchemy textbook: production patterns. Pool sizing, replicas, observability, multi-tenancy, failure modes.

Pool sizing with PgBouncer

For many workers / replicas:

[App pods (N)] → [PgBouncer] → [Postgres]

App connects to PgBouncer; PgBouncer multiplexes to Postgres.

engine = create_async_engine(
    "postgresql+asyncpg://app@pgbouncer:6432/db",
    pool_size=10,
    max_overflow=5,
    pool_pre_ping=True,
    pool_recycle=300,
)

In transaction-pooling mode, prepared statement caches must be disabled:

connect_args={"prepared_statement_cache_size": 0, "statement_cache_size": 0}

For asyncpg.

Read replicas

Two engines:

write_engine = create_async_engine(WRITER_URL, pool_size=20)
read_engine = create_async_engine(READER_URL, pool_size=40)

WriteSession = async_sessionmaker(write_engine, expire_on_commit=False)
ReadSession = async_sessionmaker(read_engine, expire_on_commit=False)

Per-request:

async def get_write_db() -> AsyncSession:
    async with WriteSession() as session:
        yield session

async def get_read_db() -> AsyncSession:
    async with ReadSession() as session:
        yield session

@app.get("/users/{id}")
async def get_user(id: int, db: AsyncSession = Depends(get_read_db)):
    return await db.get(User, id)

@app.post("/users")
async def create_user(data: UserCreate, db: AsyncSession = Depends(get_write_db)):
    user = User(**data.model_dump())
    db.add(user)
    await db.commit()

Read-after-write: writer’s reads should hit primary, not replica. See Postgres Replication .

OTEL tracing

from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor

SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)

Every query becomes a trace span: SQL, duration, errors. See Distributed Tracing .

For asyncpg / async engines: instrument the underlying sync_engine.

Slow query logging

@event.listens_for(engine.sync_engine, "before_cursor_execute")
def before(conn, cursor, statement, parameters, context, executemany):
    context._t = time.time()

@event.listens_for(engine.sync_engine, "after_cursor_execute")
def after(conn, cursor, statement, parameters, context, executemany):
    duration = time.time() - context._t
    if duration > 0.5:
        log.warning("slow_query", sql=statement[:500], duration_ms=duration * 1000)

Log slow queries; investigate.

Pool metrics

from prometheus_client import Gauge

pool_size = Gauge("db_pool_size", "Pool size")
pool_in_use = Gauge("db_pool_checked_out", "Checked out connections")

async def report_pool():
    while True:
        pool_size.set(engine.pool.size())
        pool_in_use.set(engine.pool.checked_out())
        await asyncio.sleep(10)

Pool metrics in Prometheus. Alert on pool exhaustion.

Multi-tenancy

For tenant_id-per-row with RLS:

@event.listens_for(engine.sync_engine, "before_cursor_execute")
def set_tenant(conn, cursor, statement, parameters, context, executemany):
    tenant_id = current_tenant_id_var.get()
    if tenant_id:
        cursor.execute(f"SET LOCAL app.tenant_id = '{tenant_id}'")

Per-request: bind contextvar; events apply at query time.

For schema-per-tenant: rotate search_path. For DB-per-tenant: separate engines per tenant.

See Multi-Tenancy Patterns .

Graceful failure

from sqlalchemy.exc import OperationalError, DisconnectionError

async def with_retry(fn, max_attempts=3):
    for attempt in range(max_attempts):
        try:
            return await fn()
        except (OperationalError, DisconnectionError) as e:
            if attempt == max_attempts - 1:
                raise
            await asyncio.sleep(0.1 * (2 ** attempt))

For network blips. Don’t retry constraint violations.

Health and readiness

@app.get("/healthz")
async def health():
    return {"status": "ok"}

@app.get("/ready")
async def ready(db: AsyncSession = Depends(get_db)):
    try:
        await db.execute(text("SELECT 1"))
    except Exception:
        return JSONResponse({"status": "not ready"}, status_code=503)
    return {"status": "ready"}

Liveness: just up. Readiness: DB reachable.

DB connection limits

Postgres max_connections = 100 (default). Across all your services:

total = sum over services of (replicas × workers × (pool_size + max_overflow))
       ≤ max_connections

Or use PgBouncer to multiplex.

Lifespan

@asynccontextmanager
async def lifespan(app):
    engine = create_async_engine(DATABASE_URL, ...)
    async_sessionmaker_obj = async_sessionmaker(engine, expire_on_commit=False)
    
    app.state.engine = engine
    app.state.sessionmaker = async_sessionmaker_obj
    
    yield
    
    await engine.dispose()

Engine: lifespan-scoped. Sessions: per-request via Depends.

Migrations on deploy

K8s Job runs Alembic before app rolls out:

apiVersion: batch/v1
kind: Job
metadata: { name: migrate-{{ .Values.image.tag }} }
spec:
  template:
    spec:
      containers:
        - name: migrate
          image: myapp:{{ .Values.image.tag }}
          command: ["alembic", "upgrade", "head"]
      restartPolicy: OnFailure

App pods don’t run migrations themselves; Job does once.

See Database Migrations and Alembic textbook .

Backups

Not SQLAlchemy’s domain, but: critical. PG: pg_dump + WAL archiving (or pgBackRest, Barman). MySQL: mysqldump or Percona XtraBackup. SQLite: file copy with WAL care.

Test restores quarterly.

Disaster recovery

  • Cross-region replica.
  • Periodic snapshot copy to other region.
  • Documented restore procedure with RTO / RPO.

The thing to actually test, not just plan.

Operational checklist

  • PgBouncer (or equivalent) in front.
  • Pool sizing reviewed across all replicas.
  • Pool metrics in Prometheus.
  • Slow query log + alerts.
  • OTEL tracing enabled.
  • Read replicas for read-heavy paths.
  • Migrations as K8s Job.
  • Backups tested.
  • DR plan documented.
  • expire_on_commit=False set.
  • Eager loading audited (no N+1).
  • Indexes match query patterns.

Common mistakes

1. Pool exhaustion

Workers all waiting on a slow query. Slow query log + pool metrics.

2. Long-held sessions

Per-request session that does external HTTP mid-flight. Acquire late, release early.

3. Missing pool_pre_ping

Connections die after idle timeout; spurious errors. Always pre-ping.

4. ORM in hot read path

10k QPS endpoint using ORM with relationships. Profile; switch hot paths to Core.

5. Schema drift

Model changes without migration. Drift test in CI.

You’ve finished

That’s the DB-agnostic SQLAlchemy textbook. For Postgres-specific deep dive: the companion volume .

For migrations: the Alembic textbook .

For Pydantic: the Pydantic v2 textbook .

For FastAPI: the FastAPI textbook .

If you want my full SQLAlchemy + FastAPI production starter (this whole stack wired up), it’s at rajpoot.dev .


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .