Chapter 7: FastAPI’s concurrency model. Why async def matters, how def handlers work, when each blocks the event loop, AnyIO, structured concurrency, and the patterns that scale.

The event loop

A FastAPI process runs one event loop on a single thread. Only one coroutine executes at any moment; concurrency comes from coroutines yielding cooperatively at each await.

When you await an IO operation (DB, HTTP, file), the coroutine yields and the loop runs other tasks. CPU work or a blocking call inside async code never yields, so it blocks the loop and every other request on that worker waits.
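To see the difference without FastAPI, here is a minimal standard-library sketch. The names heartbeat, blocking_work, cooperative_work and max_gap are invented for this illustration; the heartbeat task stands in for every other request sharing the loop.

```python
import asyncio
import time


async def heartbeat(gaps: list[float], interval: float = 0.01) -> None:
    """Stand-in for 'every other request': records the time between wake-ups."""
    last = time.perf_counter()
    while True:
        await asyncio.sleep(interval)
        now = time.perf_counter()
        gaps.append(now - last)
        last = now


async def blocking_work() -> None:
    time.sleep(0.2)  # never yields: the loop is stuck for the full 0.2 s


async def cooperative_work() -> None:
    await asyncio.sleep(0.2)  # yields: the loop keeps serving other tasks


async def max_gap(work) -> float:
    """Run `work` next to the heartbeat and return the longest heartbeat gap."""
    gaps: list[float] = []
    beat = asyncio.create_task(heartbeat(gaps))
    await asyncio.sleep(0.03)  # let the heartbeat tick a few times first
    await work()
    await asyncio.sleep(0.03)
    beat.cancel()
    try:
        await beat
    except asyncio.CancelledError:
        pass  # expected: we cancelled the heartbeat ourselves
    return max(gaps)
```

On a typical machine, asyncio.run(max_gap(blocking_work)) comes out near 0.2 s, the full length of the blocking call, while asyncio.run(max_gap(cooperative_work)) stays close to the 0.01 s tick interval.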

async def vs def handlers

@app.get("/a")
async def a():
    await db.query(...)
    return ...

@app.get("/b")
def b():
    return blocking_call()

Both are valid. They behave differently:

  • async def: runs in the event loop. Don’t do blocking IO without an async adapter.
  • def: FastAPI runs it in a threadpool (via AnyIO). The event loop is free; the request takes a thread.

Mixing is fine. Most FastAPI projects mix.

The threadpool

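By default, AnyIO maintains a thread pool that runs def handlers and def dependencies (Depends callables). Default limit: 40 threads.

import anyio.to_thread

# Call this from code running inside the event loop (e.g. lifespan startup);
# the limiter belongs to the running loop.
limiter = anyio.to_thread.current_default_thread_limiter()
limiter.total_tokens = 100  # increase if you have many sync handlers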

If your def handlers all block at once, the threadpool fills, and new sync handlers wait.

For mostly-async apps: 40 threads is plenty. For mostly-sync: increase or move to async.
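The saturation effect can be reproduced with a small stand-in. This sketch uses a fixed-size ThreadPoolExecutor rather than AnyIO’s limiter (an assumption made to keep it dependency-free), and blocking_handler and serve are hypothetical names.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_handler() -> None:
    time.sleep(0.1)  # stand-in for a sync handler doing blocking IO


async def serve(n_requests: int, n_threads: int) -> float:
    """Run n_requests blocking calls on a pool of n_threads; return elapsed seconds."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=n_threads) as pool:
        start = time.perf_counter()
        await asyncio.gather(
            *(loop.run_in_executor(pool, blocking_handler) for _ in range(n_requests))
        )
        return time.perf_counter() - start
```

With 2 threads, 4 calls of 0.1 s each need two rounds (about 0.2 s); with 4 threads they overlap in roughly one round. The same queueing happens to sync handlers once all threadpool tokens are taken.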

Common blocking traps

In async def handlers, these block the event loop:

  • time.sleep(...): use asyncio.sleep.
  • requests.get(...): use httpx.AsyncClient.
  • Sync DB drivers: use asyncpg / async SQLAlchemy.
  • File IO with sync open / read: use aiofiles.
  • Heavy CPU work: offload.

# BAD
@app.get("/slow")
async def slow():
    time.sleep(5)  # blocks event loop; all other requests wait

# GOOD
@app.get("/slow")
async def slow():
    await asyncio.sleep(5)  # yields; other requests keep running

For unavoidable sync calls in async context:

import asyncio

result = await asyncio.to_thread(blocking_function, arg1, arg2)

asyncio.to_thread (Python 3.9+) runs the call on a worker thread, and the event loop stays free while it waits for the result.
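A minimal sketch of the pattern, assuming a hypothetical blocking function legacy_lookup that stands in for a sync library call. Note that asyncio.to_thread uses the loop’s default executor, not the AnyIO limiter described earlier.

```python
import asyncio
import time


def legacy_lookup(user_id: int, delay: float = 0.1) -> dict:
    """Hypothetical blocking library call."""
    time.sleep(delay)
    return {"id": user_id}


async def lookup_many(user_ids: list[int]) -> list[dict]:
    # Each call is handed to a worker thread; this coroutine only awaits the results,
    # so the event loop can run other tasks in the meantime.
    return await asyncio.gather(
        *(asyncio.to_thread(legacy_lookup, uid) for uid in user_ids)
    )
```

Three 0.1 s lookups finish in roughly 0.1 s total, because they wait on separate threads instead of one after another on the loop.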

CPU-bound work

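Threads don’t help with CPU-bound work: the GIL lets only one thread execute Python bytecode at a time. Use a process pool instead.

import asyncio
from concurrent.futures import ProcessPoolExecutor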

executor = ProcessPoolExecutor()

@app.get("/render")
async def render(...):
    loop = asyncio.get_running_loop()
    result = await loop.run_in_executor(executor, heavy_cpu_work, arg)
    return result

Or offload to a worker queue (Celery / ARQ) and deliver the result asynchronously.

On the free-threaded build of Python 3.13, threads do help with CPU-bound work, but C-extension support is partial. See Python 3.13.

AnyIO

Starlette / FastAPI use AnyIO under the hood. AnyIO runs on top of either asyncio or Trio; under uvicorn you are on asyncio.

For TaskGroup-style concurrency:

import anyio

@app.get("/parallel")
async def parallel():
    results = {}
    async with anyio.create_task_group() as tg:
        async def fetch_a():
            results["a"] = await get_a()
        async def fetch_b():
            results["b"] = await get_b()
        tg.start_soon(fetch_a)
        tg.start_soon(fetch_b)
    return results

Or with asyncio.TaskGroup (Python 3.11+):

async with asyncio.TaskGroup() as tg:
    a_task = tg.create_task(get_a())
    b_task = tg.create_task(get_b())
return {"a": a_task.result(), "b": b_task.result()}

See Python Async Patterns.

Concurrency limits

sem = anyio.Semaphore(10)

async def fetch(url):
    async with sem:
        return await client.get(url)

Cap parallel calls. Especially important for external APIs with rate limits.
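A small sketch to check that the cap holds, using asyncio.Semaphore from the standard library in place of anyio.Semaphore and a sleep in place of the real HTTP call. run_capped and fake_fetch are illustrative names.

```python
import asyncio


async def run_capped(n_calls: int, limit: int) -> int:
    """Start n_calls fake requests at once; return the peak number in flight."""
    sem = asyncio.Semaphore(limit)
    in_flight = 0
    peak = 0

    async def fake_fetch() -> None:
        nonlocal in_flight, peak
        async with sem:
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for `await client.get(url)`
            in_flight -= 1

    await asyncio.gather(*(fake_fetch() for _ in range(n_calls)))
    return peak
```

asyncio.run(run_capped(50, 10)) returns 10: fifty calls are started, but never more than ten are inside the semaphore at once.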

Timeouts

with anyio.fail_after(5):  # plain `with`, not `async with`; raises TimeoutError
    result = await slow_op()

Or, with the standard library (Python 3.11+):

async with asyncio.timeout(5):
    result = await slow_op()

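Always have timeouts on external calls. Without them, a slow dependency keeps requests, connections, and pool slots tied up indefinitely, and the backlog eventually takes the service down.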
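A minimal sketch of bounding a slow call and degrading gracefully, using asyncio.wait_for from the standard library (which also works before Python 3.11). The delay parameter of slow_op and the "fallback" value are assumptions for illustration; a real handler might return a cached value or a 504 instead.

```python
import asyncio


async def slow_op(delay: float) -> str:
    await asyncio.sleep(delay)  # stand-in for a slow external dependency
    return "fresh"


async def fetch_with_fallback(delay: float, timeout: float) -> str:
    try:
        return await asyncio.wait_for(slow_op(delay), timeout=timeout)
    except asyncio.TimeoutError:
        # wait_for has already cancelled slow_op by the time we get here.
        return "fallback"
```

A call that finishes inside the budget returns "fresh"; one that overruns is cancelled and the caller gets "fallback" after at most roughly the timeout.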

Cancellation

When a request is cancelled (client disconnect, timeout), the handler sees asyncio.CancelledError at the await it is suspended on:

@app.get("/")
async def home():
    try:
        await long_operation()
    except asyncio.CancelledError:
        # cleanup
        await release_resources()
        raise  # re-raise!

Don’t swallow CancelledError; clean up and re-raise.
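The behaviour can be checked outside FastAPI. In this sketch, task.cancel() plays the role of the server cancelling the request, and handler and cancel_midway are hypothetical names.

```python
import asyncio


async def handler(events: list[str]) -> None:
    try:
        await asyncio.sleep(10)  # stand-in for long_operation()
        events.append("finished")
    except asyncio.CancelledError:
        events.append("cleanup")  # release resources here
        raise  # let the cancellation propagate


async def cancel_midway() -> tuple[list[str], bool]:
    events: list[str] = []
    task = asyncio.create_task(handler(events))
    await asyncio.sleep(0.01)  # let the handler start waiting
    task.cancel()  # roughly what happens on disconnect or timeout
    await asyncio.gather(task, return_exceptions=True)  # wait for it to unwind
    return events, task.cancelled()
```

asyncio.run(cancel_midway()) returns (['cleanup'], True): the cleanup ran, and because the exception was re-raised the task is correctly reported as cancelled. If the handler swallowed the exception instead, the task would look like it had completed normally.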

For long-running streams, poll Starlette’s request.is_disconnected() and stop when the client is gone:

async def stream(request: Request):
    async def gen():
        for i in range(1000):
            if await request.is_disconnected():
                return
            yield f"data: {i}\n\n"
            await asyncio.sleep(1)
    return StreamingResponse(gen(), media_type="text/event-stream")

Workers and processes

uvicorn main:app --workers 4

This starts 4 separate Python processes. Each has its own event loop and threadpool, so in-memory Python state (globals, caches) is not shared between them.

For shared state: Redis, DB, external services.

For blue/green: see Deploy Strategies.

Picking sync or async per dep

# Async (preferred where IO-bound)
async def get_db_async():
    async with AsyncSessionLocal() as session:
        yield session

# Sync (when bridging to sync code)
def get_legacy_dep():
    return LegacyService()

Mixing is fine. FastAPI handles both transparently.

Connection pooling

For DB, HTTP clients, Redis: pool at app level via lifespan; share pool across requests.

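from contextlib import asynccontextmanager

import httpx
from sqlalchemy.ext.asyncio import create_async_engine

@asynccontextmanager
async def lifespan(app):
    # Startup: one engine (connection pool) and one HTTP client per process.
    app.state.db = create_async_engine(..., pool_size=20, max_overflow=10)
    app.state.http = httpx.AsyncClient(timeout=10)
    yield
    # Shutdown: close pooled connections.
    await app.state.db.dispose()
    await app.state.http.aclose()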

Per-request: borrow a connection from the pool. Per-process: each worker has its own pool, so with 4 workers the database can see up to 4 × (pool_size + max_overflow) connections.

DB pool sizing

total_connections = workers × (pool_size + max_overflow)
                  ≤ db_max_connections (with safety margin)

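Postgres max_connections defaults to 100. Don’t blow past it. PgBouncer in transaction pooling mode multiplexes many client connections over fewer server connections, so application pools can add up to more than the database limit.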
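A worked example of the arithmetic. The helper names and the default reserve of 10 connections (for migrations, admin sessions, monitoring) are assumptions for illustration, not recommendations from any library.

```python
def total_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Worst-case number of connections the app can open across all workers."""
    return workers * (pool_size + max_overflow)


def max_pool_budget(db_max_connections: int, workers: int, reserved: int = 10) -> int:
    """Largest (pool_size + max_overflow) per worker that stays under the DB limit."""
    usable = db_max_connections - reserved
    if usable < workers:
        raise ValueError("not enough connections for one per worker")
    return usable // workers
```

total_connections(4, 20, 10) is 120, which already exceeds the default limit of 100. max_pool_budget(100, 4) is 22, so each of 4 workers could use, for example, pool_size=15 and max_overflow=7.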

Backpressure

When the system is overloaded:

  • Threadpool full.
  • DB pool exhausted.
  • External API rate-limited.

Common pattern: explicit semaphore + 503 fast-fail rather than queueing forever:

sem = anyio.Semaphore(100)

@app.get("/")
async def home():
    try:
        sem.acquire_nowait()  # returns None; raises anyio.WouldBlock if no slot is free
    except anyio.WouldBlock:
        raise HTTPException(503, "overloaded")
    try:
        return await work()
    finally:
        sem.release()

Better than infinite queuing.
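To see the fast-fail behaviour in isolation, here is a standard-library simulation. LoadShedder, handle and burst are illustrative names, and a plain counter replaces the semaphore; that is safe here only because everything runs on one event loop thread and try_acquire never awaits.

```python
import asyncio


class LoadShedder:
    """Counts in-flight work and refuses new work past `limit` instead of queueing."""

    def __init__(self, limit: int) -> None:
        self.limit = limit
        self.in_flight = 0

    def try_acquire(self) -> bool:
        if self.in_flight >= self.limit:
            return False
        self.in_flight += 1
        return True

    def release(self) -> None:
        self.in_flight -= 1


async def handle(shedder: LoadShedder) -> int:
    """Return an HTTP-like status: 200 if served, 503 if shed."""
    if not shedder.try_acquire():
        return 503
    try:
        await asyncio.sleep(0.01)  # stand-in for `await work()`
        return 200
    finally:
        shedder.release()


async def burst(n_requests: int, limit: int) -> list[int]:
    """Fire n_requests at once against a shedder with the given limit."""
    shedder = LoadShedder(limit)
    return await asyncio.gather(*(handle(shedder) for _ in range(n_requests)))
```

asyncio.run(burst(5, 2)) yields two 200s and three 503s: the first two requests take the slots and the rest are rejected immediately instead of waiting.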

Profiling

Use py-spy to find what is blocking the event loop:

py-spy top --pid <fastapi-pid>

py-spy samples the running process, so functions executing on the event loop thread are visible. Blocking sync code called from an async handler shows up as a function holding a large share of the samples.

See Python Profiling.

Async ORM patterns

async def get_user(id: int, db: AsyncSession = Depends(get_db)) -> User:
    return await db.get(User, id)

async def list_users(db: AsyncSession = Depends(get_db)) -> list[User]:
    result = await db.execute(select(User))
    return list(result.scalars())

See the SQLAlchemy textbooks for full async patterns.

Async HTTP

import httpx

async def call_external(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        r = await client.get(url, timeout=10)
        r.raise_for_status()
        return r.json()

Or share a client across requests (more efficient):

# In lifespan
app.state.http = httpx.AsyncClient(timeout=10)

# In dep
async def get_http(request: Request) -> httpx.AsyncClient:
    return request.app.state.http

Streaming external API → client

@app.get("/proxy")
async def proxy(http: httpx.AsyncClient = Depends(get_http)):
    async def gen():
        async with http.stream("GET", "https://upstream/data") as r:
            async for chunk in r.aiter_bytes():
                yield chunk
    return StreamingResponse(gen())

Chunks are forwarded as they arrive, so memory stays bounded, and a slow client slows down reads from the upstream.

Common mistakes

1. Sync DB driver in async handler

psycopg2.connect(...) blocks. Use asyncpg or async SQLAlchemy.

2. requests.get in async

Same problem. Use httpx.AsyncClient.

3. time.sleep

Use asyncio.sleep for delays in async code.

4. Forgetting to close clients

Causes memory and connection leaks. Create shared clients in lifespan and close them on shutdown; wrap short-lived ones in async with.

5. CPU work in async

Blocks the loop. Use to_thread for blocking sync libraries and a process pool for heavy CPU work.

What’s next

Chapter 8: WebSockets, SSE, streaming.
