Cheatsheet for background work that integrates with your SQLAlchemy DB.

ARQ + SQLAlchemy

# worker.py
from arq import ArqRedis, run_worker
from arq.connections import RedisSettings
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

# Worker-side engine: pool_size=5 persistent connections (max_overflow is left
# at its library default, so short bursts may open more).
engine = create_async_engine(DATABASE_URL, pool_size=5)
# Session factory bound to the worker engine; expire_on_commit=False keeps
# already-loaded attributes readable after a commit.
SessionLocal = async_sessionmaker(engine, expire_on_commit=False)

async def send_welcome(ctx, user_id: int):
    async with SessionLocal() as s:
        user = await s.get(User, user_id)
        if not user: return
        await sendgrid.send(to=user.email, ...)

async def on_startup(ctx):
    """Worker startup hook: expose the module-level engine via the job context."""
    ctx["engine"] = engine

async def on_shutdown(ctx):
    """Worker shutdown hook: dispose of the module-level engine."""
    await engine.dispose()

class WorkerSettings:
    """arq worker configuration for the welcome-email worker."""

    # Jobs this worker is allowed to execute.
    functions = [send_welcome]

    # Where the queue lives.
    redis_settings = RedisSettings(host="redis")

    # Lifecycle hooks run once per worker process.
    on_startup = on_startup
    on_shutdown = on_shutdown

    # Concurrency and per-job time limit (seconds).
    max_jobs = 10
    job_timeout = 60

Enqueue from FastAPI

# main.py
@asynccontextmanager
async def lifespan(app):
    """Create the arq Redis pool for the app's lifetime and release resources on exit.

    Cleanup runs in ``finally`` blocks so that an exception or cancellation
    delivered at the ``yield`` still closes the Redis pool, and a failure while
    closing the pool does not skip disposing the engine.
    """
    # ... engine, sm ...
    app.state.arq = await create_pool(RedisSettings(host="redis"))
    try:
        yield
    finally:
        try:
            await app.state.arq.close()
        finally:
            await app.state.engine.dispose()

@app.post("/signup")
async def signup(data: UserIn, request: Request, db: AsyncSession = Depends(get_db)):
    """Create the user, queue the ``send_welcome`` job with the new id, return the user.

    NOTE(review): the job is enqueued as soon as ``create_user`` returns.
    Confirm that ``create_user`` has committed by then, otherwise the worker
    may look the user up before the row is visible.
    """
    new_user = await create_user(db, data)
    queue = request.app.state.arq
    await queue.enqueue_job("send_welcome", new_user.id)
    return new_user

Outbox + worker pattern (durability)

For events that must not be lost on Redis failure:

# Same transaction as business write
async def create_user_with_event(db, data):
    """Insert a user and its ``user.created`` outbox event in one transaction.

    Args:
        db: async SQLAlchemy session used for both inserts.
        data: input object exposing ``email``.

    Returns:
        The newly created ``User``.
    """
    user = User(email=data.email)
    db.add(user)
    # The primary key is only assigned when the INSERT is emitted. Flush now
    # (still inside the same transaction) so the payload records the real id
    # instead of None.
    await db.flush()

    db.add(OutboxEvent(
        type="user.created",
        payload={"id": user.id, "email": data.email},
    ))
    # Single commit: the user row and its event become durable together.
    await db.commit()
    return user

# Worker drains outbox + publishes
async def outbox_worker(ctx):
    """Publish up to 100 unpublished outbox events, oldest first.

    Rows are selected ``FOR UPDATE SKIP LOCKED`` inside a single transaction,
    so rows already locked by another transaction are skipped rather than
    waited on. Each event is published to the bus and then stamped with
    ``published_at``; the stamps are committed when the transaction block exits.
    """
    pending_stmt = (
        select(OutboxEvent)
        .where(OutboxEvent.published_at.is_(None))
        .order_by(OutboxEvent.occurred_at)
        .limit(100)
        .with_for_update(skip_locked=True)
    )
    async with SessionLocal() as session, session.begin():
        rows = await session.execute(pending_stmt)
        for event in rows.scalars().all():
            await bus.publish(event.type, event.payload)
            event.published_at = datetime.utcnow()

LISTEN/NOTIFY wake-up (low latency)

# Trigger
-- Trigger function: signal the 'outbox' channel with an empty payload.
CREATE OR REPLACE FUNCTION notify_outbox() RETURNS TRIGGER AS $$
BEGIN
    PERFORM pg_notify('outbox', '');
    RETURN NEW;
END;
$$ LANGUAGE plpgsql;

-- Fire the function once for every row inserted into outbox_events.
CREATE TRIGGER outbox_notify
AFTER INSERT ON outbox_events
FOR EACH ROW
EXECUTE FUNCTION notify_outbox();
# Listener
async def listen():
    """Drain the outbox on every ``outbox`` notification, plus once a minute.

    Opens a dedicated asyncpg connection, registers a listener on the
    ``outbox`` channel and starts an ``outbox_worker`` run for each
    notification. A drain is also started every 60 seconds as a safety net
    for notifications that were missed. The connection is closed when the
    coroutine exits or is cancelled.
    """
    conn = await asyncpg.connect(DATABASE_URL)
    # The event loop holds only weak references to tasks; keep strong ones
    # here so a drain cannot be garbage-collected while it is still running.
    in_flight = set()

    def _start_drain(*_):
        task = asyncio.create_task(outbox_worker({}))
        in_flight.add(task)
        task.add_done_callback(in_flight.discard)

    try:
        await conn.add_listener("outbox", _start_drain)
        while True:
            await asyncio.sleep(60)
            # Periodic drain: catches events whose NOTIFY was never received.
            _start_drain()
    finally:
        await conn.close()

Sub-millisecond wake; periodic drain as safety net.

Idempotent jobs

async def charge_invoice(ctx, invoice_id: int):
    async with SessionLocal() as s:
        inv = await s.get(Invoice, invoice_id)
        if inv.charged_at: return       # already done
        
        # Stripe idempotency key
        await stripe.charge(idempotency_key=f"inv-{invoice_id}", ...)
        
        inv.charged_at = datetime.utcnow()
        await s.commit()

Delivery is at-least-once, so a job can run more than once. Every job must be idempotent.

Pass IDs, not objects

# BAD: the whole object travels with the task
bg.add_task(send_welcome, user)            # object is serialized; its state is stale by the time the task runs

# GOOD: only the primary key travels with the task
bg.add_task(send_welcome_by_id, user.id)   # worker re-fetches the current row

Worker DB pool

Separate from web pool:

# web engine: pool_size=20
# worker engine: pool_size=5

# Don't share one pool — a busy worker can exhaust it and starve web requests (and vice versa).

Multi-tenant jobs

async def send_welcome(ctx, user_id: int, tenant_id: int):
    """Load the user for a tenant-scoped job.

    The tenant id is bound to the ``current_tenant`` context variable before
    the session is opened, so the lookup runs with that tenant in effect.
    """
    # bind contextvar so SA event picks it up
    current_tenant.set(tenant_id)
    async with SessionLocal() as session:
        user = await session.get(User, user_id)
        # RLS applies via SET LOCAL

Pass tenant_id to every job.

Tracing across queue

Propagate trace context:

async def enqueue_with_trace(arq, fn_name, *args):
    """Enqueue ``fn_name`` with the caller's trace context attached.

    The current span is rendered as a ``traceparent`` value and sent along
    with the job under ``_job_data``.

    NOTE(review): confirm how ``_job_data`` reaches the worker. If the queue
    client does not treat it as a reserved option, it arrives as an ordinary
    keyword argument of the job function.
    """
    traceparent = format_traceparent(trace.get_current_span())
    job_data = {"traceparent": traceparent}
    await arq.enqueue_job(fn_name, *args, _job_data=job_data)

# Worker resumes:
# Rebuild the trace context from the propagated `traceparent` value and run
# the job body inside a span started with that context.
# NOTE(review): confirm the worker really exposes ctx["job_data"]; verify
# against the queue library's documented job-context keys.
# NOTE(review): `ctx` is rebound here from the job context to the extracted
# trace context, so the job context is no longer reachable by that name.
ctx = extract({"traceparent": ctx["job_data"]["traceparent"]})
with tracer.start_as_current_span("job", context=ctx):
    ...

Job retries

class WorkerSettings:
    """Retry policy for the worker."""

    # Re-run jobs that were interrupted or asked to be retried ...
    retry_jobs = True
    # ... but give up after this many attempts.
    max_tries = 5
    
async def send_email(ctx, ...):
    try:
        await sendgrid.send(...)
    except RetryableError:
        if ctx["job_try"] >= 5:
            await record_dead_letter(ctx["job_id"], ...)
            return
        raise            # let arq retry

Scheduled jobs

class WorkerSettings:
    # Scheduled jobs, referenced by dotted import path.
    cron_jobs = [
        # every day at 02:00
        cron("app.tasks.cleanup_expired", hour=2, minute=0),
        # every day at 09:00
        cron("app.tasks.send_digest", hour=9, minute=0),
    ]

Polling for status

@app.post("/reports")
async def request_report(data: ReportRequest, request: Request, user: User = Depends(current_user)):
    """Queue a ``generate_report`` job for the current user and return its id.

    The job receives the user's id and the request body as a plain dict. The
    returned ``job_id`` is what the client polls with.
    """
    queue = request.app.state.arq
    queued = await queue.enqueue_job("generate_report", user.id, data.dict())
    return {"job_id": queued.job_id}

@app.get("/jobs/{jid}")
async def job_status(jid: str, request: Request):
    """Report the state of job ``jid`` for polling clients.

    Returns:
        ``{"status": <str>, "result": <value or None>}``. ``status`` is the
        arq job status name ("deferred", "queued", "in_progress", "complete"
        or "not_found"). ``result`` is filled in only for a job that completed
        successfully.
    """
    from arq.jobs import Job, JobStatus

    job = Job(jid, request.app.state.arq)
    # Job.info() returns None for unknown ids, and the object it returns has
    # no `status` attribute. Job.status() is the call that reports state and
    # it also covers the "not_found" case.
    status = await job.status()

    result = None
    if status == JobStatus.complete:
        outcome = await job.result_info()
        # A failed job stores its exception as the result; only hand back
        # results of successful runs.
        if outcome is not None and outcome.success:
            result = outcome.result
    return {"status": status.value, "result": result}

Frontend polls.

Common mistakes

  • Worker without its own DB pool — saturates web pool.
  • Forgetting idempotency — duplicate work.
  • Pickling SA models — stale state at job time.
  • No retries with backoff — transient failures become permanent failures.
  • Outbox without LISTEN — high latency.

Read this next

If you want my outbox + arq + SA worker template, it’s at rajpoot.dev.


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev.