Copy-paste recipes for everyday API work.

Cursor pagination

from base64 import urlsafe_b64encode, urlsafe_b64decode
import json

def encode_cursor(data: dict) -> str:
    return urlsafe_b64encode(json.dumps(data).encode()).decode()

def decode_cursor(s: str) -> dict:
    return json.loads(urlsafe_b64decode(s.encode()))

async def list_users(session, cursor: str | None = None, limit: int = 20):
    stmt = select(User).order_by(User.id).limit(limit + 1)
    if cursor:
        after_id = decode_cursor(cursor)["id"]
        stmt = stmt.where(User.id > after_id)
    rows = (await session.execute(stmt)).scalars().all()
    has_more = len(rows) > limit
    rows = rows[:limit]
    next_cursor = encode_cursor({"id": rows[-1].id}) if has_more else None
    return rows, next_cursor, has_more

For multi-column ordering (e.g., created_at, id):

def encode_cursor(rec):
    return encode({"ts": rec.created_at.isoformat(), "id": rec.id})

stmt = (
    select(User)
    .where(or_(
        User.created_at < cursor_ts,
        and_(User.created_at == cursor_ts, User.id < cursor_id),
    ))
    .order_by(User.created_at.desc(), User.id.desc())
    .limit(limit + 1)
)

Dynamic filter from Pydantic

class PostFilter(BaseModel):
    author_id: int | None = None
    tag: str | None = None
    published: bool | None = None
    q: str | None = None
    created_after: datetime | None = None

async def list_posts(session, f: PostFilter, limit: int = 20):
    stmt = select(Post)
    if f.author_id is not None:
        stmt = stmt.where(Post.author_id == f.author_id)
    if f.tag is not None:
        stmt = stmt.where(Post.tags.contains([f.tag]))
    if f.published is not None:
        stmt = stmt.where(Post.published == f.published)
    if f.q is not None:
        stmt = stmt.where(Post.title.ilike(f"%{f.q}%"))
    if f.created_after is not None:
        stmt = stmt.where(Post.created_at >= f.created_after)
    stmt = stmt.order_by(Post.id.desc()).limit(limit)
    return (await session.execute(stmt)).scalars().all()

Sort safelist (avoid arbitrary-column injection)

SORT_FIELDS = {
    "id": User.id,
    "email": User.email,
    "created_at": User.created_at,
}

def order_by_param(stmt, sort: str = "id"):
    col_name, _, direction = sort.partition(":")
    col = SORT_FIELDS.get(col_name, User.id)
    return stmt.order_by(col.desc() if direction == "desc" else col)

# Usage: ?sort=created_at:desc
stmt = order_by_param(select(User), request.query_params.get("sort", "id"))

Bulk upsert (Postgres ON CONFLICT)

from sqlalchemy.dialects.postgresql import insert as pg_insert

async def upsert_many(session, records):
    stmt = pg_insert(User)
    stmt = stmt.on_conflict_do_update(
        index_elements=["email"],
        set_={
            "name": stmt.excluded.name,
            "updated_at": func.now(),
        },
    )
    await session.execute(stmt, records)
    await session.commit()

Fast COPY (asyncpg)

async def copy_users(records):
    async with engine.begin() as conn:
        raw = await conn.get_raw_connection()
        ap = raw.driver_connection
        await ap.copy_records_to_table(
            "users",
            records=records,           # list of tuples
            columns=["email", "name"],
        )

5-10× faster than INSERT for huge loads.

Search by date range (with index hit)

stmt = select(Event).where(
    Event.occurred_at.between(start, end)
).order_by(Event.occurred_at.desc())

Index on occurred_at is used.

EXISTS subquery

stmt = select(User).where(
    select(Post).where(Post.author_id == User.id).exists()
)
# users with at least one post

NOT EXISTS

stmt = select(User).where(
    ~select(Post).where(Post.author_id == User.id).exists()
)

DISTINCT ON (Postgres-only)

from sqlalchemy.dialects.postgresql import distinct as pg_distinct

# Latest post per author
stmt = (
    select(Post)
    .distinct(Post.author_id)
    .order_by(Post.author_id, Post.created_at.desc())
)

Window: row_number() per group

rn = func.row_number().over(
    partition_by=Post.author_id,
    order_by=Post.created_at.desc(),
).label("rn")

subq = select(Post, rn).subquery()
stmt = select(subq).where(subq.c.rn <= 3)
# Top 3 posts per author

Soft delete with unique constraint

__table_args__ = (
    Index(
        "ix_users_email_active",
        "email",
        unique=True,
        postgresql_where=text("deleted_at IS NULL"),
    ),
)

Allows re-registering an email after soft-delete.

Atomic increment / decrement

await session.execute(
    update(Account).where(Account.id == 1).values(balance=Account.balance + 100)
)
await session.commit()

DB-side increment; no read-then-write race.

Conditional update (only if state matches)

result = await session.execute(
    update(Order)
    .where(Order.id == 1, Order.status == "pending")
    .values(status="confirmed")
)
if result.rowcount == 0:
    raise InvalidStateTransition

Count with filter

total = await session.scalar(
    select(func.count(User.id)).where(User.active == True)
)

Read this next

If you want my pagination + filter + sort utility module, it’s at rajpoot.dev .


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .