Chapter 9: how to do work after the response (or in parallel with the request, off the critical path). FastAPI’s BackgroundTasks, plus integration patterns with real task queues — ARQ, Celery, Dramatiq, Procrastinate — and when each fits.

BackgroundTasks (built-in)

from fastapi import BackgroundTasks

@app.post("/signup")
async def signup(user: UserCreate, bg: BackgroundTasks, db: AsyncSession = Depends(get_db)):
    db_user = await create_user(db, user)
    bg.add_task(send_welcome_email, db_user.email)
    return {"id": db_user.id}

After the response is sent, FastAPI runs send_welcome_email. Same process, same memory.

Use when:

  • Task is short.
  • Non-critical (safe to lose).
  • You can tolerate occasional loss (worker restart, deploy mid-task, crash).

Don’t use when:

  • Task must survive process death.
  • Task must retry on failure.
  • Task is long-running.
  • You need scheduling.

BackgroundTasks is fine for “send a confirmation email after signup” — if it’s lost occasionally, life goes on.

Sync and async tasks

async def send_email_async(email: str): ...
def send_email_sync(email: str): ...

bg.add_task(send_email_async, "user@example.com")
bg.add_task(send_email_sync, "user@example.com")

Both work. FastAPI awaits async tasks on the event loop and runs sync tasks in the threadpool, so a blocking sync task does not stall the loop.
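The dispatch rule can be sketched with the standard library alone. This is only an illustration: `run_task` is a hypothetical helper, not FastAPI's API, and `asyncio.to_thread` stands in for the threadpool helper the framework actually uses.

```python
import asyncio
import inspect
import threading

# Records which thread each task ran on.
calls: dict[str, int] = {}

async def send_email_async(email: str) -> None:
    calls["async"] = threading.get_ident()

def send_email_sync(email: str) -> None:
    calls["sync"] = threading.get_ident()

async def run_task(func, *args) -> None:
    """Await coroutine functions directly; push plain functions to a worker thread."""
    if inspect.iscoroutinefunction(func):
        await func(*args)
    else:
        await asyncio.to_thread(func, *args)

async def main() -> int:
    await run_task(send_email_async, "user@example.com")
    await run_task(send_email_sync, "user@example.com")
    return threading.get_ident()  # the thread that runs the event loop

loop_thread = asyncio.run(main())
```

The practical consequence: a blocking call inside an async task holds up the loop, while the same call inside a sync task only occupies a worker thread.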

Limitations

  • Lost on worker restart.
  • No retry.
  • No scheduling.
  • No persistence.
  • No multi-process distribution.

For anything important: a real queue.

ARQ — async-first Redis queue

uv add arq

# tasks.py
from arq.connections import RedisSettings

async def send_email(ctx, to: str, subject: str, body: str):
    await sendgrid.send(to=to, subject=subject, body=body)

class WorkerSettings:
    functions = [send_email]
    redis_settings = RedisSettings(host="redis")

# main.py
from contextlib import asynccontextmanager

from arq import create_pool
from arq.connections import RedisSettings
from fastapi import FastAPI, Request

@asynccontextmanager
async def lifespan(app: FastAPI):
    # One shared Redis pool for the whole app, opened at startup
    app.state.queue = await create_pool(RedisSettings(host="redis"))
    yield
    await app.state.queue.close()

app = FastAPI(lifespan=lifespan)

@app.post("/signup")
async def signup(user: UserCreate, request: Request):
    await request.app.state.queue.enqueue_job("send_email", user.email, "Welcome", "...")
    return {"ok": True}

Run worker:

arq tasks.WorkerSettings

ARQ is async-native, which makes it the cleanest fit for FastAPI. See FastAPI Background Tasks.

Celery (the classic)

from celery import Celery

celery = Celery("myapp", broker="redis://redis:6379/0", backend="redis://redis:6379/1")

@celery.task(autoretry_for=(Exception,), retry_backoff=True, max_retries=10)
def send_email(to: str, subject: str, body: str):
    sendgrid.send(to=to, subject=subject, body=body)

# In FastAPI handler
@app.post("/signup")
async def signup(user: UserCreate):
    send_email.delay(user.email, "Welcome", "...")
    return {"ok": True}

Run worker:

celery -A myapp worker -l info

Mature, with many features. Tasks are plain synchronous functions; the default prefork pool runs them in separate worker processes, and --concurrency sets how many. Used a lot in Django shops; works fine in FastAPI.

See Django + Celery.

Dramatiq

import dramatiq
from dramatiq.brokers.redis import RedisBroker

broker = RedisBroker(host="redis")
dramatiq.set_broker(broker)

@dramatiq.actor(max_retries=3)
def send_email(to, subject, body):
    sendgrid.send(to=to, subject=subject, body=body)

# enqueue
send_email.send("user@example.com", "Welcome", "...")

Cleaner than Celery; fewer features; solid retry semantics. Good middle ground.

Procrastinate (Postgres-backed)

import procrastinate

app_jobs = procrastinate.App(connector=procrastinate.AiopgConnector(...))

@app_jobs.task(queue="default")
async def send_email(to: str, subject: str, body: str):
    ...

Postgres for queue; LISTEN/NOTIFY for low-latency wake-up. No extra infrastructure (assuming you have Postgres).

For small apps not needing Redis: this is excellent. See Postgres as a Queue.

Decision matrix

Need                                    Pick
Send email after signup, OK to lose     BackgroundTasks
Reliable async tasks, you have Redis    ARQ
Mature, complex workflows               Celery
Cleaner Celery alternative              Dramatiq
Postgres only; no Redis                 Procrastinate
Durable, multi-step workflows           Temporal

For most new FastAPI apps: ARQ.

Idempotent task pattern

At-least-once delivery is the norm, so any task may run more than once:

async def charge_invoice(ctx, invoice_id: int):
    inv = await get_invoice(invoice_id)
    if inv.charged:
        return  # already done
    await stripe.charge(...)
    await mark_charged(inv)

Use idempotency keys or “already done” checks so a redelivered job does nothing the second time. See Idempotency.
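A check-then-act guard can still double-charge if two workers pick up the same job at the same moment. One common way to close that gap is to claim the work atomically through a unique key. The sketch below uses an in-memory SQLite table purely for illustration; the `processed` table, the `claim` helper, and the `charges` list standing in for a payment provider are all hypothetical names, not part of any library mentioned here.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE processed (idempotency_key TEXT PRIMARY KEY)")

charges: list[int] = []  # stand-in for calls to a payment provider

def claim(key: str) -> bool:
    """Return True only for the first caller that records this key."""
    try:
        with conn:  # commits on success, rolls back on error
            conn.execute(
                "INSERT INTO processed (idempotency_key) VALUES (?)", (key,)
            )
        return True
    except sqlite3.IntegrityError:
        # The primary key already exists: another delivery got here first.
        return False

def charge_invoice(invoice_id: int) -> bool:
    if not claim(f"charge:{invoice_id}"):
        return False  # a previous delivery already handled this invoice
    charges.append(invoice_id)
    return True

first = charge_invoice(42)
second = charge_invoice(42)  # simulated redelivery of the same job
```

Claiming before acting has a trade-off: if the charge fails after the claim is recorded, the key has to be released or marked as failed so a retry can proceed. Where the payment API accepts an idempotency key of its own, passing the same key through covers that case as well.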

Pass IDs, not objects

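# BAD
bg.add_task(send_welcome, user)  # keeps an ORM object alive past the request; may be stale or detached

# GOOD
bg.add_task(send_welcome_by_id, user.id)

Same for queues, where the object would also have to be serialized. The worker re-fetches from the DB.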
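To make the staleness concrete, here is a small simulation, assuming a dict as a stand-in for the database and JSON as the queue's serialization format. All names in it are illustrative.

```python
import json

db = {1: {"id": 1, "email": "old@example.com"}}  # stand-in for a users table

# Enqueue time: one payload carries the whole record, the other only the ID.
payload_object = json.dumps(db[1])
payload_id = json.dumps({"user_id": 1})

# The user changes their address before a worker picks the job up.
db[1]["email"] = "new@example.com"

# Worker side: the serialized record is a snapshot; the ID leads to current data.
stale_email = json.loads(payload_object)["email"]
fresh_email = db[json.loads(payload_id)["user_id"]]["email"]
```

The snapshot still holds the old address, while the lookup by ID sees the update.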

Retries

ARQ:

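from arq import Retry

async def send_email(ctx, ...):
    try:
        await sendgrid.send(...)
    except RetryableError:
        # arq retries only when the job raises Retry; defer grows with each attempt
        raise Retry(defer=ctx["job_try"] * 5)
    # any other exception marks the job as failed without a retry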

Celery:

@celery.task(autoretry_for=(ConnectionError,), retry_backoff=True, max_retries=10)
def send_email(...): ...

Always use exponential backoff with jitter, so retries from many failed jobs do not all hit a recovering service at the same instant.
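For queues that leave the delay calculation to you, a minimal sketch of the "full jitter" variant follows. The function names and the defaults for `base` and `cap` are assumptions chosen for illustration.

```python
import random
from typing import Optional

def backoff_ceiling(attempt: int, base: float = 1.0, cap: float = 60.0) -> float:
    """Upper bound for the delay: doubles with each attempt, limited by cap."""
    return min(cap, base * 2 ** attempt)

def backoff_delay(
    attempt: int,
    base: float = 1.0,
    cap: float = 60.0,
    rng: Optional[random.Random] = None,
) -> float:
    """Full jitter: a uniform random delay between 0 and the ceiling."""
    rng = rng if rng is not None else random.Random()
    return rng.uniform(0.0, backoff_ceiling(attempt, base, cap))

# Delays for the first five attempts; values differ on every run.
delays = [backoff_delay(attempt) for attempt in range(5)]
```

With ARQ, a value like this could be passed as the `defer` argument of `Retry`. Celery's `retry_backoff=True` applies its own backoff, and jitter is on by default there.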

Dead-letter queue

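from arq import Retry

async def send_email(ctx, to, ...):
    if ctx["job_try"] >= 5:
        # Out of attempts: record the job for manual inspection and stop
        await record_dead_letter(ctx["job_id"], "send_email", ...)
        return
    try:
        await sendgrid.send(...)
    except RetryableError:
        raise Retry(defer=ctx["job_try"] * 5)  # a bare raise would fail the job without a retry

Or use the broker’s own dead-letter support, such as RabbitMQ dead-letter exchanges when Celery runs on RabbitMQ.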

Scheduled jobs

ARQ has cron:

from arq import cron

class WorkerSettings:
    cron_jobs = [
        cron(cleanup_expired, hour=2, minute=0),
        cron(send_daily_digest, hour=9, minute=0),
    ]

Celery has Beat:

from celery.schedules import crontab

celery.conf.beat_schedule = {
    "cleanup": {"task": "myapp.cleanup", "schedule": crontab(minute=0, hour=2)},
}

Run celery beat alongside workers.

Long-running tasks

For tasks > 5 minutes:

  • Run as a job; client polls /jobs/{id} for status.
  • Or stream progress via WebSocket / SSE (Chapter 8).
  • Or use a workflow engine (Temporal) for durable execution.
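
from arq.jobs import Job as ArqJob, JobStatus

@app.post("/reports/generate")
async def generate(request: Request, user: User = Depends(get_current_user)):
    job = await request.app.state.queue.enqueue_job("generate_report", user.id)
    return {"job_id": job.job_id}

@app.get("/jobs/{job_id}")
async def job_status(job_id: str, request: Request):
    job = ArqJob(job_id, request.app.state.queue)
    status = await job.status()  # JobStatus: deferred, queued, in_progress, complete, not_found
    result = None
    if status == JobStatus.complete:
        info = await job.result_info()  # JobResult, or None if the result has expired
        result = info.result if info and info.success else None
    return {"status": status.value, "result": result}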

Frontend polls; updates UI.
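The polling loop itself is simple. A hedged sketch in Python, written against a generic `fetch` callable so it runs without a server: in a real client, `fetch` would perform the GET on /jobs/{id} and return the parsed JSON. `poll_job` and the fake responses are illustrative names, and a production client would also stop on a "not_found" status.

```python
import time
from typing import Callable

def poll_job(
    fetch: Callable[[], dict], interval: float = 0.5, timeout: float = 30.0
) -> dict:
    """Call fetch() until the job reports "complete" or the deadline passes."""
    deadline = time.monotonic() + timeout
    while True:
        payload = fetch()
        if payload["status"] == "complete":
            return payload
        if time.monotonic() + interval > deadline:
            raise TimeoutError(f"job still {payload['status']!r} after {timeout}s")
        time.sleep(interval)

# Fake endpoint for demonstration: two pending responses, then the result.
responses = iter([
    {"status": "queued", "result": None},
    {"status": "in_progress", "result": None},
    {"status": "complete", "result": "report.pdf"},
])
final = poll_job(lambda: next(responses), interval=0.01, timeout=1.0)
```

A fixed interval keeps the example short; for jobs that run for minutes, lengthening the interval over time reduces load on the API.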

Workflow engines

For complex multi-step (with compensation, timers, signals): Temporal.

from datetime import timedelta
from temporalio import workflow

# Workflow definition
@workflow.defn
class OrderWorkflow:
    @workflow.run
    async def run(self, order: Order) -> str:
        await workflow.execute_activity(reserve_inventory, order, schedule_to_close_timeout=timedelta(seconds=30))
        await workflow.execute_activity(charge_payment, order, schedule_to_close_timeout=timedelta(seconds=60))
        # Temporal requires a timeout on every activity; the 60 seconds here is illustrative
        await workflow.execute_activity(ship_order, order, schedule_to_close_timeout=timedelta(seconds=60))
        return "success"

Beyond what Celery / ARQ do well. Use when steps need durability, signals, timers.

Monitoring

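@app.get("/admin/queue-status")
async def queue_status(request: Request, _: User = Depends(get_admin_user)):
    redis = request.app.state.queue
    # arq keeps pending jobs in a sorted set (default name "arq:queue")
    # and marks running jobs with "arq:in-progress:<job_id>" keys.
    active = [key async for key in redis.scan_iter(match="arq:in-progress:*")]
    return {
        "queued": await redis.zcard("arq:queue"),
        "active": len(active),
    }

Or use built-in tools: ARQ’s arq --check health check, Flower for Celery, Dramatiq’s Prometheus middleware.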

For prod: Prometheus metrics on queue depth, processing rate, error rate.

See Observability Stack.

Common mistakes

1. BackgroundTasks for things that matter

Worker restart → task lost. Use a real queue.

2. No retry

Network blip → email never sent.

3. Pickling Django/SQLAlchemy models

Stale state at task time. Pass IDs.

4. No timeout on tasks

Hung task; worker stuck. Set time limits.

5. Same DB pool for web and workers

One slow report blocks web requests. Separate pools.
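For mistake 4, the queues have their own settings: ARQ workers accept `job_timeout`, and Celery tasks accept `time_limit` and `soft_time_limit`. Inside a task, individual awaits can be bounded as well. A minimal sketch using only asyncio, where `run_with_timeout` and the two demo tasks are illustrative names:

```python
import asyncio

async def run_with_timeout(coro_func, *args, timeout: float):
    """Cancel the call if it runs longer than `timeout` seconds."""
    return await asyncio.wait_for(coro_func(*args), timeout=timeout)

async def hung_task() -> str:
    await asyncio.sleep(10)  # simulates a call that does not return in time
    return "done"

async def quick_task() -> str:
    return "done"

async def main() -> tuple[str, bool]:
    ok = await run_with_timeout(quick_task, timeout=0.1)
    try:
        await run_with_timeout(hung_task, timeout=0.05)
        timed_out = False
    except asyncio.TimeoutError:
        timed_out = True  # the hung call was cancelled instead of blocking the worker
    return ok, timed_out

ok, timed_out = asyncio.run(main())
```

A timeout turns a stuck worker into an ordinary failure, which the retry and dead-letter handling can then deal with.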

What’s next

Chapter 10: Testing.
