Chapter 5: building queries with select(). Filters, joins, aggregations, subqueries, CTEs, the result API. The 2.0 unified pattern works for both Core and ORM.
Basic select
from sqlalchemy import select
stmt = select(User)
result = await session.execute(stmt)
users = result.scalars().all()
scalars() returns single-column-per-row (the User instance). .all() materializes to list.
For specific columns:
stmt = select(User.id, User.email)
result = await session.execute(stmt)
rows = result.all()
for r in rows:
print(r.id, r.email)
Each row is a Row object; access by attribute or index.
Filtering
stmt = select(User).where(User.email == "[email protected]")
stmt = select(User).where(User.email.like("%@example.com"))
stmt = select(User).where(User.created_at > datetime(2026, 1, 1))
stmt = select(User).where(User.id.in_([1, 2, 3]))
stmt = select(User).where(User.active == True, User.email.is_not(None)) # AND
Multiple where() calls AND together. For OR:
from sqlalchemy import or_
stmt = select(User).where(or_(User.role == "admin", User.id == 1))
For NOT:
from sqlalchemy import not_
stmt = select(User).where(not_(User.active))
scalar_one / scalar_one_or_none / scalars / first
# Exactly one result; raises if 0 or 2+
user = (await session.execute(stmt)).scalar_one()
# 0 or 1; returns None if 0
user = (await session.execute(stmt)).scalar_one_or_none()
# All scalars
users = (await session.execute(stmt)).scalars().all()
# First or None
user = (await session.execute(stmt)).scalars().first()
Pick based on intent. Wrong choice → confusing errors.
Order by
stmt = select(User).order_by(User.created_at.desc(), User.id)
ASC default; .desc() for descending.
Limit / offset
stmt = select(User).limit(20).offset(40) # page 3 of 20
For cursor-based pagination (preferred at scale): see Chapter 9.
Joins
stmt = select(Post, User).join(User, Post.author_id == User.id)
Or via relationships (more on this in Chapter 6):
stmt = select(Post).join(Post.author)
Outer join:
stmt = select(Post).outerjoin(Comment).where(Comment.id == None)
# posts with no comments
Aliasing
from sqlalchemy.orm import aliased
a1 = aliased(User)
a2 = aliased(User)
stmt = select(a1, a2).join(a2, a1.referrer_id == a2.id)
For self-joins or multiple references to the same table.
Aggregations
from sqlalchemy import func
stmt = select(func.count(User.id))
total = (await session.execute(stmt)).scalar()
stmt = select(User.role, func.count(User.id)).group_by(User.role)
counts = (await session.execute(stmt)).all()
stmt = (
select(User.role, func.count(User.id).label("n"))
.group_by(User.role)
.having(func.count(User.id) > 10)
)
func.X invokes any SQL function: count, sum, avg, min, max, lower, upper, etc.
.label() aliases the result.
Distinct
stmt = select(User.email).distinct()
For Postgres-specific SELECT DISTINCT ON: see the Postgres textbook
.
Subqueries
sub = (
select(Post.author_id, func.count(Post.id).label("post_count"))
.group_by(Post.author_id)
.subquery()
)
stmt = select(User, sub.c.post_count).join(sub, User.id == sub.c.author_id)
subquery() wraps in (SELECT ...). Access columns via .c.
CTEs
recent_posts = (
select(Post)
.where(Post.created_at > datetime(2026, 1, 1))
.cte("recent_posts")
)
stmt = select(User, recent_posts).join(recent_posts, User.id == recent_posts.c.author_id)
cte() builds a WITH clause. Cleaner than subqueries for complex queries.
Recursive CTEs
roots = select(Category).where(Category.parent_id == None).cte(name="cats", recursive=True)
extended = roots.union_all(
select(Category).join(roots, Category.parent_id == roots.c.id)
)
stmt = select(extended)
For tree structures.
Exists
from sqlalchemy import exists
stmt = select(User).where(
exists().where(Post.author_id == User.id)
)
# users who have at least one post
Or:
stmt = select(User).where(
select(Post).where(Post.author_id == User.id).exists()
)
Window functions
stmt = select(
Post.author_id,
Post.title,
func.row_number().over(partition_by=Post.author_id, order_by=Post.created_at.desc()).label("rn"),
)
OVER (PARTITION BY ... ORDER BY ...). Postgres supports widely; MySQL 8+; SQLite 3.25+.
Update / delete
from sqlalchemy import update, delete
stmt = update(User).where(User.email == "x").values(active=False)
await session.execute(stmt)
await session.commit()
stmt = delete(User).where(User.id == 5)
await session.execute(stmt)
await session.commit()
update() / delete() are bulk. They don’t fetch objects into the session.
For ORM-managed updates (with hooks, validators, etc.):
user = await session.get(User, 5)
user.email = "y"
await session.commit()
RETURNING
stmt = update(User).where(User.id == 1).values(name="x").returning(User.id, User.name)
result = await session.execute(stmt)
rows = result.all()
RETURNING returns the affected rows. Postgres / SQLite native; MySQL 8+ partial.
Insert
from sqlalchemy import insert
stmt = insert(User).values(email="[email protected]", name="Alice")
await session.execute(stmt)
Or via ORM:
user = User(email="[email protected]", name="Alice")
session.add(user)
await session.commit()
For bulk insert: see Chapter 9.
Upsert (INSERT … ON CONFLICT)
# Postgres
from sqlalchemy.dialects.postgresql import insert as pg_insert
stmt = pg_insert(User).values(email="[email protected]", name="Alice")
stmt = stmt.on_conflict_do_update(
index_elements=["email"],
set_={"name": stmt.excluded.name},
)
# MySQL
from sqlalchemy.dialects.mysql import insert as my_insert
stmt = my_insert(User).values(email="[email protected]", name="Alice")
stmt = stmt.on_duplicate_key_update(name=stmt.inserted.name)
# SQLite
from sqlalchemy.dialects.sqlite import insert as sl_insert
stmt = sl_insert(User).values(email="[email protected]", name="Alice")
stmt = stmt.on_conflict_do_update(index_elements=["email"], set_={"name": stmt.excluded.name})
DB-specific. For DB-agnostic upsert: there isn’t one; check the dialect.
text()
from sqlalchemy import text
stmt = text("SELECT * FROM users WHERE email = :email")
result = await session.execute(stmt, {"email": "[email protected]"})
Raw SQL when you need DB-specific features. Always parameterize (:name, never f-string).
Compiled SQL
stmt = select(User).where(User.email == "x")
print(stmt.compile(engine, compile_kwargs={"literal_binds": True}))
See the actual SQL with literal values inlined. Useful for debugging.
Result API
result = await session.execute(stmt)
# Various accessors:
result.all() # all Row objects
result.first() # first Row or None
result.one() # exactly one Row
result.one_or_none() # 0 or 1
result.scalars() # for one-column results
result.scalars().all() # list of scalars
result.scalars().first()
result.mappings() # dicts instead of Row tuples
result.mappings().all()
Match the call to your query shape.
Pagination by cursor
async def list_users(cursor: int = 0, limit: int = 20):
stmt = select(User).where(User.id > cursor).order_by(User.id).limit(limit + 1)
rows = (await session.execute(stmt)).scalars().all()
has_more = len(rows) > limit
return rows[:limit], (rows[limit - 1].id if has_more else None)
Use the last returned id as the next cursor. Stable under inserts.
For multi-column ordering: more complex tokens. See API Design .
Common mistakes
1. Using Query (legacy)
session.query(User).filter_by(email="x") # 1.x style
Use select(...) and session.execute(...).
2. SELECT * by default
select(User) returns all columns. For wide tables, narrow:
stmt = select(User.id, User.email)
Smaller payload; faster.
3. ORM iteration without scalars()
for row in result: # Row tuple
print(row.User.email)
Use result.scalars() to get instances directly.
4. Forgetting commit on bulk update
session.execute(update(...).values(...)) — bulk; you still need commit.
5. Mixing ORM and bulk
ORM-bulk-update doesn’t trigger ORM events / refresh loaded objects. Track which mode you’re in.
What’s next
Chapter 6: Relationships and loading strategies.
Read this next
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .