Chapter 2: how SQLAlchemy talks to the database. Engine, pool, connections, events, and the patterns from production.

Engine

engine = create_engine("postgresql+psycopg://user:pass@host/db")

The engine is a factory + connection pool. Create once per process; reuse.

For async:

engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")

Pool types

QueuePool (default for sync)

  • Persistent pool of connections.
  • pool_size: open / kept around.
  • max_overflow: extra above pool_size under load.
  • pool_timeout: wait this long for a free connection.
  • pool_recycle: recycle connections after N seconds.
  • pool_pre_ping: check liveness before use.
engine = create_engine(URL, pool_size=20, max_overflow=10, pool_timeout=30, pool_recycle=3600, pool_pre_ping=True)

NullPool (no pooling)

from sqlalchemy.pool import NullPool

engine = create_engine(URL, poolclass=NullPool)

Each connect() opens a fresh connection; closes it after.

For serverless (Lambda) where connections shouldn’t be reused across invocations.

StaticPool

Single shared connection. For SQLite in-memory tests:

from sqlalchemy.pool import StaticPool

engine = create_engine("sqlite://", poolclass=StaticPool, connect_args={"check_same_thread": False})

AsyncAdaptedQueuePool

Default for async engine; same semantics as QueuePool.

Sizing

For a FastAPI service:

total_connections = workers × replicas × (pool_size + max_overflow)

Postgres max_connections defaults to 100. Don’t blow past it.

Recommendation:

  • 4 workers × 5 replicas × (10 + 10) = 400 → too many for default Postgres.
  • Use PgBouncer in transaction-pool mode to multiplex.
  • Or smaller pool: 4 × 5 × (5 + 5) = 200 → still tight.

For most apps: PgBouncer is the answer beyond a certain scale.

pool_pre_ping

engine = create_engine(URL, pool_pre_ping=True)

Before using a pooled connection, run a cheap check. Catches connections killed by the DB / firewall.

Costs: one extra round trip per checkout. Worth it.

pool_recycle

engine = create_engine(URL, pool_recycle=3600)

Recycle connections older than N seconds. Avoids hitting the DB’s idle timeout (Postgres idle_session_timeout, MySQL wait_timeout).

Set to less than your DB’s timeout.

Events

from sqlalchemy import event

@event.listens_for(engine, "connect")
def on_connect(dbapi_conn, conn_record):
    # runs when a new physical connection is made
    pass

@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, conn_record, conn_proxy):
    # runs every time a connection is borrowed from pool
    pass

@event.listens_for(engine, "before_execute")
def before_execute(conn, clauseelement, multiparams, params, execution_options):
    # before each statement
    pass

Hooks for connection lifecycle. Useful for setting session config, logging, metrics.

Per-connection settings

For Postgres set search_path or session config on every connection:

@event.listens_for(engine.sync_engine, "connect")
def set_role(dbapi_conn, conn_record):
    cursor = dbapi_conn.cursor()
    cursor.execute("SET app.tenant_id = ''")
    cursor.close()

For session-per-request multi-tenancy: see the Multi-Tenancy chapter .

Connect args

Pass driver-specific options:

engine = create_engine(
    URL,
    connect_args={
        "server_settings": {"application_name": "myapp"},
        "command_timeout": 60,
    },
)

asyncpg accepts server_settings, command_timeout, statement_cache_size, etc.

psycopg accepts different options. Check driver docs.

Connection.execute

with engine.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    print(result.scalar())

Connection is checked out from pool. Use as context manager; returns to pool at the end.

For DDL / queries that need a transaction:

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE ..."))
    conn.execute(insert(t).values(...))
# auto-commits if no exception

begin() opens a transaction; commits at the end.

Async equivalent

async with engine.connect() as conn:
    result = await conn.execute(text("SELECT version()"))

async with engine.begin() as conn:
    await conn.execute(...)

Same semantics; await everywhere.

Reflection

Read existing schema:

metadata = MetaData()
metadata.reflect(bind=engine)

users = metadata.tables["users"]

Useful for legacy DBs you don’t own. For most apps: define schema in Python; use Alembic for migrations.

Multiple engines

For read replicas:

write_engine = create_async_engine(WRITER_URL, ...)
read_engine = create_async_engine(READER_URL, ...)

WriteSession = async_sessionmaker(write_engine, ...)
ReadSession = async_sessionmaker(read_engine, ...)

async def get_write_db(): ...
async def get_read_db(): ...

Or use a single engine with read replica routing via dialect (more complex; rarely worth it).

Disposing

engine.dispose()  # close all pool connections

In FastAPI lifespan shutdown:

yield
await engine.dispose()

For pytest: dispose between sessions to avoid leaked connections.

Debugging

engine = create_engine(URL, echo=True)  # logs SQL
engine = create_engine(URL, echo="debug")  # logs SQL + results (verbose)
engine = create_engine(URL, echo_pool=True)  # logs pool events

For production: don’t echo (huge logs); use OpenTelemetry tracing.

Pool stats

print(engine.pool.status())
# Pool size: 20  Connections in pool: 15 Current Overflow: -5 Current Checked out connections: 5

For monitoring: expose as a Prometheus metric.

Common mistakes

1. Engine per request

async def get_db():
    engine = create_async_engine(URL)  # BAD: new engine per request
    ...

Engine should be process-singleton (created in lifespan). Sessions are per-request.

2. Forgetting dispose

Connections leak; tests flake. Lifespan shutdown disposes.

3. Pool too large

Postgres has a max. Don’t exceed it across all replicas / workers.

4. Pool too small

Threads waiting on pool checkout. Increase or use overflow.

5. Not using pool_pre_ping

Stale connections cause spurious errors after long-idle.

Cross-DB notes

  • Postgres: max_connections matters; PgBouncer for scale.
  • MySQL: similar; connection limit per host can be configured.
  • SQLite: file-based; one writer at a time; connection pool less relevant.

What’s next

Chapter 3: Schema and Types.

Read this next


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .