Copy-paste recipes for everyday API work.
Cursor pagination
from base64 import urlsafe_b64encode, urlsafe_b64decode
import json
def encode_cursor(data: dict) -> str:
return urlsafe_b64encode(json.dumps(data).encode()).decode()
def decode_cursor(s: str) -> dict:
return json.loads(urlsafe_b64decode(s.encode()))
async def list_users(session, cursor: str | None = None, limit: int = 20):
stmt = select(User).order_by(User.id).limit(limit + 1)
if cursor:
after_id = decode_cursor(cursor)["id"]
stmt = stmt.where(User.id > after_id)
rows = (await session.execute(stmt)).scalars().all()
has_more = len(rows) > limit
rows = rows[:limit]
next_cursor = encode_cursor({"id": rows[-1].id}) if has_more else None
return rows, next_cursor, has_more
For multi-column ordering (e.g., created_at, id):
def encode_cursor(rec):
return encode({"ts": rec.created_at.isoformat(), "id": rec.id})
stmt = (
select(User)
.where(or_(
User.created_at < cursor_ts,
and_(User.created_at == cursor_ts, User.id < cursor_id),
))
.order_by(User.created_at.desc(), User.id.desc())
.limit(limit + 1)
)
Dynamic filter from Pydantic
class PostFilter(BaseModel):
author_id: int | None = None
tag: str | None = None
published: bool | None = None
q: str | None = None
created_after: datetime | None = None
async def list_posts(session, f: PostFilter, limit: int = 20):
stmt = select(Post)
if f.author_id is not None:
stmt = stmt.where(Post.author_id == f.author_id)
if f.tag is not None:
stmt = stmt.where(Post.tags.contains([f.tag]))
if f.published is not None:
stmt = stmt.where(Post.published == f.published)
if f.q is not None:
stmt = stmt.where(Post.title.ilike(f"%{f.q}%"))
if f.created_after is not None:
stmt = stmt.where(Post.created_at >= f.created_after)
stmt = stmt.order_by(Post.id.desc()).limit(limit)
return (await session.execute(stmt)).scalars().all()
Sort safelist (avoid arbitrary-column injection)
SORT_FIELDS = {
"id": User.id,
"email": User.email,
"created_at": User.created_at,
}
def order_by_param(stmt, sort: str = "id"):
col_name, _, direction = sort.partition(":")
col = SORT_FIELDS.get(col_name, User.id)
return stmt.order_by(col.desc() if direction == "desc" else col)
# Usage: ?sort=created_at:desc
stmt = order_by_param(select(User), request.query_params.get("sort", "id"))
Bulk upsert (Postgres ON CONFLICT)
from sqlalchemy.dialects.postgresql import insert as pg_insert
async def upsert_many(session, records):
stmt = pg_insert(User)
stmt = stmt.on_conflict_do_update(
index_elements=["email"],
set_={
"name": stmt.excluded.name,
"updated_at": func.now(),
},
)
await session.execute(stmt, records)
await session.commit()
Fast COPY (asyncpg)
async def copy_users(records):
async with engine.begin() as conn:
raw = await conn.get_raw_connection()
ap = raw.driver_connection
await ap.copy_records_to_table(
"users",
records=records, # list of tuples
columns=["email", "name"],
)
5-10× faster than INSERT for huge loads.
Search by date range (with index hit)
stmt = select(Event).where(
Event.occurred_at.between(start, end)
).order_by(Event.occurred_at.desc())
Index on occurred_at is used.
EXISTS subquery
stmt = select(User).where(
select(Post).where(Post.author_id == User.id).exists()
)
# users with at least one post
NOT EXISTS
stmt = select(User).where(
~select(Post).where(Post.author_id == User.id).exists()
)
DISTINCT ON (Postgres-only)
from sqlalchemy.dialects.postgresql import distinct as pg_distinct
# Latest post per author
stmt = (
select(Post)
.distinct(Post.author_id)
.order_by(Post.author_id, Post.created_at.desc())
)
Window: row_number() per group
rn = func.row_number().over(
partition_by=Post.author_id,
order_by=Post.created_at.desc(),
).label("rn")
subq = select(Post, rn).subquery()
stmt = select(subq).where(subq.c.rn <= 3)
# Top 3 posts per author
Soft delete with unique constraint
__table_args__ = (
Index(
"ix_users_email_active",
"email",
unique=True,
postgresql_where=text("deleted_at IS NULL"),
),
)
Allows re-registering an email after soft-delete.
Atomic increment / decrement
await session.execute(
update(Account).where(Account.id == 1).values(balance=Account.balance + 100)
)
await session.commit()
DB-side increment; no read-then-write race.
Conditional update (only if state matches)
result = await session.execute(
update(Order)
.where(Order.id == 1, Order.status == "pending")
.values(status="confirmed")
)
if result.rowcount == 0:
raise InvalidStateTransition
Count with filter
total = await session.scalar(
select(func.count(User.id)).where(User.active == True)
)
Read this next
If you want my pagination + filter + sort utility module, it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .