Cheatsheet for background work. Long-form: Textbook Ch 9 and FastAPI Background Tasks 2026 .

BackgroundTasks (built-in, fire-and-forget)

from fastapi import BackgroundTasks

@app.post("/signup")
async def signup(u: UserIn, bg: BackgroundTasks, db = Depends(get_db)):
    user = await create_user(db, u)
    bg.add_task(send_welcome_email, user.email)
    return {"id": user.id}

Use only when:

  • Task is short.
  • Idempotent.
  • Loss is acceptable (worker restart kills them).
# tasks.py
async def send_email(ctx, to: str, subject: str, body: str):
    await sendgrid.send(to=to, subject=subject, body=body)

class WorkerSettings:
    functions = [send_email]
    redis_settings = RedisSettings(host="redis")
# main.py
from arq import create_pool
from arq.connections import RedisSettings

@asynccontextmanager
async def lifespan(app):
    app.state.queue = await create_pool(RedisSettings(host="redis"))
    yield
    await app.state.queue.close()

@app.post("/signup")
async def signup(u: UserIn, request: Request):
    await request.app.state.queue.enqueue_job("send_email", u.email, "Welcome", "...")
    return {"ok": True}
arq tasks.WorkerSettings

ARQ retry / cron

from arq import cron

class WorkerSettings:
    functions = [send_email, refresh_index]
    cron_jobs = [
        cron(refresh_index, hour=2, minute=0),
    ]

# Per-task retries
async def send_email(ctx, ...):
    if ctx["job_try"] >= 5:
        await record_dlq(...)
        return
    raise RetryError()             # arq retries

Celery

from celery import Celery

celery = Celery("app", broker="redis://...", backend="redis://...")

@celery.task(autoretry_for=(Exception,), retry_backoff=True, max_retries=10)
def send_email(to, subject, body):
    sendgrid.send(to=to, subject=subject, body=body)

# In FastAPI
@app.post("/signup")
async def signup(u: UserIn):
    send_email.delay(u.email, "Welcome", "...")
    return {"ok": True}
celery -A app worker -l info
celery -A app beat                 # for scheduled tasks

Dramatiq

import dramatiq
from dramatiq.brokers.redis import RedisBroker

dramatiq.set_broker(RedisBroker(host="redis"))

@dramatiq.actor(max_retries=3, time_limit=60_000)
def send_email(to, subject, body):
    sendgrid.send(to=to, subject=subject, body=body)

# In FastAPI
send_email.send(u.email, "Welcome", "...")

Procrastinate (Postgres-only)

import procrastinate

app_jobs = procrastinate.App(connector=procrastinate.AiopgConnector(...))

@app_jobs.task(queue="default")
async def send_email(to, subject, body):
    ...

Uses LISTEN/NOTIFY for low-latency wake-up. No Redis needed.

Idempotency pattern (universal)

async def charge_invoice(ctx, invoice_id: int):
    inv = await get_invoice(invoice_id)
    if inv.charged: return
    await stripe.charge(...)
    await mark_charged(inv)

At-least-once delivery is the norm. Handlers must be idempotent.

Pass IDs, not objects

# BAD
bg.add_task(send_welcome, user)

# GOOD
bg.add_task(send_welcome_by_id, user.id)

Worker re-fetches.

Long-running jobs (poll / stream status)

@app.post("/reports")
async def generate(request: Request, current: User = Depends(current_user)):
    job = await request.app.state.queue.enqueue_job("generate_report", current.id)
    return {"job_id": job.job_id}

@app.get("/jobs/{jid}")
async def status(jid: str, request: Request):
    j = ArqJob(jid, request.app.state.queue)
    info = await j.info()
    return {"status": info.status, "result": info.result if info.status == "complete" else None}

Monitoring queue depth

@app.get("/_/queue")
async def queue(request: Request, _: User = Depends(get_admin_user)):
    return {"queued": await request.app.state.queue.llen("arq:queue:default")}

Read this next

If you want my FastAPI + ARQ reference (queue + worker + monitoring), it’s at rajpoot.dev .


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .