Chapter 9: how to do work after the response (or in parallel with the request, off the critical path). This chapter covers FastAPI’s BackgroundTasks, integration patterns with real task queues (ARQ, Celery, Dramatiq, Procrastinate), and when each fits.
BackgroundTasks (built-in)
from fastapi import BackgroundTasks, Depends

@app.post("/signup")
async def signup(user: UserCreate, bg: BackgroundTasks, db: AsyncSession = Depends(get_db)):
    db_user = await create_user(db, user)
    # Queued here; executed only after the response has been sent
    bg.add_task(send_welcome_email, db_user.email)
    return {"id": db_user.id}
After the response is sent, FastAPI runs send_welcome_email. Same process, same memory.
Use when:
- Task is short.
- Task is non-critical: nothing breaks if it never runs.
- You can tolerate occasional loss (worker restart, deploy mid-task, crash).
Don’t use when:
- Task must survive process death.
- Task must retry on failure.
- Task is long-running.
- You need scheduling.
BackgroundTasks is fine for “send a confirmation email after signup” — if it’s lost occasionally, life goes on.
Sync and async tasks
async def send_email_async(email: str): ...
def send_email_sync(email: str): ...
bg.add_task(send_email_async, "user@example.com")
bg.add_task(send_email_sync, "user@example.com")
Both work. FastAPI runs sync functions in the threadpool and awaits async functions on the event loop.
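The dispatch rule can be modeled with the standard library alone. This is a simplified sketch of the behavior described above, not FastAPI's actual implementation; `run_task` is a hypothetical helper introduced only for illustration.

```python
import asyncio
import threading


async def run_task(func, *args):
    """Await coroutine functions directly; push plain functions to a worker thread."""
    if asyncio.iscoroutinefunction(func):
        return await func(*args)
    return await asyncio.to_thread(func, *args)


async def send_email_async(email: str) -> str:
    # Runs on the event loop's own thread.
    return threading.current_thread().name


def send_email_sync(email: str) -> str:
    # Runs on a thread from the default executor, so blocking I/O here
    # does not stall the event loop.
    return threading.current_thread().name


async def main():
    on_loop = await run_task(send_email_async, "user@example.com")
    in_pool = await run_task(send_email_sync, "user@example.com")
    return on_loop, in_pool


on_loop, in_pool = asyncio.run(main())
```

The practical consequence: a sync task may block freely, but a blocking call inside an async task stalls every request served by that event loop.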
Limitations
- Lost on worker restart.
- No retry.
- No scheduling.
- No persistence.
- No multi-process distribution.
For anything important: a real queue.
ARQ — async-first Redis queue
uv add arq
# tasks.py
from arq.connections import RedisSettings

async def send_email(ctx, to: str, subject: str, body: str):
    await sendgrid.send(to=to, subject=subject, body=body)

class WorkerSettings:
    functions = [send_email]
    redis_settings = RedisSettings(host="redis")
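# main.py
from contextlib import asynccontextmanager

from arq import create_pool
from arq.connections import RedisSettings
from fastapi import FastAPI, Request

@asynccontextmanager
async def lifespan(app: FastAPI):
    # One shared Redis pool for the whole app, opened at startup and closed at shutdown
    app.state.queue = await create_pool(RedisSettings(host="redis"))
    yield
    await app.state.queue.close()

app = FastAPI(lifespan=lifespan)

@app.post("/signup")
async def signup(user: UserCreate, request: Request):
    await request.app.state.queue.enqueue_job("send_email", user.email, "Welcome", "...")
    return {"ok": True}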
Run worker:
arq tasks.WorkerSettings
ARQ is async-native, which makes it the cleanest fit for FastAPI. See FastAPI Background Tasks.
Celery (the classic)
from celery import Celery

celery = Celery("myapp", broker="redis://redis:6379/0", backend="redis://redis:6379/1")

@celery.task(autoretry_for=(Exception,), retry_backoff=True, max_retries=10)
def send_email(to: str, subject: str, body: str):
    sendgrid.send(to=to, subject=subject, body=body)

# In FastAPI handler
@app.post("/signup")
async def signup(user: UserCreate):
    send_email.delay(user.email, "Welcome", "...")
    return {"ok": True}
celery -A myapp worker -l info
Mature, with many features. Tasks are plain sync functions; the default prefork pool runs them in separate worker processes, and --concurrency sets the pool size. Used a lot in Django shops; works fine with FastAPI.
See Django + Celery.
Dramatiq
import dramatiq
from dramatiq.brokers.redis import RedisBroker

broker = RedisBroker(host="redis")
dramatiq.set_broker(broker)

@dramatiq.actor(max_retries=3)
def send_email(to, subject, body):
    sendgrid.send(to=to, subject=subject, body=body)

# enqueue
send_email.send("user@example.com", "Welcome", "...")
Cleaner than Celery; fewer features; solid retry semantics. Good middle ground.
Procrastinate (Postgres-backed)
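import procrastinate

# Recent Procrastinate releases ship PsycopgConnector; older ones exposed procrastinate.AiopgConnector,
# which has since moved to procrastinate.contrib.aiopg
app_jobs = procrastinate.App(connector=procrastinate.PsycopgConnector(...))

@app_jobs.task(queue="default")
async def send_email(to: str, subject: str, body: str):
    ...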
Postgres is the queue; LISTEN/NOTIFY gives low-latency wake-up. No extra infrastructure (assuming you already run Postgres).
For small apps that do not need Redis, this is excellent. See Postgres as a Queue.
Decision matrix
| Need | Pick |
|---|---|
| Send email after signup, OK to lose | BackgroundTasks |
| Reliable async tasks, you have Redis | ARQ |
| Mature, complex workflows | Celery |
| Cleaner Celery alternative | Dramatiq |
| Postgres only; no Redis | Procrastinate |
| Durable, multi-step workflows | Temporal |
For most new FastAPI apps: ARQ.
Idempotent task pattern
At-least-once delivery is the norm, so a task can run more than once and must be safe to repeat:
async def charge_invoice(ctx, invoice_id: int):
    inv = await get_invoice(invoice_id)
    if inv.charged:
        return  # already done
    await stripe.charge(...)
    await mark_charged(inv)
Use idempotency keys and “already done” checks. See Idempotency.
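The idempotency-key variant can be sketched in memory as follows. The `processed` dict and `charges` list are stand-ins (assumptions for illustration) for a database table with a unique key and for the payment provider; the key format is also hypothetical.

```python
import asyncio

processed: dict = {}   # idempotency key -> charge id (stand-in for a table with a unique key)
charges: list = []     # stand-in for the payment provider's record of charges


async def charge_invoice(invoice_id: int) -> str:
    key = f"charge-invoice-{invoice_id}"
    if key in processed:
        # Duplicate delivery: return the earlier result without charging again.
        return processed[key]
    charges.append(invoice_id)             # the side effect that must not repeat
    processed[key] = f"ch_{len(charges)}"
    return processed[key]


async def main():
    first = await charge_invoice(42)
    second = await charge_invoice(42)      # the same job, redelivered
    return first, second


first, second = asyncio.run(main())
```

In a real system the check and the write need to be atomic, for example via a unique constraint on the key, otherwise two workers can both pass the check before either records the result.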
Pass IDs, not objects
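# BAD: hands the task a live ORM object; by the time it runs, the session may be closed and the data stale
bg.add_task(send_welcome, user)
# GOOD: pass the primary key and let the task load a fresh row
bg.add_task(send_welcome_by_id, user.id)
Same for queues, where arguments are additionally serialized into the job payload. The worker re-fetches from the DB.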
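A small in-memory illustration of why the ID version is safer. The `User` dataclass and the `db` dict are hypothetical stand-ins for an ORM model and a database, and `copy.deepcopy` plays the role of a queue serializing the argument at enqueue time.

```python
import copy
from dataclasses import dataclass


@dataclass
class User:
    id: int
    email: str


db = {1: User(id=1, email="old@example.com")}


def send_welcome(user: User) -> str:
    # Uses whatever state was captured when the task was enqueued.
    return user.email


def send_welcome_by_id(user_id: int) -> str:
    # Re-fetches the current row when the task actually runs.
    return db[user_id].email


snapshot = copy.deepcopy(db[1])   # what would travel in the job payload
queued_id = db[1].id

# The user changes their email before the worker picks up the job.
db[1] = User(id=1, email="new@example.com")

stale = send_welcome(snapshot)
fresh = send_welcome_by_id(queued_id)
```

Here `stale` still holds the old address while `fresh` reflects the update, which is the difference between emailing the wrong inbox and the right one.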
Retries
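ARQ:
from arq import Retry

async def send_email(ctx, to: str, subject: str, body: str):
    try:
        await sendgrid.send(to=to, subject=subject, body=body)
    except RetryableError:
        # arq does not retry on ordinary exceptions; raise Retry to reschedule the job.
        # The delay grows with each attempt; max_tries (default 5) caps the attempts.
        raise Retry(defer=ctx["job_try"] * 5)
Celery:
@celery.task(autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=10)
def send_email(to: str, subject: str, body: str): ...
Always use exponential backoff with jitter. Celery's retry_backoff=True gives exponential delays, and retry_jitter is on by default.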
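For queues that leave the delay calculation to you, exponential backoff with "full jitter" is only a few lines. This is a minimal sketch; the function name, `base`, and `cap` defaults are illustrative assumptions rather than any library's API.

```python
import random
from typing import Optional


def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Return a delay in seconds for the given attempt (0-based).

    The ceiling doubles with each attempt up to `cap`; the actual delay is
    drawn uniformly from [0, ceiling] so that many failing jobs do not all
    retry at the same instant.
    """
    rng = rng or random.Random()
    ceiling = min(cap, base * (2 ** attempt))
    return rng.uniform(0, ceiling)


delays = [backoff_delay(attempt) for attempt in range(8)]
```

The returned value could be passed wherever the queue accepts a retry delay, such as the `defer` argument of arq's `Retry`.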
Dead-letter queue
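from arq import Retry

async def send_email(ctx, to: str, subject: str, body: str):
    try:
        await sendgrid.send(to=to, subject=subject, body=body)
    except RetryableError:
        if ctx["job_try"] >= 5:
            # Final attempt failed: park the job for inspection instead of retrying
            await record_dead_letter(ctx["job_id"], "send_email", ...)
            return
        raise Retry(defer=ctx["job_try"] * 5)
Or use the broker’s dead-letter support, for example RabbitMQ dead-letter exchanges when Celery runs on RabbitMQ.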
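The overall flow (retry a bounded number of times, then record the job somewhere a human can inspect it) can be simulated without a broker. Everything here is a hypothetical stand-in: `run_with_dead_letter` plays the worker, `dead_letters` plays the dead-letter table, and `flaky_send` is a task that always fails.

```python
import asyncio

MAX_TRIES = 3
dead_letters: list = []   # stand-in for a dead-letter table or queue
attempts: list = []       # records each attempt number, for inspection


class RetryableError(Exception):
    pass


async def flaky_send(to: str) -> None:
    raise RetryableError("smtp unavailable")


async def run_with_dead_letter(job_id: str, func, *args):
    last_error = None
    for job_try in range(1, MAX_TRIES + 1):
        attempts.append(job_try)
        try:
            return await func(*args)
        except RetryableError as exc:
            last_error = exc   # a real worker would wait with backoff here
    # Out of attempts: keep enough context to replay the job by hand later.
    dead_letters.append(
        {"job_id": job_id, "function": func.__name__, "args": args, "error": repr(last_error)}
    )
    return None


asyncio.run(run_with_dead_letter("job-1", flaky_send, "user@example.com"))
```

Storing the function name and arguments alongside the error is what makes a dead letter replayable once the underlying problem is fixed.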
Scheduled jobs
ARQ has cron:
from arq import cron

class WorkerSettings:
    cron_jobs = [
        cron(cleanup_expired, hour=2, minute=0),
        cron(send_daily_digest, hour=9, minute=0),
    ]
Celery has Beat:
from celery.schedules import crontab

celery.conf.beat_schedule = {
    "cleanup": {"task": "myapp.cleanup", "schedule": crontab(minute=0, hour=2)},
}
Run celery beat alongside workers.
Long-running tasks
For tasks > 5 minutes:
- Run as a job; client polls /jobs/{id} for status.
- Or stream progress via WebSocket / SSE (Chapter 8).
- Or use a workflow engine (Temporal) for durable execution.
from arq.jobs import Job as ArqJob, JobStatus

@app.post("/reports/generate")
async def generate(request: Request, user: User = Depends(get_current_user)):
    job = await request.app.state.queue.enqueue_job("generate_report", user.id)
    return {"job_id": job.job_id}

@app.get("/jobs/{job_id}")
async def job_status(job_id: str, request: Request):
    job = ArqJob(job_id, request.app.state.queue)
    status = await job.status()  # a JobStatus enum: queued, in_progress, complete, ...
    result = None
    if status == JobStatus.complete:
        info = await job.result_info()
        result = info.result if info and info.success else None
    return {"status": status.value, "result": result}
Frontend polls; updates UI.
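The polling side can be sketched in Python with a deadline so the client does not poll forever. `poll_until_done` and `fake_fetch` are hypothetical names; in practice `fetch_status` would issue a GET to `/jobs/{job_id}` and return the parsed JSON.

```python
import asyncio


async def poll_until_done(fetch_status, interval: float = 0.01, timeout: float = 1.0):
    """Call fetch_status() until the job reports 'complete' or the deadline passes."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while True:
        payload = await fetch_status()
        if payload["status"] == "complete":
            return payload["result"]
        if loop.time() >= deadline:
            raise TimeoutError("job did not finish in time")
        await asyncio.sleep(interval)


# Canned responses standing in for successive GET /jobs/{job_id} calls.
responses = iter(
    [
        {"status": "queued", "result": None},
        {"status": "in_progress", "result": None},
        {"status": "complete", "result": "report.pdf"},
    ]
)
calls = []


async def fake_fetch():
    calls.append(1)
    return next(responses)


result = asyncio.run(poll_until_done(fake_fetch))
```

For jobs that take minutes, a longer interval (or one that grows over time) keeps the polling load on the API small.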
Workflow engines
For complex multi-step workflows (with compensation, timers, signals): Temporal.
from datetime import timedelta

from temporalio import workflow

# Workflow definition
@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order: Order) -> str:
        await workflow.execute_activity(reserve_inventory, order, schedule_to_close_timeout=timedelta(seconds=30))
        await workflow.execute_activity(charge_payment, order, schedule_to_close_timeout=timedelta(seconds=60))
        # Every activity call needs a timeout; the 5-minute value here is illustrative
        await workflow.execute_activity(ship_order, order, schedule_to_close_timeout=timedelta(minutes=5))
        return "success"
Beyond what Celery / ARQ do well. Use when steps need durability, signals, timers.
Monitoring
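@app.get("/admin/queue-status")
async def queue_status(request: Request, _: User = Depends(get_admin_user)):
    redis = request.app.state.queue
    # arq keeps pending jobs in a sorted set named "arq:queue" by default
    # and marks running jobs with "arq:in-progress:<job_id>" keys
    active = [key async for key in redis.scan_iter(match="arq:in-progress:*")]
    return {
        "queued": await redis.zcard("arq:queue"),
        "active": len(active),
    }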
Or use built-in tools: arq --check (ARQ’s worker health check), Flower for Celery, Dramatiq’s Prometheus middleware.
For prod: Prometheus metrics on queue depth, processing rate, error rate.
See Observability Stack.
Common mistakes
1. BackgroundTasks for things that matter
Worker restart → task lost. Use a real queue.
2. No retry
Network blip → email never sent.
3. Pickling Django/SQLA models
Stale state at task time. Pass IDs.
4. No timeout on tasks
Hung task; worker stuck. Set time limits.
5. Same DB pool for web and workers
One slow report blocks web requests. Separate pools.
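For mistake 4, the core idea is a hard deadline around the task body. Queues usually expose this as configuration (arq has a `job_timeout` worker setting, and Celery tasks accept `time_limit` and `soft_time_limit`); the sketch below shows the same idea with plain asyncio, using hypothetical helper names.

```python
import asyncio


async def run_with_timeout(coro_func, *args, timeout: float):
    """Run a task but give up after `timeout` seconds instead of hanging forever."""
    try:
        return await asyncio.wait_for(coro_func(*args), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for has already cancelled the task; report the outcome so the
        # caller can retry or dead-letter the job.
        return "timed_out"


async def hung_task():
    await asyncio.sleep(3600)   # simulates a call that never returns
    return "done"


async def quick_task():
    return "done"


hung_outcome = asyncio.run(run_with_timeout(hung_task, timeout=0.05))
quick_outcome = asyncio.run(run_with_timeout(quick_task, timeout=0.05))
```

Note that this only interrupts cooperative async code; a sync function stuck in a blocking call needs a process-level limit such as the ones the queue libraries provide.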
What’s next
Chapter 10: Testing.