Chapter 9: making SQLAlchemy fast. N+1 detection, bulk operations, streaming, query analysis, and the patterns from production.
N+1 detection
from sqlalchemy.orm import raiseload
stmt = select(User).options(raiseload("*"))
Any unexpected lazy-load raises. Run in tests / dev to catch N+1 before prod.
For dev-time observability: enable echo=True and watch query count per request.
Bulk insert
# Slow (one INSERT per row, with ORM overhead)
for d in data:
session.add(User(**d))
await session.commit()
Bulk insert via Core:
await session.execute(insert(User), data)
await session.commit()
data is a list of dicts. Single INSERT statement with multiple VALUES. Much faster.
Or bulk_insert_mappings (older but still works):
session.bulk_insert_mappings(User, data)
For tens of thousands of rows: COPY (Postgres) is fastest.
Bulk update / delete
await session.execute(update(User).where(User.active == False).values(banned=True))
Single UPDATE; doesn’t load rows into the session.
For per-row complex logic: ORM loop.
Streaming results
result = await session.stream(select(Event))
async for evt in result.scalars():
process(evt)
Doesn’t load everything in memory. Critical for big tables.
For pure SQL streaming via Core:
async with engine.connect() as conn:
result = await conn.stream(select(events_table))
async for row in result:
...
yield_per
For sync code:
result = session.execute(select(Event).execution_options(yield_per=1000))
for evt in result.scalars():
...
Server-side cursor; fetches 1000 at a time.
load_only
Restrict loaded columns:
from sqlalchemy.orm import load_only
stmt = select(User).options(load_only(User.id, User.email))
Smaller payload; faster. Especially with wide tables.
Defer
Defer specific columns:
from sqlalchemy.orm import defer
stmt = select(Post).options(defer(Post.body))
# body loaded only on access (lazy)
For huge text fields you don’t usually need.
Indexes (review)
Indexes are 90% of query performance. Without good indexes, SQLAlchemy can’t help.
class User(Base):
email: Mapped[str] = mapped_column(unique=True, index=True)
__table_args__ = (
Index("ix_users_active_email", "active", "email"),
)
For Postgres-specific (partial, GIN, BRIN): see the Postgres textbook .
EXPLAIN
async with session.begin():
explain = await session.execute(text("EXPLAIN ANALYZE " + str(stmt.compile(compile_kwargs={"literal_binds": True}))))
for row in explain:
print(row)
Postgres / MySQL / SQLite all support EXPLAIN. Use it on slow queries.
Compile to SQL
print(stmt.compile(engine, compile_kwargs={"literal_binds": True}))
See the actual SQL with literal values inlined. Useful for debugging / EXPLAIN.
Echoing queries
For dev / debugging:
engine = create_async_engine(URL, echo=True)
Logs every query. Extremely useful for spotting unexpected queries.
Slow query log
For production:
@event.listens_for(engine.sync_engine, "before_cursor_execute")
def before_cursor_execute(conn, cursor, statement, parameters, context, executemany):
context._query_start_time = time.time()
@event.listens_for(engine.sync_engine, "after_cursor_execute")
def after_cursor_execute(conn, cursor, statement, parameters, context, executemany):
duration = time.time() - context._query_start_time
if duration > 0.5:
log.warning("slow_query", sql=statement[:200], duration_ms=duration * 1000)
Log queries above a threshold. Investigate.
Connection pool monitoring
print(engine.pool.status())
Expose as Prometheus metric:
async def report_pool_stats():
while True:
gauge.set(engine.pool.checked_out())
await asyncio.sleep(10)
Pool exhaustion → request queueing → latency spikes.
ORM overhead
The ORM has overhead vs raw SQL:
- Identity map lookups.
- Object construction.
- Lazy loading checks.
- Event hooks.
For ultra-hot read paths (>10k QPS), Core is leaner:
# Core: ~2x faster for simple reads
async with engine.connect() as conn:
result = await conn.execute(select(users_table).where(users_table.c.id == 1))
row = result.first()
Mix freely. ORM for write paths and complex models; Core for hot reads.
prepared_statement_cache_size
For asyncpg:
engine = create_async_engine(URL, connect_args={"prepared_statement_cache_size": 0})
asyncpg caches prepared statements per-connection. With PgBouncer in transaction mode, this can cause errors. Disable cache.
For psycopg v3: similar concerns; check driver docs.
Batch queries
ids = [1, 2, 3, ..., 1000]
# Bad: 1000 queries
for id in ids:
user = await session.get(User, id)
# Good: 1 query
users = (await session.execute(select(User).where(User.id.in_(ids)))).scalars().all()
Always batch.
Pagination performance
# Slow at scale (offset 100000 walks through them all)
stmt = select(User).order_by(User.id).limit(20).offset(100000)
# Fast (cursor-based)
stmt = select(User).where(User.id > last_seen_id).order_by(User.id).limit(20)
Use cursor-based for any non-trivial dataset. See API Design .
Caching
For frequently-accessed read-mostly data:
import asyncio
CACHE = {}
async def get_user_cached(session, id):
if id in CACHE:
return CACHE[id]
user = await session.get(User, id)
CACHE[id] = user
return user
Naive in-process. For real: Redis, with TTL and invalidation.
For short-lived: dogpile.cache, fastapi-cache.
Connection-level optimizations
async with engine.begin() as conn:
await conn.execute(text("SET work_mem = '256MB'"))
await conn.execute(stmt)
Per-connection tuning for specific queries (Postgres). Be careful: persists for the connection’s life in pool. Reset after.
Common mistakes
1. Selectinload then iterate without using
Loaded but never accessed → wasted query. Profile.
2. ORM bulk insert via session.add
10k rows: 10k INSERTs. Use Core insert with list.
3. Loading by ID one at a time
Use WHERE id IN (...).
4. Forgetting indexes
EXPLAIN ANALYZE shows Seq Scan on huge tables. Add indexes.
5. Not measuring
“It’s slow.” Where? Profile. EXPLAIN. py-spy. See Python Profiling .
What’s next
Chapter 10: Migrations.
Read this next
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .