Cheatsheet for async + concurrency in FastAPI. Long-form: Textbook Ch 7 and Python Async Patterns 2026 .
async def vs def handlers
@app.get("/a")
async def a():
await db.query(...) # event loop; never block
@app.get("/b")
def b():
return blocking_call() # runs in threadpool (AnyIO)
Mix freely. For IO-bound + async libs: async def. For sync-only legacy code: def is fine in moderation.
Increase threadpool
import anyio
limiter = anyio.to_thread.current_default_thread_limiter()
limiter.total_tokens = 100 # default 40
If many def handlers block at once: bump.
Blocking traps in async handlers
| Don’t | Do |
|---|---|
time.sleep(5) | await asyncio.sleep(5) |
requests.get(...) | httpx.AsyncClient |
psycopg2 sync | asyncpg / async SQLAlchemy |
open() for big files | aiofiles |
| Heavy CPU | asyncio.to_thread / process pool |
Offload sync to a thread
result = await asyncio.to_thread(legacy_sync, arg1, arg2)
For CPU-bound: ProcessPoolExecutor:
loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, heavy_cpu_fn, arg)
TaskGroup (Python 3.11+)
import asyncio
@app.get("/parallel")
async def parallel():
async with asyncio.TaskGroup() as tg:
a = tg.create_task(fetch_a())
b = tg.create_task(fetch_b())
return {"a": a.result(), "b": b.result()}
Or AnyIO (works with both asyncio + trio):
import anyio
@app.get("/p")
async def p():
out = {}
async with anyio.create_task_group() as tg:
async def fa(): out["a"] = await fetch_a()
async def fb(): out["b"] = await fetch_b()
tg.start_soon(fa)
tg.start_soon(fb)
return out
Concurrency limit (semaphore)
sem = anyio.Semaphore(10)
async def fetch(url):
async with sem:
return await client.get(url)
Timeouts
async with asyncio.timeout(5):
result = await slow_op()
# or
async with anyio.fail_after(5):
result = await slow_op()
Always have timeouts on external calls.
Cancellation
@app.get("/")
async def home():
try:
await long_op()
except asyncio.CancelledError:
await cleanup()
raise # re-raise!
Don’t swallow CancelledError.
Disconnection check
async def stream(request: Request):
async def gen():
for i in range(10000):
if await request.is_disconnected(): return
yield f"data: {i}\n\n"
await asyncio.sleep(1)
return StreamingResponse(gen(), media_type="text/event-stream")
Connection pools (lifespan-scoped)
@asynccontextmanager
async def lifespan(app):
app.state.engine = create_async_engine(URL, pool_size=20, max_overflow=10)
app.state.http = httpx.AsyncClient(timeout=10)
yield
await app.state.http.aclose()
await app.state.engine.dispose()
Backpressure / fast-fail under load
sem = anyio.Semaphore(100)
@app.get("/work")
async def work():
if not sem.statistics().tokens_available:
raise HTTPException(503, "overloaded")
async with sem:
return await heavy()
Workers and processes
uvicorn app:app --workers 4
# Or via gunicorn:
gunicorn -k uvicorn.workers.UvicornWorker -w 4 app:app
Each worker = its own event loop + threadpool. Don’t share Python state across workers.
Python 3.13 free-threaded
uv python install 3.13.0t
PYTHONNODEBUG=1 python -X frozen_modules=off ...
Experimental. C-extensions may not yet support; check.
Read this next
If you want my async patterns reference, it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .