Cheatsheet for background work. Long-form: Textbook Ch 9 and FastAPI Background Tasks 2026 .
BackgroundTasks (built-in, fire-and-forget)
from fastapi import BackgroundTasks
@app.post("/signup")
async def signup(u: UserIn, bg: BackgroundTasks, db = Depends(get_db)):
user = await create_user(db, u)
bg.add_task(send_welcome_email, user.email)
return {"id": user.id}
Use only when:
- Task is short.
- Idempotent.
- Loss is acceptable (worker restart kills them).
ARQ (Redis, async-native — recommended)
# tasks.py
async def send_email(ctx, to: str, subject: str, body: str):
await sendgrid.send(to=to, subject=subject, body=body)
class WorkerSettings:
functions = [send_email]
redis_settings = RedisSettings(host="redis")
# main.py
from arq import create_pool
from arq.connections import RedisSettings
@asynccontextmanager
async def lifespan(app):
app.state.queue = await create_pool(RedisSettings(host="redis"))
yield
await app.state.queue.close()
@app.post("/signup")
async def signup(u: UserIn, request: Request):
await request.app.state.queue.enqueue_job("send_email", u.email, "Welcome", "...")
return {"ok": True}
arq tasks.WorkerSettings
ARQ retry / cron
from arq import cron
class WorkerSettings:
functions = [send_email, refresh_index]
cron_jobs = [
cron(refresh_index, hour=2, minute=0),
]
# Per-task retries
async def send_email(ctx, ...):
if ctx["job_try"] >= 5:
await record_dlq(...)
return
raise RetryError() # arq retries
Celery
from celery import Celery
celery = Celery("app", broker="redis://...", backend="redis://...")
@celery.task(autoretry_for=(Exception,), retry_backoff=True, max_retries=10)
def send_email(to, subject, body):
sendgrid.send(to=to, subject=subject, body=body)
# In FastAPI
@app.post("/signup")
async def signup(u: UserIn):
send_email.delay(u.email, "Welcome", "...")
return {"ok": True}
celery -A app worker -l info
celery -A app beat # for scheduled tasks
Dramatiq
import dramatiq
from dramatiq.brokers.redis import RedisBroker
dramatiq.set_broker(RedisBroker(host="redis"))
@dramatiq.actor(max_retries=3, time_limit=60_000)
def send_email(to, subject, body):
sendgrid.send(to=to, subject=subject, body=body)
# In FastAPI
send_email.send(u.email, "Welcome", "...")
Procrastinate (Postgres-only)
import procrastinate
app_jobs = procrastinate.App(connector=procrastinate.AiopgConnector(...))
@app_jobs.task(queue="default")
async def send_email(to, subject, body):
...
Uses LISTEN/NOTIFY for low-latency wake-up. No Redis needed.
Idempotency pattern (universal)
async def charge_invoice(ctx, invoice_id: int):
inv = await get_invoice(invoice_id)
if inv.charged: return
await stripe.charge(...)
await mark_charged(inv)
At-least-once delivery is the norm. Handlers must be idempotent.
Pass IDs, not objects
# BAD
bg.add_task(send_welcome, user)
# GOOD
bg.add_task(send_welcome_by_id, user.id)
Worker re-fetches.
Long-running jobs (poll / stream status)
@app.post("/reports")
async def generate(request: Request, current: User = Depends(current_user)):
job = await request.app.state.queue.enqueue_job("generate_report", current.id)
return {"job_id": job.job_id}
@app.get("/jobs/{jid}")
async def status(jid: str, request: Request):
j = ArqJob(jid, request.app.state.queue)
info = await j.info()
return {"status": info.status, "result": info.result if info.status == "complete" else None}
Monitoring queue depth
@app.get("/_/queue")
async def queue(request: Request, _: User = Depends(get_admin_user)):
return {"queued": await request.app.state.queue.llen("arq:queue:default")}
Read this next
If you want my FastAPI + ARQ reference (queue + worker + monitoring), it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .