Chapter 7: FastAPI’s concurrency model. Why async def matters, how def handlers work, when each blocks the event loop, AnyIO, structured concurrency, and the patterns that scale.
The event loop
A FastAPI process runs one event loop. It runs one thing at a time at the Python level (single-threaded coroutine scheduling); concurrency is via cooperative awaits.
When you await an IO operation (DB, HTTP, file), the loop yields and runs other tasks. When you do CPU work in async code, you block the loop — every other request waits.
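A minimal sketch of that difference, using only the standard library. The ticker task and the 0.2 s "request" are hypothetical stand-ins, not FastAPI code: the ticker records how long it goes without being scheduled while another coroutine either blocks or awaits.

```python
import asyncio
import time


async def ticker(ticks: list, stop: asyncio.Event) -> None:
    """Record a timestamp every 10 ms, whenever the loop lets us run."""
    while not stop.is_set():
        ticks.append(time.perf_counter())
        await asyncio.sleep(0.01)


async def longest_gap(blocking: bool) -> float:
    """Return the longest pause between ticks while a 0.2 s 'request' runs."""
    ticks: list = []
    stop = asyncio.Event()
    task = asyncio.create_task(ticker(ticks, stop))
    await asyncio.sleep(0.05)      # let the ticker get going
    if blocking:
        time.sleep(0.2)            # holds the loop: the ticker cannot run
    else:
        await asyncio.sleep(0.2)   # yields: the ticker keeps running
    await asyncio.sleep(0.05)
    stop.set()
    await task
    return max(later - earlier for earlier, later in zip(ticks, ticks[1:]))


blocked_gap = asyncio.run(longest_gap(blocking=True))
free_gap = asyncio.run(longest_gap(blocking=False))
print(f"blocking: {blocked_gap:.3f}s  cooperative: {free_gap:.3f}s")
```

The blocking run should show a gap of at least the full 0.2 s; the cooperative run should stay close to the 10 ms tick interval.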
async def vs def handlers
@app.get("/a")
async def a():
    await db.query(...)
    return ...

@app.get("/b")
def b():
    return blocking_call()
Both are valid. They behave differently:
- async def: runs on the event loop. Don’t do blocking IO without an async adapter.
- def: FastAPI runs it in a threadpool (via AnyIO). The event loop stays free; the request occupies a thread.
Mixing is fine. Most FastAPI projects mix.
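To see where each kind of handler runs, here is a small sketch that compares thread IDs. It assumes asyncio.to_thread as a standard-library stand-in for FastAPI’s AnyIO threadpool dispatch; the two handler functions are hypothetical and not real routes.

```python
import asyncio
import threading


async def async_handler() -> int:
    """Stands in for an async def route: runs on the event-loop thread."""
    return threading.get_ident()


def sync_handler() -> int:
    """Stands in for a def route: FastAPI would run it on a worker thread."""
    return threading.get_ident()


async def main() -> dict:
    loop_thread = threading.get_ident()
    async_thread = await async_handler()
    # Stand-in for the threadpool dispatch FastAPI performs for def handlers.
    sync_thread = await asyncio.to_thread(sync_handler)
    return {
        "async_on_loop_thread": async_thread == loop_thread,
        "sync_on_loop_thread": sync_thread == loop_thread,
    }


where = asyncio.run(main())
print(where)
```

The async handler shares the loop’s thread, so anything blocking inside it stalls the loop. The sync handler does not.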
The threadpool
By default, AnyIO maintains a thread pool for def handlers and sync Depends callables. Default size: 40 threads.
import anyio.to_thread

# Call this from inside the running event loop, e.g. during lifespan startup.
limiter = anyio.to_thread.current_default_thread_limiter()
limiter.total_tokens = 100  # increase if you have many sync handlers
If your def handlers all block at once, the threadpool fills, and new sync handlers wait.
For mostly-async apps: 40 threads is plenty. For mostly-sync: increase or move to async.
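A rough illustration of what a full pool does to latency. This sketch uses a standard-library ThreadPoolExecutor rather than AnyIO’s limiter, and the pool sizes and 0.1 s call are made up for the demo, but the queuing effect is the same.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_call() -> None:
    time.sleep(0.1)  # stands in for a sync DB query or HTTP call


async def run_batch(pool_size: int, calls: int) -> float:
    """Run `calls` blocking calls on `pool_size` threads; return seconds taken."""
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=pool_size) as pool:
        start = time.perf_counter()
        await asyncio.gather(
            *(loop.run_in_executor(pool, blocking_call) for _ in range(calls))
        )
        return time.perf_counter() - start


small_pool = asyncio.run(run_batch(pool_size=2, calls=6))  # 3 waves of 2
large_pool = asyncio.run(run_batch(pool_size=6, calls=6))  # 1 wave of 6
print(f"2 threads: {small_pool:.2f}s  6 threads: {large_pool:.2f}s")
```

With 2 threads the six calls run in three waves; the later calls spend most of their time waiting for a free thread, not doing work.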
Common blocking traps
In async def handlers, these block the event loop:
- time.sleep(...): use asyncio.sleep.
- requests.get(...): use httpx.AsyncClient.
- Sync DB drivers: use asyncpg / async SQLAlchemy.
- File IO with sync open/read: use aiofiles.
- Heavy CPU work: offload.
# BAD
@app.get("/slow")
async def slow():
    time.sleep(5)  # blocks event loop; all other requests wait

# GOOD
async def slow():
    await asyncio.sleep(5)
For unavoidable sync calls in async context:
import asyncio
result = await asyncio.to_thread(blocking_function, arg1, arg2)
asyncio.to_thread (Python 3.9+) runs the call on a worker thread; the event loop stays free while it waits for the result.
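A small sketch of the same call with several sync lookups in flight at once. blocking_lookup is a hypothetical stand-in for a sync-only library function; positional and keyword arguments are forwarded by to_thread.

```python
import asyncio
import time


def blocking_lookup(key: str, delay: float = 0.1) -> str:
    """Hypothetical sync library call that cannot be awaited."""
    time.sleep(delay)
    return key.upper()


async def handler() -> tuple:
    start = time.perf_counter()
    # Each call runs on its own worker thread, so the three waits overlap.
    results = await asyncio.gather(
        asyncio.to_thread(blocking_lookup, "a"),
        asyncio.to_thread(blocking_lookup, "b"),
        asyncio.to_thread(blocking_lookup, "c", delay=0.1),
    )
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(handler())
print(results, f"{elapsed:.2f}s")
```

Run back to back, the three lookups would take about 0.3 s; overlapped on threads they should finish in roughly the time of one.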
CPU-bound work
Threads don’t help with CPU-bound work: the GIL lets only one thread execute Python bytecode at a time. Use a process pool instead.
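import asyncio
from concurrent.futures import ProcessPoolExecutor

executor = ProcessPoolExecutor()

@app.get("/render")
async def render(arg: str):  # arg is a placeholder parameter
    loop = asyncio.get_running_loop()
    # heavy_cpu_work runs in a separate process, so it does not hold this process's GIL
    result = await loop.run_in_executor(executor, heavy_cpu_work, arg)
    return result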
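Or offload to a worker queue (Celery / ARQ), respond immediately, and deliver the result asynchronously.
On free-threaded builds of Python 3.13, threads do help with CPU-bound work, but the build is experimental and C-extension support is partial. See Python 3.13.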
AnyIO
Starlette / FastAPI use AnyIO under the hood. AnyIO runs on top of either asyncio or Trio.
For TaskGroup-style concurrency:
import anyio

@app.get("/parallel")
async def parallel():
    results = {}
    async with anyio.create_task_group() as tg:
        async def fetch_a():
            results["a"] = await get_a()
        async def fetch_b():
            results["b"] = await get_b()
        tg.start_soon(fetch_a)
        tg.start_soon(fetch_b)
    # The task group exits only after both tasks have finished.
    return results
Or with asyncio.TaskGroup (Python 3.11+):
async with asyncio.TaskGroup() as tg:
    a_task = tg.create_task(get_a())
    b_task = tg.create_task(get_b())
return {"a": a_task.result(), "b": b_task.result()}
See Python Async Patterns.
Concurrency limits
sem = anyio.Semaphore(10)

async def fetch(url):
    async with sem:
        return await client.get(url)
Cap parallel calls. Especially important for external APIs with rate limits.
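To check that the cap holds, this sketch tracks peak concurrency. It assumes asyncio.Semaphore in place of anyio.Semaphore so it runs on the standard library alone; the URLs are placeholders and the sleep stands in for the HTTP call.

```python
import asyncio

MAX_PARALLEL = 3
active = 0
peak = 0


async def fetch(sem: asyncio.Semaphore, url: str) -> str:
    global active, peak
    async with sem:                 # at most MAX_PARALLEL bodies run at once
        active += 1
        peak = max(peak, active)
        await asyncio.sleep(0.01)   # stands in for `await client.get(url)`
        active -= 1
        return url


async def main() -> list:
    sem = asyncio.Semaphore(MAX_PARALLEL)
    urls = [f"https://example.invalid/{i}" for i in range(10)]  # placeholder URLs
    return await asyncio.gather(*(fetch(sem, u) for u in urls))


fetched = asyncio.run(main())
print(f"fetched={len(fetched)} peak={peak}")
```

All ten calls complete, but never more than three are in flight together.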
Timeouts
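with anyio.fail_after(5):  # a plain (non-async) context manager in AnyIO 3+; raises TimeoutError
    result = await slow_op()
Or, on Python 3.11+:
async with asyncio.timeout(5):
    result = await slow_op()
Always have timeouts on external calls. Without them, a slow dependency holds requests, connections, and pool slots open indefinitely.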
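A sketch of turning a timeout into a controlled outcome. It assumes asyncio.wait_for, which also works before Python 3.11; slow_op and the "fallback" value are hypothetical. In a real handler you might raise an HTTP error instead of returning a fallback.

```python
import asyncio


async def slow_op(delay: float) -> str:
    await asyncio.sleep(delay)   # stands in for a slow external call
    return "ok"


async def call_with_timeout(delay: float, limit: float) -> str:
    try:
        # wait_for cancels slow_op if it has not finished within `limit` seconds.
        return await asyncio.wait_for(slow_op(delay), timeout=limit)
    except asyncio.TimeoutError:
        return "fallback"


fast = asyncio.run(call_with_timeout(delay=0.01, limit=0.5))
slow = asyncio.run(call_with_timeout(delay=5.0, limit=0.05))
print(fast, slow)
```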
Cancellation
When a request is cancelled (client disconnect, timeout):
@app.get("/")
async def home():
    try:
        await long_operation()
    except asyncio.CancelledError:
        # cleanup
        await release_resources()
        raise  # re-raise!
Don’t swallow CancelledError; clean up and re-raise.
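The same pattern outside FastAPI, as a runnable sketch. The task is cancelled by hand here to simulate a disconnect; handler and long_operation are hypothetical names.

```python
import asyncio

events = []


async def long_operation() -> None:
    await asyncio.sleep(10)


async def handler() -> None:
    try:
        await long_operation()
    except asyncio.CancelledError:
        events.append("cleanup")   # release locks, connections, temp files
        raise                      # let the cancellation propagate


async def main() -> bool:
    task = asyncio.create_task(handler())
    await asyncio.sleep(0.01)      # let the handler reach its await
    task.cancel()                  # simulates a client disconnect or timeout
    try:
        await task
    except asyncio.CancelledError:
        events.append("cancelled")
    return task.cancelled()


was_cancelled = asyncio.run(main())
print(events, was_cancelled)
```

Because the handler re-raises, the task ends in the cancelled state. If it swallowed the exception, the caller would see a normal return and the cancellation would be lost.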
For streaming responses, poll Starlette’s request.is_disconnected():
async def stream(request: Request):
    async def gen():
        for i in range(1000):
            if await request.is_disconnected():
                return  # client went away; stop producing
            yield f"data: {i}\n\n"
            await asyncio.sleep(1)
    return StreamingResponse(gen(), media_type="text/event-stream")
Workers and processes
uvicorn main:app --workers 4
4 separate Python processes. Each has its own event loop and threadpool. Don’t share state at the Python level.
For shared state: Redis, DB, external services.
For blue/green: see Deploy Strategies.
Picking sync or async per dep
# Async (preferred where IO-bound)
async def get_db_async():
    async with AsyncSessionLocal() as session:
        yield session

# Sync (when bridging to sync code)
def get_legacy_dep():
    return LegacyService()
Mixing is fine. FastAPI handles both transparently.
Connection pooling
For DB, HTTP clients, Redis: pool at app level via lifespan; share pool across requests.
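from contextlib import asynccontextmanager

@asynccontextmanager
async def lifespan(app):
    # Startup: create one engine and one HTTP client per process.
    app.state.db = create_async_engine(..., pool_size=20, max_overflow=10)
    app.state.http = httpx.AsyncClient(timeout=10)
    yield
    # Shutdown: close pooled connections.
    await app.state.db.dispose()
    await app.state.http.aclose()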
Per request: borrow a connection from the pool and return it when done. Per process: each worker has its own pool, so with 4 workers the worst-case total is 4 × (pool_size + max_overflow) connections.
DB pool sizing
total_connections = workers × (pool_size + max_overflow)
≤ db_max_connections (with safety margin)
Postgres max_connections defaults to 100. Don’t blow past it. PgBouncer in transaction-pool mode lets you size differently.
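Worked through with the numbers used above (4 workers, pool_size=20, max_overflow=10), the formula already exceeds the Postgres default. The headroom of 10 connections is an arbitrary safety margin chosen for this sketch, not a recommendation.

```python
def total_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Worst-case connections opened by all worker processes together."""
    return workers * (pool_size + max_overflow)


def max_pool_per_worker(db_max_connections: int, workers: int, headroom: int) -> int:
    """Largest pool_size + max_overflow per worker that stays within budget."""
    return (db_max_connections - headroom) // workers


worst_case = total_connections(workers=4, pool_size=20, max_overflow=10)
budget = max_pool_per_worker(db_max_connections=100, workers=4, headroom=10)
print(worst_case, budget)  # 120 22
```

120 is over the default limit of 100, so this configuration needs a smaller pool, fewer workers, a higher max_connections, or a pooler in front of the database.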
Backpressure
When the system is overloaded:
- Threadpool full.
- DB pool exhausted.
- External API rate-limited.
Common pattern: explicit semaphore + 503 fast-fail rather than queueing forever:
sem = anyio.Semaphore(100)

@app.get("/")
async def home():
    try:
        sem.acquire_nowait()  # returns None; raises anyio.WouldBlock when no slot is free
    except anyio.WouldBlock:
        raise HTTPException(503, "overloaded")
    try:
        return await work()
    finally:
        sem.release()
Better than infinite queuing.
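The fast-fail idea can also be sketched with a plain in-flight counter, which is safe on a single event loop because code between awaits is never interleaved. MAX_IN_FLIGHT and the Overloaded exception are hypothetical; Overloaded stands in for HTTPException(503).

```python
import asyncio

MAX_IN_FLIGHT = 2
in_flight = 0


class Overloaded(Exception):
    """Stands in for HTTPException(503, "overloaded")."""


async def handle(request_id: int) -> str:
    global in_flight
    if in_flight >= MAX_IN_FLIGHT:
        raise Overloaded(request_id)   # reject immediately instead of queueing
    in_flight += 1
    try:
        await asyncio.sleep(0.01)      # stands in for `await work()`
        return "ok"
    finally:
        in_flight -= 1


async def main() -> list:
    return await asyncio.gather(
        *(handle(i) for i in range(5)), return_exceptions=True
    )


outcomes = asyncio.run(main())
accepted = sum(1 for r in outcomes if r == "ok")
rejected = sum(1 for r in outcomes if isinstance(r, Overloaded))
print(accepted, rejected)
```

Of five simultaneous requests, two are served and three are rejected right away, so rejected callers can retry elsewhere instead of waiting in an unbounded queue.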
Profiling
Use py-spy to find what is blocking the event loop:
py-spy top --pid <fastapi-pid>
py-spy samples the process’s call stacks, so whatever is executing on the event-loop thread is visible. Blocking sync code called from an async handler shows up with a large share of the samples.
See Python Profiling.
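As a complement to py-spy (a different technique, not a replacement), asyncio’s own debug mode logs any step that holds the loop longer than loop.slow_callback_duration. This sketch assumes a 50 ms threshold and uses a deliberately blocking hypothetical handler.

```python
import asyncio
import logging
import time

slow_reports = []


class Collect(logging.Handler):
    """Keep asyncio's warnings in a list so they can be inspected."""

    def emit(self, record: logging.LogRecord) -> None:
        slow_reports.append(record.getMessage())


async def handler() -> None:
    time.sleep(0.2)  # deliberate mistake: blocking call inside a coroutine


async def main() -> None:
    loop = asyncio.get_running_loop()
    loop.slow_callback_duration = 0.05  # report steps that hold the loop > 50 ms
    await asyncio.create_task(handler())


logging.getLogger("asyncio").addHandler(Collect())
asyncio.run(main(), debug=True)
print(slow_reports)
```

Debug mode adds overhead, so it is better suited to development and staging than to production.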
Async ORM patterns
async def get_user(id: int, db: AsyncSession = Depends(get_db)) -> User:
    return await db.get(User, id)

async def list_users(db: AsyncSession = Depends(get_db)) -> list[User]:
    result = await db.execute(select(User))
    return list(result.scalars())
See the SQLAlchemy textbooks for full async patterns.
Async HTTP
import httpx

async def call_external(url: str) -> dict:
    async with httpx.AsyncClient() as client:
        r = await client.get(url, timeout=10)
        r.raise_for_status()
        return r.json()
Or share a client across requests (more efficient):
# In lifespan
app.state.http = httpx.AsyncClient(timeout=10)

# In dep
async def get_http(request: Request) -> httpx.AsyncClient:
    return request.app.state.http
Streaming external API → client
@app.get("/proxy")
async def proxy(http: httpx.AsyncClient = Depends(get_http)):
    async def gen():
        async with http.stream("GET", "https://upstream/data") as r:
            async for chunk in r.aiter_bytes():
                yield chunk
    return StreamingResponse(gen())
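Memory use stays bounded: chunks are pulled from upstream only as fast as the client consumes them, so a slow client slows the upstream read instead of filling a buffer.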
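The pull-based behaviour comes from async generators being lazy. This standard-library sketch uses hypothetical upstream and relay generators in place of httpx and StreamingResponse, and shows that upstream produces only what the consumer has asked for.

```python
import asyncio

produced = []


async def upstream(total: int):
    """Hypothetical upstream body: records each chunk as it is produced."""
    for i in range(total):
        produced.append(i)
        yield f"chunk-{i}".encode()


async def relay(source):
    """Pass chunks through one at a time, as the proxy generator does."""
    async for chunk in source:
        yield chunk


async def main() -> list:
    received = []
    stream = relay(upstream(1000))
    async for chunk in stream:
        received.append(chunk)
        if len(received) == 3:   # the client stops reading after 3 chunks
            break
    await stream.aclose()
    return received


received = asyncio.run(main())
print(len(received), len(produced))
```

Only three of the 1000 possible chunks are ever produced, because nothing runs ahead of the consumer.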
Common mistakes
1. Sync DB driver in async handler
psycopg2.connect(...) blocks. Use asyncpg or async SQLAlchemy.
2. requests.get in async
Same problem. Use httpx.AsyncClient.
3. time.sleep
Use asyncio.sleep for delays in async code.
4. Forgetting to close clients
This leaks memory and connections. Create clients in lifespan or a context manager so they are always closed.
5. CPU work in async
Blocks the loop. Use to_thread for sync libraries and a process pool for heavy CPU work.
What’s next
Chapter 8: WebSockets, SSE, streaming.