Chapter 2: how SQLAlchemy talks to the database. Engine, pool, connections, events, and the patterns from production.
Engine
from sqlalchemy import create_engine
engine = create_engine("postgresql+psycopg://user:pass@host/db")
The engine is a factory + connection pool. Create once per process; reuse.
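One way to get "create once per process; reuse" is a cached factory function. The sketch below is a minimal illustration, not the only pattern: `get_engine`, `fetch_one`, and `DATABASE_URL` are hypothetical names, and the in-memory SQLite URL is an assumption that keeps the example runnable without a server.

```python
from functools import lru_cache

from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine

# Hypothetical URL for illustration; swap in the real database URL.
DATABASE_URL = "sqlite://"


@lru_cache(maxsize=1)
def get_engine() -> Engine:
    """Build the engine on the first call; return the same object afterwards."""
    return create_engine(DATABASE_URL)


def fetch_one() -> int:
    # Every caller shares the one engine, and therefore the one pool.
    with get_engine().connect() as conn:
        return conn.execute(text("SELECT 1")).scalar_one()
```

Calling `get_engine()` from anywhere in the process hands back the same engine, so the pool is shared instead of rebuilt.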
For async:
from sqlalchemy.ext.asyncio import create_async_engine
engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")
Pool types
QueuePool (default for sync)
- Persistent pool of connections.
- pool_size: connections opened and kept around.
- max_overflow: extra connections above pool_size under load.
- pool_timeout: wait this long for a free connection.
- pool_recycle: recycle connections after N seconds.
- pool_pre_ping: check liveness before use.
engine = create_engine(URL, pool_size=20, max_overflow=10, pool_timeout=30, pool_recycle=3600, pool_pre_ping=True)
NullPool (no pooling)
from sqlalchemy.pool import NullPool
engine = create_engine(URL, poolclass=NullPool)
Each connect() opens a fresh connection and closes it on release.
Suits serverless (Lambda), where connections shouldn’t be reused across invocations.
StaticPool
Single shared connection. For SQLite in-memory tests:
from sqlalchemy.pool import StaticPool
engine = create_engine("sqlite://", poolclass=StaticPool, connect_args={"check_same_thread": False})
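To see why StaticPool matters for in-memory SQLite, the sketch below writes in one checkout and reads in another. The table name `items` is made up for illustration. Because both checkouts share the single underlying connection, they see the same in-memory database.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import StaticPool

engine = create_engine(
    "sqlite://",
    poolclass=StaticPool,
    connect_args={"check_same_thread": False},
)

# First checkout: create a table and a row; begin() commits on clean exit.
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE items (id INTEGER PRIMARY KEY, name TEXT)"))
    conn.execute(text("INSERT INTO items (name) VALUES ('widget')"))

# Second checkout: same underlying connection, so the same in-memory database.
with engine.connect() as conn:
    names = conn.execute(text("SELECT name FROM items")).scalars().all()
```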
AsyncAdaptedQueuePool
Default for async engine; same semantics as QueuePool.
Sizing
For a FastAPI service:
total_connections = workers × replicas × (pool_size + max_overflow)
Postgres max_connections defaults to 100. Don’t blow past it.
Recommendation:
- 4 workers × 5 replicas × (10 + 10) = 400 → too many for default Postgres.
- Use PgBouncer in transaction-pool mode to multiplex.
- Or a smaller pool: 4 × 5 × (5 + 5) = 200 → still double the default limit.
For most apps: PgBouncer is the answer beyond a certain scale.
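The sizing arithmetic above can be turned into a quick check. This is a rough sketch: the function names and the `reserved` headroom parameter are assumptions for illustration, not part of SQLAlchemy.

```python
def total_connections(workers: int, replicas: int, pool_size: int, max_overflow: int) -> int:
    """Worst-case connections the service can open against the database."""
    return workers * replicas * (pool_size + max_overflow)


def per_process_budget(max_connections: int, workers: int, replicas: int, reserved: int = 0) -> int:
    """Largest pool_size + max_overflow each worker can use without exceeding the limit.

    `reserved` is hypothetical headroom kept free for admin tools or migrations.
    """
    return (max_connections - reserved) // (workers * replicas)


big = total_connections(workers=4, replicas=5, pool_size=10, max_overflow=10)
small = total_connections(workers=4, replicas=5, pool_size=5, max_overflow=5)
budget = per_process_budget(max_connections=100, workers=4, replicas=5)
```

With the default limit of 100 and 20 worker processes, the budget comes out to 5 connections per process in total, which is why a pooler such as PgBouncer becomes attractive.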
pool_pre_ping
engine = create_engine(URL, pool_pre_ping=True)
Before using a pooled connection, run a cheap check. Catches connections killed by the DB / firewall.
Costs: one extra round trip per checkout. Worth it.
pool_recycle
engine = create_engine(URL, pool_recycle=3600)
Recycle connections older than N seconds. Avoids hitting the DB’s idle timeout (Postgres idle_session_timeout, MySQL wait_timeout).
Set to less than your DB’s timeout.
Events
from sqlalchemy import event
@event.listens_for(engine, "connect")
def on_connect(dbapi_conn, conn_record):
    # runs when a new physical connection is made
    pass
@event.listens_for(engine, "checkout")
def on_checkout(dbapi_conn, conn_record, conn_proxy):
    # runs every time a connection is borrowed from pool
    pass
@event.listens_for(engine, "before_execute")
def before_execute(conn, clauseelement, multiparams, params, execution_options):
    # before each statement
    pass
Hooks for connection lifecycle. Useful for setting session config, logging, metrics.
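As a small illustration of using these hooks for metrics, the sketch below counts how often each event fires. It assumes in-memory SQLite with StaticPool so that it runs anywhere; the `counts` counter and the listener names are made up for the example.

```python
from collections import Counter

from sqlalchemy import create_engine, event, text
from sqlalchemy.pool import StaticPool

engine = create_engine("sqlite://", poolclass=StaticPool)
counts = Counter()


@event.listens_for(engine, "connect")
def count_connect(dbapi_conn, conn_record):
    counts["connect"] += 1  # new physical connection


@event.listens_for(engine, "checkout")
def count_checkout(dbapi_conn, conn_record, conn_proxy):
    counts["checkout"] += 1  # connection borrowed from the pool


@event.listens_for(engine, "before_execute")
def count_statement(conn, clauseelement, multiparams, params, execution_options):
    counts["before_execute"] += 1  # statement about to run


for _ in range(3):
    with engine.connect() as conn:
        conn.execute(text("SELECT 1"))
```

The physical connection is made once, while checkout and statement hooks fire on every use, which is the distinction to keep in mind when deciding where per-connection setup belongs.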
Per-connection settings
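For Postgres, set search_path or other session config on every new connection:
# With an async engine, listen on engine.sync_engine; with a sync engine, pass the engine itself.
@event.listens_for(engine.sync_engine, "connect")
def set_role(dbapi_conn, conn_record):
    # dbapi_conn is the raw driver connection, so use a plain DBAPI cursor
    cursor = dbapi_conn.cursor()
    cursor.execute("SET app.tenant_id = ''")
    cursor.close()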
For session-per-request multi-tenancy: see the Multi-Tenancy chapter.
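The same connect-hook pattern can be tried locally without Postgres. The sketch below is a SQLite analogue, not the Postgres setup itself: it turns on foreign-key enforcement for every new connection, then reads the setting back. The listener name is illustrative.

```python
from sqlalchemy import create_engine, event, text

engine = create_engine("sqlite://")


@event.listens_for(engine, "connect")
def enable_foreign_keys(dbapi_conn, conn_record):
    # Runs once per physical connection, before it is handed to any caller.
    cursor = dbapi_conn.cursor()
    cursor.execute("PRAGMA foreign_keys=ON")
    cursor.close()


with engine.connect() as conn:
    # Reads back the per-connection setting applied by the hook.
    fk_enabled = conn.execute(text("PRAGMA foreign_keys")).scalar_one()
```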
Connect args
Pass driver-specific options:
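from sqlalchemy.ext.asyncio import create_async_engine
# server_settings and command_timeout are asyncpg options, so URL must use postgresql+asyncpg://
engine = create_async_engine(
    URL,
    connect_args={
        "server_settings": {"application_name": "myapp"},
        "command_timeout": 60,
    },
)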
asyncpg accepts server_settings, command_timeout, statement_cache_size, etc.
psycopg accepts different options. Check driver docs.
Connection.execute
from sqlalchemy import text
with engine.connect() as conn:
    result = conn.execute(text("SELECT version()"))
    print(result.scalar())
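The connection is checked out from the pool. Used as a context manager, it returns to the pool at the end of the block. In SQLAlchemy 2.0 this form does not commit on its own: uncommitted work is rolled back on exit unless you call conn.commit().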
For DDL / queries that need a transaction:
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE ..."))
    conn.execute(insert(t).values(...))
# committed at this point if no exception was raised
begin() opens a transaction, commits when the block exits cleanly, and rolls back if the block raises.
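The commit-or-rollback behaviour of begin() is easy to check on in-memory SQLite. In this sketch the `events` table and the simulated failure are made up for illustration. The insert inside the failing block is discarded, and the one in the clean block is kept.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import StaticPool

# StaticPool keeps one connection, so every block sees the same in-memory database.
engine = create_engine("sqlite://", poolclass=StaticPool)

with engine.begin() as conn:
    conn.execute(text("CREATE TABLE events (id INTEGER PRIMARY KEY, kind TEXT)"))

try:
    with engine.begin() as conn:
        conn.execute(text("INSERT INTO events (kind) VALUES ('lost')"))
        raise RuntimeError("simulated failure")  # triggers rollback on exit
except RuntimeError:
    pass

with engine.begin() as conn:
    conn.execute(text("INSERT INTO events (kind) VALUES ('kept')"))  # committed on exit

with engine.connect() as conn:
    kinds = conn.execute(text("SELECT kind FROM events")).scalars().all()
```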
Async equivalent
async with engine.connect() as conn:
    result = await conn.execute(text("SELECT version()"))
async with engine.begin() as conn:
    await conn.execute(...)
Same semantics; await everywhere.
Reflection
Read existing schema:
from sqlalchemy import MetaData
metadata = MetaData()
metadata.reflect(bind=engine)
users = metadata.tables["users"]
Useful for legacy DBs you don’t own. For most apps: define schema in Python; use Alembic for migrations.
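A self-contained version of the reflection snippet, assuming an in-memory SQLite database standing in for the legacy schema. The `users` table is created with raw SQL first, so the Python side knows nothing about it until `reflect()` runs.

```python
from sqlalchemy import MetaData, create_engine, select, text
from sqlalchemy.pool import StaticPool

engine = create_engine("sqlite://", poolclass=StaticPool)

# Pretend this schema already existed before the application was written.
with engine.begin() as conn:
    conn.execute(text("CREATE TABLE users (id INTEGER PRIMARY KEY, email TEXT NOT NULL)"))
    conn.execute(text("INSERT INTO users (email) VALUES ('a@example.com')"))

# Load table definitions from the database instead of declaring them in Python.
metadata = MetaData()
metadata.reflect(bind=engine)
users = metadata.tables["users"]

column_names = [column.name for column in users.columns]

# The reflected Table works with the normal Core query API.
with engine.connect() as conn:
    emails = conn.execute(select(users.c.email)).scalars().all()
```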
Multiple engines
For read replicas:
write_engine = create_async_engine(WRITER_URL, ...)
read_engine = create_async_engine(READER_URL, ...)
WriteSession = async_sessionmaker(write_engine, ...)
ReadSession = async_sessionmaker(read_engine, ...)
async def get_write_db(): ...
async def get_read_db(): ...
Or use a single engine with read replica routing via dialect (more complex; rarely worth it).
Disposing
engine.dispose() # close all pool connections
In FastAPI lifespan shutdown:
yield
await engine.dispose()
For pytest: dispose between sessions to avoid leaked connections.
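To see what dispose() does to the pool, the sketch below counts idle pooled connections before and after. It assumes a QueuePool over in-memory SQLite purely so that it runs without a server; `idle_before` and `idle_after` are illustrative names.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.pool import QueuePool

engine = create_engine("sqlite://", poolclass=QueuePool, pool_size=2)

with engine.connect() as conn:
    conn.execute(text("SELECT 1"))

# The connection went back to the pool and is still open.
idle_before = engine.pool.checkedin()

# dispose() closes pooled connections and starts over with an empty pool.
engine.dispose()
idle_after = engine.pool.checkedin()
```

The engine remains usable after dispose(); the next connect() simply opens a new connection.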
Debugging
engine = create_engine(URL, echo=True) # logs SQL
engine = create_engine(URL, echo="debug") # logs SQL + results (verbose)
engine = create_engine(URL, echo_pool=True) # logs pool events
For production: don’t echo (huge logs); use OpenTelemetry tracing.
Pool stats
print(engine.pool.status())
# Pool size: 20 Connections in pool: 10 Current Overflow: -5 Current Checked out connections: 5
# Overflow is open connections minus pool_size, so it stays negative until the pool is full.
For monitoring: expose as a Prometheus metric.
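For metrics, numeric values are easier to work with than the status string. A minimal sketch, assuming a QueuePool: `pool_stats` is a hypothetical helper that reads the pool's counters into a dict, which a metrics library could then publish as gauges.

```python
from sqlalchemy import create_engine, text
from sqlalchemy.engine import Engine
from sqlalchemy.pool import QueuePool


def pool_stats(engine: Engine) -> dict:
    """Snapshot of QueuePool counters, suitable for exporting as gauges."""
    pool = engine.pool
    return {
        "size": pool.size(),                # configured pool_size
        "checked_in": pool.checkedin(),     # idle connections in the pool
        "checked_out": pool.checkedout(),   # connections currently in use
        "overflow": pool.overflow(),        # open connections minus pool_size
    }


engine = create_engine("sqlite://", poolclass=QueuePool, pool_size=5)

with engine.connect() as conn:
    conn.execute(text("SELECT 1"))
    busy = pool_stats(engine)   # taken while one connection is in use

idle = pool_stats(engine)       # taken after it was returned
```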
Common mistakes
1. Engine per request
async def get_db():
    engine = create_async_engine(URL)  # BAD: new engine per request
    ...
Engine should be process-singleton (created in lifespan). Sessions are per-request.
2. Forgetting dispose
Connections leak; tests flake. Lifespan shutdown disposes.
3. Pool too large
Postgres has a max. Don’t exceed it across all replicas / workers.
4. Pool too small
Requests queue on pool checkout and fail with a TimeoutError once pool_timeout elapses. Increase pool_size or allow overflow.
5. Not using pool_pre_ping
Stale connections cause spurious errors after long-idle.
Cross-DB notes
- Postgres: max_connections matters; PgBouncer for scale.
- MySQL: similar; connection limit per host can be configured.
- SQLite: file-based; one writer at a time; connection pool less relevant.
What’s next
Chapter 3: Schema and Types.