Cheatsheet for async + concurrency in FastAPI. Long-form: Textbook Ch 7 and Python Async Patterns 2026 .

async def vs def handlers

@app.get("/a")
async def a():
    await db.query(...)        # event loop; never block

@app.get("/b")
def b():
    return blocking_call()     # runs in threadpool (AnyIO)

Mix freely. For IO-bound + async libs: async def. For sync-only legacy code: def is fine in moderation.

Increase threadpool

import anyio
limiter = anyio.to_thread.current_default_thread_limiter()
limiter.total_tokens = 100        # default 40

If many def handlers block at once: bump.

Blocking traps in async handlers

Don’tDo
time.sleep(5)await asyncio.sleep(5)
requests.get(...)httpx.AsyncClient
psycopg2 syncasyncpg / async SQLAlchemy
open() for big filesaiofiles
Heavy CPUasyncio.to_thread / process pool

Offload sync to a thread

result = await asyncio.to_thread(legacy_sync, arg1, arg2)

For CPU-bound: ProcessPoolExecutor:

loop = asyncio.get_running_loop()
result = await loop.run_in_executor(executor, heavy_cpu_fn, arg)

TaskGroup (Python 3.11+)

import asyncio

@app.get("/parallel")
async def parallel():
    async with asyncio.TaskGroup() as tg:
        a = tg.create_task(fetch_a())
        b = tg.create_task(fetch_b())
    return {"a": a.result(), "b": b.result()}

Or AnyIO (works with both asyncio + trio):

import anyio

@app.get("/p")
async def p():
    out = {}
    async with anyio.create_task_group() as tg:
        async def fa(): out["a"] = await fetch_a()
        async def fb(): out["b"] = await fetch_b()
        tg.start_soon(fa)
        tg.start_soon(fb)
    return out

Concurrency limit (semaphore)

sem = anyio.Semaphore(10)

async def fetch(url):
    async with sem:
        return await client.get(url)

Timeouts

async with asyncio.timeout(5):
    result = await slow_op()

# or
async with anyio.fail_after(5):
    result = await slow_op()

Always have timeouts on external calls.

Cancellation

@app.get("/")
async def home():
    try:
        await long_op()
    except asyncio.CancelledError:
        await cleanup()
        raise                       # re-raise!

Don’t swallow CancelledError.

Disconnection check

async def stream(request: Request):
    async def gen():
        for i in range(10000):
            if await request.is_disconnected(): return
            yield f"data: {i}\n\n"
            await asyncio.sleep(1)
    return StreamingResponse(gen(), media_type="text/event-stream")

Connection pools (lifespan-scoped)

@asynccontextmanager
async def lifespan(app):
    app.state.engine = create_async_engine(URL, pool_size=20, max_overflow=10)
    app.state.http = httpx.AsyncClient(timeout=10)
    yield
    await app.state.http.aclose()
    await app.state.engine.dispose()

Backpressure / fast-fail under load

sem = anyio.Semaphore(100)

@app.get("/work")
async def work():
    if not sem.statistics().tokens_available:
        raise HTTPException(503, "overloaded")
    async with sem:
        return await heavy()

Workers and processes

uvicorn app:app --workers 4
# Or via gunicorn:
gunicorn -k uvicorn.workers.UvicornWorker -w 4 app:app

Each worker = its own event loop + threadpool. Don’t share Python state across workers.

Python 3.13 free-threaded

uv python install 3.13.0t
PYTHONNODEBUG=1 python -X frozen_modules=off ...

Experimental. C-extensions may not yet support; check.

Read this next

If you want my async patterns reference, it’s at rajpoot.dev .


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .