Cheatsheet for perf. Long-form: Textbook Ch 9.

N+1 detection

from sqlalchemy import select
from sqlalchemy.orm import raiseload

stmt = select(User).options(raiseload("*"))
# any unexpected lazy-load raises in dev/tests
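
To see the guard trip, here is a minimal sketch against an in-memory SQLite database, using the sync API for brevity. The `User`/`Post` models and the `lazy_load_is_blocked` helper are illustrative assumptions, not part of the snippet above.

```python
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, select
from sqlalchemy.exc import InvalidRequestError
from sqlalchemy.orm import Session, declarative_base, raiseload, relationship

Base = declarative_base()


class User(Base):  # hypothetical model
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    name = Column(String)
    posts = relationship("Post", back_populates="author")


class Post(Base):  # hypothetical model
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"))
    author = relationship("User", back_populates="posts")


engine = create_engine("sqlite://")  # in-memory database
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add(User(name="ada", posts=[Post(), Post()]))
    session.commit()


def lazy_load_is_blocked() -> bool:
    """Return True if touching an unloaded relationship raises."""
    with Session(engine) as session:
        stmt = select(User).options(raiseload("*"))
        user = session.execute(stmt).scalars().first()
        try:
            user.posts  # would normally emit a lazy SELECT
        except InvalidRequestError:
            return True
        return False
```

In a test suite, the same option can be applied to the queries under test so a forgotten eager load fails loudly instead of silently costing N queries.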

N+1 fix

from sqlalchemy.orm import joinedload, selectinload

# one-to-many
stmt = select(User).options(selectinload(User.posts))

# many-to-one / one-to-one
stmt = select(Post).options(joinedload(Post.author))

# Nested
stmt = select(User).options(
    selectinload(User.posts).selectinload(Post.comments)
)
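
One way to confirm the fix is to count the statements actually sent to the database. The sketch below does that with a `before_cursor_execute` listener on in-memory SQLite (sync API); the models and the `count_queries` helper are assumptions made for the example.

```python
from sqlalchemy import Column, ForeignKey, Integer, create_engine, event, select
from sqlalchemy.orm import Session, declarative_base, relationship, selectinload

Base = declarative_base()


class User(Base):  # hypothetical model
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    posts = relationship("Post")


class Post(Base):  # hypothetical model
    __tablename__ = "posts"
    id = Column(Integer, primary_key=True)
    user_id = Column(Integer, ForeignKey("users.id"))


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    # three users, two posts each
    session.add_all([User(posts=[Post(), Post()]) for _ in range(3)])
    session.commit()


def count_queries(stmt) -> int:
    """Run stmt, touch every user's posts, return how many statements ran."""
    statements = []

    def record(conn, cursor, statement, parameters, context, executemany):
        statements.append(statement)

    event.listen(engine, "before_cursor_execute", record)
    try:
        with Session(engine) as session:
            for user in session.execute(stmt).scalars().all():
                len(user.posts)  # lazy load here unless eagerly loaded
    finally:
        event.remove(engine, "before_cursor_execute", record)
    return len(statements)


lazy = count_queries(select(User))                                     # 1 + N
eager = count_queries(select(User).options(selectinload(User.posts)))  # 2
```

With three users, `lazy` should come out to 1 + 3 statements and `eager` to 2: one SELECT for users, one `IN (...)` SELECT for all their posts.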

Bulk insert

# Slow
for d in data:
    session.add(User(**d))
await session.commit()

# Fast (batched multi-row INSERT ... VALUES)
await session.execute(insert(User), data)   # data: list[dict]
await session.commit()

# Legacy API: lives on the sync Session, so go through run_sync on AsyncSession
await session.run_sync(lambda s: s.bulk_insert_mappings(User, data))

For tens of thousands of rows or more, use Postgres COPY directly.
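
A rough sketch of the COPY route, assuming a psycopg2 cursor and a `users` table with `name` and `email` columns (both assumptions; adapt to your driver and schema). The helper names are hypothetical. Only the CSV-building half runs without a database.

```python
import csv
import io
from typing import Iterable, Mapping, Sequence


def rows_to_csv_buffer(rows: Iterable[Mapping], columns: Sequence[str]) -> io.StringIO:
    """Serialize dict rows to an in-memory CSV buffer, columns in a fixed order."""
    buf = io.StringIO()
    writer = csv.writer(buf, lineterminator="\n")
    for row in rows:
        # None becomes an empty field, which COPY's csv format reads as NULL
        writer.writerow([row[col] for col in columns])
    buf.seek(0)
    return buf


def copy_users(cursor, rows: Iterable[Mapping], columns: Sequence[str] = ("name", "email")) -> None:
    """Stream rows into Postgres with COPY.

    Assumes `cursor` is a psycopg2 cursor and that a `users` table exists.
    """
    buf = rows_to_csv_buffer(rows, columns)
    column_list = ", ".join(columns)
    cursor.copy_expert(f"COPY users ({column_list}) FROM STDIN WITH (FORMAT csv)", buf)
```

COPY bypasses the ORM entirely, so Python-side defaults and events do not apply; anything the rows need has to be in the data or in server-side defaults.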

Bulk update / delete

await session.execute(update(User).where(User.active.is_(False)).values(banned=True))
await session.commit()

Doesn’t load rows into the session. Doesn’t fire per-object mapper events (before_update, after_delete, ...) or Python-side cascades.
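
A small runnable sketch of the same pattern on in-memory SQLite (sync API), returning how many rows were touched. The `User` model and `ban_inactive` helper are assumptions for illustration.

```python
from sqlalchemy import Boolean, Column, Integer, create_engine, func, select, update
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class User(Base):  # hypothetical model
    __tablename__ = "users"
    id = Column(Integer, primary_key=True)
    active = Column(Boolean, nullable=False, default=True)
    banned = Column(Boolean, nullable=False, default=False)


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)

with Session(engine) as session:
    session.add_all([User(active=False), User(active=False), User(active=True)])
    session.commit()


def ban_inactive(session: Session) -> int:
    """Ban all inactive users in one UPDATE; return the matched row count."""
    stmt = (
        update(User)
        .where(User.active.is_(False))
        .values(banned=True)
        # skip syncing in-session objects: nothing is loaded here anyway
        .execution_options(synchronize_session=False)
    )
    result = session.execute(stmt)
    updated = result.rowcount  # read before commit
    session.commit()
    return updated


with Session(engine) as session:
    updated = ban_inactive(session)
    banned_total = session.execute(
        select(func.count()).select_from(User).where(User.banned.is_(True))
    ).scalar_one()
```

With `synchronize_session=False`, objects already loaded in the session keep their old attribute values until they are expired or refreshed.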

load_only

from sqlalchemy.orm import load_only

stmt = select(User).options(load_only(User.id, User.email))

Loads only the listed columns (plus the primary key); the rest are deferred. Smaller payload, faster.

defer

from sqlalchemy.orm import defer

stmt = select(Post).options(defer(Post.body))
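# body is loaded on first access (lazy column); with AsyncSession an implicit
# load raises MissingGreenlet, so undefer(Post.body) in queries that need it

For huge text fields.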

yield_per (server-side cursor)

# sync Session
result = session.execute(
    select(Event).execution_options(yield_per=1000)
)
for evt in result.scalars():
    ...

This is the sync form. AsyncSession.execute() buffers the whole result, so for async use session.stream(...).
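
A sketch of the async variant, assuming `session` is an `AsyncSession` and `stmt` is a `select()`; the `stream_scalars` helper name is hypothetical. Arguments are typed loosely so the snippet has no import-time dependencies.

```python
from typing import Any, AsyncIterator


async def stream_scalars(session: Any, stmt: Any, batch_size: int = 1000) -> AsyncIterator[Any]:
    """Yield ORM objects one at a time while fetching in batches.

    Assumes `session` is a sqlalchemy.ext.asyncio.AsyncSession and `stmt`
    is a select() construct.
    """
    # stream() returns an AsyncResult backed by a server-side cursor
    result = await session.stream(stmt.execution_options(yield_per=batch_size))
    async for obj in result.scalars():
        yield obj
```

Usage would look like `async for evt in stream_scalars(session, select(Event)): ...`, keeping at most one batch of rows in memory at a time.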

EXPLAIN

# EXPLAIN ANALYZE actually runs the statement: use plain EXPLAIN for writes
sql = str(stmt.compile(engine, compile_kwargs={"literal_binds": True}))
explain = await session.execute(text("EXPLAIN ANALYZE " + sql))
for row in explain:
    print(row[0])

Compile to SQL

print(stmt.compile(engine, compile_kwargs={"literal_binds": True}))
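
For reference, a sketch of what `literal_binds` changes, compiled against the PostgreSQL dialect without needing a live connection. The `users_table` definition and the `to_sql` helper are assumptions for the example.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, select
from sqlalchemy.dialects import postgresql

metadata = MetaData()
users_table = Table(  # hypothetical table
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("email", String),
)

stmt = select(users_table).where(users_table.c.id == 42)


def to_sql(statement, literal: bool = False) -> str:
    """Render a statement as PostgreSQL SQL, optionally inlining parameters."""
    kwargs = {"literal_binds": True} if literal else {}
    return str(statement.compile(dialect=postgresql.dialect(), compile_kwargs=kwargs))


bound_sql = to_sql(stmt)                  # keeps a bind placeholder for the id
literal_sql = to_sql(stmt, literal=True)  # inlines the value: users.id = 42
```

Inlined values are for debugging and EXPLAIN only; keep bound parameters for anything that executes user-supplied data.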

Slow query log via events

import time
from sqlalchemy import event

@event.listens_for(engine.sync_engine, "before_cursor_execute")
def before(conn, cur, stmt, params, ctx, executemany):
    ctx._t = time.perf_counter()  # monotonic, unlike time.time()

@event.listens_for(engine.sync_engine, "after_cursor_execute")
def after(conn, cur, stmt, params, ctx, executemany):
    dur = time.perf_counter() - ctx._t
    if dur > 0.5:  # seconds
        log.warning("slow_query", sql=stmt[:500], ms=dur * 1000)

Pool stats

print(engine.pool.size(), engine.pool.checkedout())

Expose as Prometheus gauges; alert on near-exhaustion.
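
A possible shape for the alerting side, kept library-agnostic: it only assumes a pool object with `size()` and `checkedout()` (as on `QueuePool`). `PoolSnapshot`, `snapshot`, `near_exhaustion` and the 0.8 threshold are hypothetical names and values.

```python
from dataclasses import dataclass
from typing import Any


@dataclass(frozen=True)
class PoolSnapshot:
    size: int         # configured pool size
    checked_out: int  # connections currently in use

    @property
    def utilization(self) -> float:
        """Fraction of the base pool in use; can exceed 1.0 during overflow."""
        return self.checked_out / self.size if self.size else 0.0


def snapshot(pool: Any) -> PoolSnapshot:
    """Read the two counters from any pool exposing size() and checkedout()."""
    return PoolSnapshot(size=pool.size(), checked_out=pool.checkedout())


def near_exhaustion(pool: Any, threshold: float = 0.8) -> bool:
    """True when utilization is at or above the (assumed) alert threshold."""
    return snapshot(pool).utilization >= threshold
```

The two snapshot fields map naturally onto two gauges, with the alert rule expressed on their ratio.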

Cursor pagination (not offset)

# Slow at large offsets
stmt = select(User).order_by(User.id).limit(20).offset(100_000)

# Fast and stable
stmt = select(User).where(User.id > last_id).order_by(User.id).limit(20)
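
A sketch of a full paging loop built on that query, using Core on in-memory SQLite (sync API). The `users` table and the `fetch_page` and `iter_all_ids` helpers are assumptions for the example.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table, create_engine, insert, select

metadata = MetaData()
users = Table(  # hypothetical table
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("name", String),
)

engine = create_engine("sqlite://")
metadata.create_all(engine)

with engine.begin() as conn:
    conn.execute(insert(users), [{"name": f"user{i}"} for i in range(1, 8)])


def fetch_page(conn, last_id: int = 0, page_size: int = 3):
    """Return (rows, next_cursor); next_cursor is None on the last page."""
    stmt = (
        select(users)
        .where(users.c.id > last_id)
        .order_by(users.c.id)
        .limit(page_size)
    )
    rows = conn.execute(stmt).all()
    # a short page means there is nothing left to fetch
    next_cursor = rows[-1].id if len(rows) == page_size else None
    return rows, next_cursor


def iter_all_ids(page_size: int = 3):
    """Walk every page and collect the ids, to show the loop terminates."""
    ids, cursor = [], 0
    with engine.connect() as conn:
        while cursor is not None:
            rows, cursor = fetch_page(conn, cursor, page_size)
            ids.extend(row.id for row in rows)
    return ids
```

When sorting by a non-unique column, add the primary key as a tie-breaker in both the ORDER BY and the cursor, otherwise rows with equal sort values can be skipped or repeated.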

Batch fetch by IDs

# BAD: 1000 queries
for user_id in ids:   # avoid shadowing the builtin id()
    await session.get(User, user_id)

# GOOD: 1 query
users = (await session.execute(select(User).where(User.id.in_(ids)))).scalars().all()
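
For very long ID lists, a single IN (...) can run into the driver's bind-parameter limit, so one hedge is to split the list into a few large batches. The `chunked` helper below is a hypothetical stdlib-only sketch; the batch size is yours to tune.

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[List[T]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be >= 1")
    iterator = iter(items)
    while batch := list(islice(iterator, size)):
        yield batch


# Intended use with the query above (assumes `session`, `User`, `ids`):
#
#     users = []
#     for batch in chunked(ids, 1000):
#         result = await session.execute(select(User).where(User.id.in_(batch)))
#         users.extend(result.scalars().all())
```

That is still a handful of queries rather than one per ID.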

ORM overhead → switch to Core

For ultra-hot reads, Core is leaner:

async with engine.connect() as conn:
    result = await conn.execute(select(users_table).where(users_table.c.id == 1))
    row = result.first()

No identity map, no relationship machinery.

Caching

def _key(user_id):
    return f"user:{user_id}"

async def get_user_cached(session, user_id):
    # `cache` is a placeholder for your cache client (get / set with TTL)
    key = _key(user_id)
    if (hit := cache.get(key)) is not None:
        return hit
    user = await session.get(User, user_id)
    if user is not None:   # don't cache misses
        cache.set(key, user, ttl=60)
    return user

For real: Redis + TTL + invalidation on write.
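
To make "TTL + invalidation on write" concrete, here is a tiny in-process stand-in for a Redis-style client. `TTLCache` and `get_or_load` are hypothetical names for this sketch; a real setup would swap in a Redis client and serialize plain data instead of ORM objects.

```python
import time
from typing import Any, Callable, Dict, Optional, Tuple


class TTLCache:
    """Minimal in-memory cache with per-key expiry (illustrative only)."""

    def __init__(self, clock: Callable[[], float] = time.monotonic) -> None:
        self._clock = clock  # injectable so expiry is testable
        self._data: Dict[str, Tuple[float, Any]] = {}

    def get(self, key: str) -> Optional[Any]:
        entry = self._data.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._data[key]  # expired: drop and report a miss
            return None
        return value

    def set(self, key: str, value: Any, ttl: float) -> None:
        self._data[key] = (self._clock() + ttl, value)

    def delete(self, key: str) -> None:
        """Call this from every write path that changes the cached row."""
        self._data.pop(key, None)


def get_or_load(cache: TTLCache, key: str, loader: Callable[[], Any], ttl: float = 60.0) -> Any:
    """Return the cached value, or call loader() and cache a non-None result."""
    hit = cache.get(key)
    if hit is not None:
        return hit
    value = loader()
    if value is not None:
        cache.set(key, value, ttl)
    return value
```

The write side is the part that is easy to forget: after committing an update to a user, call `cache.delete(f"user:{user_id}")` so the next read reloads it.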

Connection-level Postgres tuning

async with engine.begin() as conn:
    await conn.execute(text("SET LOCAL work_mem = '256MB'"))
    await conn.execute(stmt)

Per-transaction overrides for memory-intensive queries.

Common mistakes

  • ORM session.add loop for bulk insert.
  • Loading by ID one at a time.
  • selectinload then never accessing the relation.
  • Missing indexes (always EXPLAIN suspicious queries).
  • Not measuring — profile first, optimize second.

Read this next

If you want my pool-metrics + slow-query Prometheus exporter, it’s at rajpoot.dev.

