Cheatsheet for background work that integrates with your SQLAlchemy DB.
ARQ + SQLAlchemy
# worker.py
from arq import ArqRedis, run_worker
from arq.connections import RedisSettings
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine
# Worker-only engine with its own small connection pool.
engine = create_async_engine(
    DATABASE_URL,
    pool_size=5,
)
# expire_on_commit=False: attributes of loaded objects remain readable after
# commit instead of being expired.
SessionLocal = async_sessionmaker(
    bind=engine,
    expire_on_commit=False,
)
async def send_welcome(ctx, user_id: int):
    """Send the welcome email to user ``user_id``; no-op if the user is gone.

    The session is closed before the external SendGrid call, so a slow mail
    provider does not keep a pooled DB connection checked out for the
    duration of the HTTP request.
    """
    async with SessionLocal() as s:
        user = await s.get(User, user_id)
    if user is None:
        return
    # `user` is detached now; its (non-deferred) column attributes were loaded
    # by get() and stay readable.
    await sendgrid.send(..., to=user.email)
async def on_startup(ctx):
    """arq startup hook: make the module's engine available to jobs as ctx["engine"]."""
    ctx.update(engine=engine)


async def on_shutdown(ctx):
    """arq shutdown hook: dispose of the engine's connection pool."""
    await engine.dispose()
class WorkerSettings:
    """arq worker configuration for the jobs in this module."""

    # Where to find Redis and which coroutines this worker may run.
    redis_settings = RedisSettings(host="redis")
    functions = [send_welcome]

    # Lifecycle hooks.
    on_startup = on_startup
    on_shutdown = on_shutdown

    # Concurrency and time limits.
    # NOTE(review): max_jobs exceeds the engine's pool_size=5; jobs beyond that
    # rely on pool overflow — keep the two numbers in sync.
    max_jobs = 10
    job_timeout = 60  # arq interprets a plain int as seconds
Enqueue from FastAPI
# main.py
@asynccontextmanager
async def lifespan(app):
    """FastAPI lifespan: open the arq Redis pool at startup; release it and the DB engine at shutdown."""
    # ... engine, sm ...
    app.state.arq = await create_pool(RedisSettings(host="redis"))
    try:
        yield
    finally:
        # try/finally so cleanup also runs when an exception is thrown into the
        # generator, and a failing Redis close does not skip the engine dispose.
        try:
            await app.state.arq.close()
        finally:
            await app.state.engine.dispose()
@app.post("/signup")
async def signup(data: UserIn, request: Request, db: AsyncSession = Depends(get_db)):
    """Create the user, then queue the welcome email by id (the job re-fetches the row)."""
    new_user = await create_user(db, data)
    # NOTE(review): if create_user does not commit before this point, the worker
    # may run first, find no row, and silently skip the email — confirm.
    arq_pool = request.app.state.arq
    await arq_pool.enqueue_job("send_welcome", new_user.id)
    return new_user
Outbox + worker pattern (durability)
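For events that must not be lost if Redis is unavailable (or the process dies) between the DB commit and the enqueue, write the event to an outbox table in the same transaction as the business data: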
# Same transaction as business write
async def create_user_with_event(db, data):
    """Insert a user and its ``user.created`` outbox event, committed together.

    Because both rows go into one transaction, the event exists if and only
    if the user does.
    """
    user = User(email=data.email)
    db.add(user)
    # Flush so the INSERT runs and user.id is populated. Before a flush it is
    # still None (even column defaults are applied at flush time), and the
    # event payload would carry "id": null.
    await db.flush()
    db.add(OutboxEvent(
        type="user.created",
        payload={"id": user.id, "email": data.email},
    ))
    await db.commit()
    return user
# Worker drains outbox + publishes
async def outbox_worker(ctx, batch_size: int = 100):
    """Publish up to ``batch_size`` unpublished outbox events, oldest first.

    Rows are claimed with ``FOR UPDATE SKIP LOCKED``, so concurrent drainers
    each take a disjoint set of rows. Publishing and marking share one
    transaction. If ``bus.publish`` raises, the whole batch rolls back and is
    picked up by the next drain, so events already published earlier in that
    batch are published again. Consumers must be idempotent.

    Returns the number of events processed (0 when the outbox is empty).
    """
    async with SessionLocal() as s:
        async with s.begin():
            events = (
                await s.execute(
                    select(OutboxEvent)
                    .where(OutboxEvent.published_at.is_(None))
                    .order_by(OutboxEvent.occurred_at)
                    .limit(batch_size)
                    .with_for_update(skip_locked=True)
                )
            ).scalars().all()
            for e in events:
                await bus.publish(e.type, e.payload)
                # NOTE(review): naive UTC timestamp (utcnow is deprecated since
                # 3.12); use datetime.now(timezone.utc) if the column is timestamptz.
                e.published_at = datetime.utcnow()
    return len(events)
LISTEN/NOTIFY wake-up (low latency)
# Trigger
-- Wake-up signal for the outbox listener: after every row inserted into
-- outbox_events, send an empty-payload notification on channel 'outbox'.
-- The payload is deliberately empty; the listener re-queries the table
-- for the actual rows.
CREATE OR REPLACE FUNCTION notify_outbox() RETURNS TRIGGER AS $$
BEGIN PERFORM pg_notify('outbox', ''); RETURN NEW; END;
$$ LANGUAGE plpgsql;
-- NOTE: EXECUTE FUNCTION needs PostgreSQL 11+; older servers spell it EXECUTE PROCEDURE.
CREATE TRIGGER outbox_notify AFTER INSERT ON outbox_events
FOR EACH ROW EXECUTE FUNCTION notify_outbox();
# Listener
async def listen():
    """Drain the outbox on every NOTIFY on channel 'outbox', and once a minute.

    The periodic drain, which also runs once at startup, catches events
    whose notification was missed while this connection was down. Runs until
    cancelled; the LISTEN connection is always closed on exit.
    """
    from sqlalchemy.engine import make_url

    # DATABASE_URL is presumably the SQLAlchemy async URL used by
    # create_async_engine (e.g. postgresql+asyncpg://...). asyncpg.connect
    # rejects the "+driver" scheme, so rebuild a plain postgresql:// DSN.
    dsn = (
        make_url(DATABASE_URL)
        .set(drivername="postgresql")
        .render_as_string(hide_password=False)
    )
    conn = await asyncpg.connect(dsn)
    # The event loop keeps only weak references to tasks; hold each drain
    # until it finishes so it cannot be garbage-collected mid-flight.
    drains = set()

    def on_notify(*_):
        task = asyncio.create_task(outbox_worker({}))
        drains.add(task)
        task.add_done_callback(drains.discard)

    try:
        await conn.add_listener("outbox", on_notify)
        while True:
            await outbox_worker({})  # safety-net drain
            await asyncio.sleep(60)
    finally:
        await conn.close()
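The listener wakes as soon as the inserting transaction commits, with no polling delay. Still run a periodic drain as a safety net: notifications sent while the listener is disconnected are not redelivered.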
Idempotent jobs
async def charge_invoice(ctx, invoice_id: int):
    """Charge invoice ``invoice_id`` once, even if the job is delivered repeatedly.

    There are two guards against double charging:
    - The ``charged_at`` check skips invoices already recorded as charged.
    - The deterministic Stripe idempotency key lets Stripe recognise a repeat
      call if an earlier run charged but crashed before committing.

    A missing invoice is a no-op.
    """
    async with SessionLocal() as s:
        inv = await s.get(Invoice, invoice_id)
        if inv is None or inv.charged_at:
            return  # unknown invoice, or already done
        await stripe.charge(..., idempotency_key=f"inv-{invoice_id}")
        # NOTE(review): naive UTC timestamp; utcnow is deprecated since 3.12.
        inv.charged_at = datetime.utcnow()
        await s.commit()
Queues deliver jobs at least once (a crash or a retry can run the same job again), so every job must be idempotent.
Pass IDs, not objects
# BAD: hands the job a live ORM instance. A queue has to serialize it, and even
# an in-process task (presumably FastAPI BackgroundTasks here) sees the row as
# it was at enqueue time.
bg.add_task(send_welcome, user) # serializes; stale
# GOOD: pass only the primary key; the job loads current state in its own session.
bg.add_task(send_welcome_by_id, user.id) # worker re-fetches
Worker DB pool
Separate from web pool:
# web engine: pool_size=20
# worker engine: pool_size=5
# Don't share — worker can saturate one pool and starve the other.
Multi-tenant jobs
async def send_welcome(ctx, user_id: int, tenant_id: int):
    """Tenant-scoped job: bind ``tenant_id`` for this job's DB work, then load the user.

    The tenant binding is undone on exit (via the contextvar token). It
    therefore cannot leak into the caller's context when the coroutine is
    awaited directly, e.g. from a test or another job, rather than run as
    its own task.
    """
    # Bind the contextvar so the SQLAlchemy event hook (not shown) picks it up.
    token = current_tenant.set(tenant_id)
    try:
        async with SessionLocal() as s:
            user = await s.get(User, user_id)
            # RLS applies via SET LOCAL
            ...
    finally:
        current_tenant.reset(token)
Pass tenant_id to every job.
Tracing across queue
Propagate trace context:
async def enqueue_with_trace(arq, fn_name, *args, **kwargs):
    """Enqueue job ``fn_name`` carrying the current span's W3C ``traceparent``.

    arq has no side channel such as ``_job_data``: every extra keyword
    argument is forwarded to the job function. The trace context therefore
    travels as an ordinary ``traceparent=`` kwarg, which the job must accept.
    Returns arq's Job handle (``None`` if arq skipped a duplicate job id).
    """
    span = trace.get_current_span()
    traceparent = format_traceparent(span)
    return await arq.enqueue_job(fn_name, *args, traceparent=traceparent, **kwargs)


# Worker resumes: the job receives the forwarded kwarg. Do not rebind `ctx` —
# it is arq's job-context dict, not an OpenTelemetry context.
async def traced_job(ctx, *args, traceparent=None):
    parent = extract({"traceparent": traceparent}) if traceparent else None
    with tracer.start_as_current_span("job", context=parent):
        ...
Job retries
class WorkerSettings:
    max_tries = 5      # total attempts arq makes, including the first
    retry_jobs = True  # let `Retry` reschedule jobs (arq's default)


async def send_email(ctx, *args, **kwargs):
    """Send an email, retrying transient failures with increasing delays.

    arq re-queues a job only when it raises ``arq.Retry`` (or is cancelled).
    Any other exception marks the job failed for good, so a bare ``raise``
    never retries. On the last allowed attempt the job is dead-lettered and
    returns normally.
    """
    from arq import Retry

    try:
        await sendgrid.send(*args, **kwargs)
    except RetryableError as exc:
        if ctx["job_try"] >= WorkerSettings.max_tries:
            await record_dead_letter(ctx["job_id"], ...)
            return
        # Linear backoff: wait 5s, 10s, 15s, ... before the next attempt.
        raise Retry(defer=ctx["job_try"] * 5) from exc
Scheduled jobs
class WorkerSettings:
    """Periodic jobs; arq resolves each dotted path to the job coroutine."""

    # (import path, hour): each job fires daily at HH:00 in the worker's
    # configured timezone (confirm arq's `timezone` setting).
    cron_jobs = [
        cron(path, hour=hour, minute=0)
        for path, hour in (
            ("app.tasks.cleanup_expired", 2),
            ("app.tasks.send_digest", 9),
        )
    ]
Polling for status
@app.post("/reports")
async def request_report(data: ReportRequest, request: Request, user: User = Depends(current_user)):
    """Queue report generation for the current user and return the arq job id to poll."""
    arq_pool = request.app.state.arq
    job = await arq_pool.enqueue_job("generate_report", user.id, data.dict())
    return dict(job_id=job.job_id)
@app.get("/jobs/{jid}")
async def job_status(jid: str, request: Request):
    """Report the arq status of job ``jid`` and, once it succeeded, its result.

    Returns ``{"status": ..., "result": ...}``. ``status`` is one of arq's
    JobStatus values: deferred, queued, in_progress, complete or not_found.
    ``result`` is ``None`` unless the job completed successfully.
    """
    from arq.jobs import Job, JobStatus

    # NOTE(review): no ownership check — anyone who knows a job id can read its
    # result; consider tying job ids to the requesting user.
    job = Job(jid, request.app.state.arq)
    # Job.info() returns None for unknown ids and has no `status` attribute;
    # Job.status() is the supported way to ask where a job is.
    status = await job.status()
    result = None
    if status == JobStatus.complete:
        info = await job.result_info()
        # A failed job's result is the raised exception, which cannot be
        # serialized to JSON — expose results of successful jobs only.
        if info is not None and info.success:
            result = info.result
    return {"status": status.value, "result": result}
The frontend polls `GET /jobs/{jid}` until the status is `complete`.
Common mistakes
- Worker without its own DB pool — saturates web pool.
- Forgetting idempotency — duplicate work.
- Pickling SA models — stale state at job time.
- No retries with backoff — transient failures become permanent (arq only retries jobs that raise `Retry`; use `Retry(defer=...)` to back off).
- Outbox drained only by slow polling (no LISTEN/NOTIFY) — high delivery latency.
Read this next
If you want my outbox + arq + SA worker template, it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .