Cheatsheet for perf. Long-form: Textbook Ch 9 .
N+1 detection
from sqlalchemy.orm import raiseload
stmt = select(User).options(raiseload("*"))
# any unexpected lazy-load raises in dev/tests
N+1 fix
# one-to-many
stmt = select(User).options(selectinload(User.posts))
# many-to-one / one-to-one
stmt = select(Post).options(joinedload(Post.author))
# Nested
stmt = select(User).options(
selectinload(User.posts).selectinload(Post.comments)
)
Bulk insert
# Slow
for d in data:
session.add(User(**d))
await session.commit()
# Fast (single INSERT with multi-row VALUES)
await session.execute(insert(User), data) # data: list[dict]
await session.commit()
# Older API (still works)
session.bulk_insert_mappings(User, data)
For tens of thousands+: use COPY (Postgres) directly.
Bulk update / delete
await session.execute(update(User).where(User.active == False).values(banned=True))
await session.commit()
Doesn’t load rows into the session. Doesn’t fire ORM hooks.
load_only
from sqlalchemy.orm import load_only
stmt = select(User).options(load_only(User.id, User.email))
Restricts columns; smaller payload; faster.
defer
from sqlalchemy.orm import defer
stmt = select(Post).options(defer(Post.body))
# body loaded on first access (lazy column)
For huge text fields.
yield_per (server-side cursor)
result = await session.execute(
select(Event).execution_options(yield_per=1000)
)
for evt in result.scalars():
...
Sync; for async use session.stream(...).
EXPLAIN
explain = await session.execute(
text("EXPLAIN ANALYZE " + str(stmt.compile(compile_kwargs={"literal_binds": True})))
)
for row in explain:
print(row[0])
Compile to SQL
print(stmt.compile(engine, compile_kwargs={"literal_binds": True}))
Slow query log via events
@event.listens_for(engine.sync_engine, "before_cursor_execute")
def before(conn, cur, stmt, params, ctx, executemany):
ctx._t = time.time()
@event.listens_for(engine.sync_engine, "after_cursor_execute")
def after(conn, cur, stmt, params, ctx, executemany):
dur = time.time() - ctx._t
if dur > 0.5:
log.warning("slow_query", sql=stmt[:500], ms=dur*1000)
Pool stats
print(engine.pool.size(), engine.pool.checked_out())
Expose as Prometheus gauges; alert on near-exhaustion.
Cursor pagination (not offset)
# Slow at large offsets
stmt = select(User).order_by(User.id).limit(20).offset(100_000)
# Fast and stable
stmt = select(User).where(User.id > last_id).order_by(User.id).limit(20)
Batch fetch by IDs
# BAD: 1000 queries
for id in ids:
await session.get(User, id)
# GOOD: 1 query
users = (await session.execute(select(User).where(User.id.in_(ids)))).scalars().all()
ORM overhead → switch to Core
For ultra-hot reads, Core is leaner:
async with engine.connect() as conn:
result = await conn.execute(select(users_table).where(users_table.c.id == 1))
row = result.first()
No identity map, no relationship machinery.
Caching
import functools
@functools.lru_cache(maxsize=10_000)
def _key(user_id): return f"user:{user_id}"
async def get_user_cached(session, id):
key = _key(id)
if hit := cache.get(key):
return hit
user = await session.get(User, id)
cache.set(key, user, ttl=60)
return user
For real: Redis + TTL + invalidation on write.
Connection-level Postgres tuning
async with engine.begin() as conn:
await conn.execute(text("SET LOCAL work_mem = '256MB'"))
await conn.execute(stmt)
Per-transaction overrides for memory-intensive queries.
Common mistakes
- ORM
session.addloop for bulk insert. - Loading by ID one at a time.
selectinloadthen never accessing the relation.- Missing indexes (always EXPLAIN suspicious queries).
- Not measuring — profile first, optimize second.
Read this next
If you want my pool-metrics + slow-query Prometheus exporter, it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .