FastAPI’s async + streaming combo is one of its quietly excellent features. LLM token streams, progress updates, live feeds — all flow naturally. This post is the working set.
StreamingResponse basics
import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def gen():
    for i in range(10):
        yield f"chunk {i}\n"
        await asyncio.sleep(0.5)  # simulate work between chunks

@app.get("/stream")
async def stream():
    return StreamingResponse(gen(), media_type="text/plain")
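Each yielded chunk is written to the client as soon as it is produced (proxies in between may still buffer). The server pulls the next chunk from the async generator only after the previous one has been sent, which makes the stream backpressure-aware.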
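To make the pull-based behavior concrete, here is a stdlib-only sketch with no FastAPI involved. The names `produce` and `slow_consumer` are made up for illustration, and the `sleep` stands in for a slow socket write. The log shows that the generator never runs ahead of the consumer.

```python
import asyncio

async def produce(log: list):
    """Async generator that records the moment each chunk is produced."""
    for i in range(3):
        log.append(f"produced {i}")
        yield f"chunk {i}\n"

async def slow_consumer() -> list:
    """Consume the generator slowly and return the interleaved log."""
    log = []
    async for chunk in produce(log):
        await asyncio.sleep(0.01)  # pretend the write to the client is slow
        log.append(f"sent {chunk.strip()}")
    return log

# asyncio.run(slow_consumer()) returns
# ["produced 0", "sent chunk 0", "produced 1", "sent chunk 1", "produced 2", "sent chunk 2"]
```

Chunk 1 is not produced until chunk 0 has been "sent": the consumer drives the generator, not the other way around.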
Server-Sent Events
import json

@app.get("/events")
async def events():
    async def event_stream():
        for i in range(100):
            # One SSE message: a data: line followed by a blank line
            yield f"data: {json.dumps({'i': i})}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
Client (browser):
const es = new EventSource("/events");
es.onmessage = (e) => console.log(JSON.parse(e.data));
es.onerror = () => console.warn("reconnecting...");
EventSource auto-reconnects. Cheap to implement; great UX.
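Beyond `data:`, the SSE wire format also has `event:`, `id:` and `retry:` fields. When a message carries an `id:`, the browser sends it back in a `Last-Event-ID` header on reconnect, and `retry:` sets the reconnect delay in milliseconds. The helper below is a minimal sketch of a formatter; `format_sse` and its parameters are my own names, not part of FastAPI.

```python
import json
from typing import Any, Optional

def format_sse(data: Any, event: Optional[str] = None,
               id: Optional[str] = None, retry_ms: Optional[int] = None) -> str:
    """Serialize one SSE message. `data` is JSON-encoded unless it is already a str."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if id is not None:
        lines.append(f"id: {id}")
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    payload = data if isinstance(data, str) else json.dumps(data)
    # A payload containing newlines needs one data: line per row.
    for row in payload.split("\n"):
        lines.append(f"data: {row}")
    # A blank line terminates the message.
    return "\n".join(lines) + "\n\n"
```

Inside a generator this reads as `yield format_sse({"i": i}, id=str(i))`, which also gives a reconnecting client a position to resume from.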
Keepalive
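async def event_stream():
    while True:
        if has_event():  # has_event() / get_event() stand in for your event source
            yield f"data: {json.dumps(get_event())}\n\n"
        else:
            yield ": keepalive\n\n"
        # Polls every 15 s, so an event can be delivered up to 15 s late
        await asyncio.sleep(15)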
Comment lines (: prefix) are ignored by clients but keep proxies from idling out.
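If the events come from an async iterator rather than a polled flag, one option is to wrap it so a comment line goes out only when the source has been quiet. This is a sketch under that assumption; `with_keepalive` is a hypothetical helper, not a FastAPI or Starlette API. It waits on a single pending `__anext__()` task with a timeout instead of cancelling it, so the source generator is never interrupted mid-step.

```python
import asyncio
from typing import AsyncIterator

KEEPALIVE = ": keepalive\n\n"

async def with_keepalive(source: AsyncIterator[str], interval: float) -> AsyncIterator[str]:
    """Yield items from `source`; emit an SSE comment when it is idle for `interval` seconds."""
    it = source.__aiter__()
    pending = asyncio.ensure_future(it.__anext__())
    try:
        while True:
            done, _ = await asyncio.wait({pending}, timeout=interval)
            if not done:
                yield KEEPALIVE  # source is idle; keep waiting on the same task
                continue
            try:
                item = pending.result()
            except StopAsyncIteration:
                return  # source is exhausted
            pending = asyncio.ensure_future(it.__anext__())
            yield item
    finally:
        pending.cancel()  # no-op if the task already finished
```

Real events are forwarded as soon as they arrive, and the keepalive interval only matters during silence.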
LLM streaming
import json

from anthropic import AsyncAnthropic

client = AsyncAnthropic()

@app.get("/chat/stream")
async def chat_stream(prompt: str):
    async def gen():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text in stream.text_stream:
                yield f"data: {json.dumps({'token': text})}\n\n"
        # Tell the client the stream finished normally
        yield "event: done\ndata: \n\n"

    return StreamingResponse(gen(), media_type="text/event-stream")
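The user sees tokens appear as they are generated, which is critical for perceived latency in chat UIs. On the browser side, listen for the final message with es.addEventListener("done", ...) and call es.close() there: onmessage only receives unnamed events, and an EventSource that is not closed reconnects when the response ends, which would rerun the prompt. See LLM Streaming Patterns.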
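For a non-browser client, the token stream above can be reassembled with a small parser. The sketch below assumes the exact message shapes used in this endpoint (a JSON `token` field and a final `event: done`); `iter_tokens` is a made-up name, and the input is any iterable of text lines, for example from an HTTP client that exposes the response line by line.

```python
import json
from typing import Iterable, Iterator

def iter_tokens(lines: Iterable[str]) -> Iterator[str]:
    """Yield tokens from SSE lines until an `event: done` message arrives."""
    event, data = "message", []
    for raw in lines:
        line = raw.rstrip("\n")
        if line.startswith(":"):
            continue  # comment line, e.g. a keepalive
        if line == "":  # a blank line ends one message
            if event == "done":
                return
            if data:
                yield json.loads("\n".join(data))["token"]
            event, data = "message", []
        elif line.startswith("event:"):
            event = line[len("event:"):].strip()
        elif line.startswith("data:"):
            # The format allows one optional space after the colon
            data.append(line[len("data:"):].removeprefix(" "))
```

Joining the output, `"".join(iter_tokens(lines))`, gives the full completion text.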
NDJSON
For programmatic clients:
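@app.get("/items.ndjson")
async def items_stream():
    async def gen():
        # db.iter_items() stands in for your async data source
        async for item in db.iter_items():
            # .dict() is the Pydantic v1 spelling; on Pydantic v2 use item.model_dump()
            yield json.dumps(item.dict()) + "\n"

    return StreamingResponse(gen(), media_type="application/x-ndjson")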
One JSON object per line. Easy to parse incrementally on the client; no array-loading needed.
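On the client, network chunks do not line up with line boundaries, so an incremental reader has to buffer the partial tail. A minimal sketch, assuming the client library hands you raw `bytes` chunks (`iter_ndjson` is a hypothetical name):

```python
import json
from typing import Any, Iterable, Iterator

def iter_ndjson(chunks: Iterable[bytes]) -> Iterator[Any]:
    """Parse NDJSON from arbitrarily split byte chunks, one object at a time."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # Everything before the last newline is complete; the tail may be partial.
        *lines, buffer = buffer.split(b"\n")
        for line in lines:
            if line.strip():
                yield json.loads(line)
    if buffer.strip():
        yield json.loads(buffer)  # final line without a trailing newline
```

Each object becomes available as soon as its newline arrives, so memory use stays proportional to one line rather than to the whole response.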
Disconnection handling
import asyncio

from starlette.requests import Request

@app.get("/stream")
async def stream(request: Request):
    async def gen():
        try:
            for i in range(1000):
                if await request.is_disconnected():
                    print("client gone, stopping")
                    break
                yield f"data: {i}\n\n"
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print("cancelled")
            raise  # re-raise so the cancellation propagates

    return StreamingResponse(gen(), media_type="text/event-stream")
Client closed → stop the work. Otherwise: zombie generators run forever, holding LLM calls / DB cursors.
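The cleanup half of this can be checked without a server. In the sketch below, `ticker` stands in for a streaming generator that holds a resource, and the `state` dict plays the role of a DB cursor or upstream stream (both are illustrative names). Closing the generator raises `GeneratorExit` at the paused `yield`, so the `finally` block runs.

```python
import asyncio

async def ticker(state: dict):
    """Stand-in for a streaming generator that holds a resource."""
    state["open"] = True  # e.g. a DB cursor or an upstream LLM stream
    try:
        i = 0
        while True:
            yield f"data: {i}\n\n"
            i += 1
            await asyncio.sleep(0.01)
    finally:
        state["open"] = False  # runs on normal exit, aclose(), and cancellation

async def read_two_then_hang_up() -> dict:
    state = {"open": False}
    gen = ticker(state)
    received = []
    async for chunk in gen:
        received.append(chunk)
        if len(received) == 2:
            break  # the "client" goes away
    await gen.aclose()  # closing the generator triggers its finally block
    state["received"] = received
    return state
```

After `read_two_then_hang_up()` returns, `state["open"]` is `False` even though the loop inside `ticker` never ended on its own.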
Auth on SSE
from fastapi import HTTPException, Query, Request

@app.get("/events")
async def events(request: Request, token: str = Query(...)):
    user = verify_token(token)  # verify_token() is your own helper
    if not user:
        raise HTTPException(401)
    # ...
The browser EventSource API cannot set custom headers such as Authorization, so pass the token as a query param and keep it short-lived, since URLs end up in access logs. Or use a same-origin session cookie.
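The post does not show `verify_token`, so here is one possible shape for it: a short-lived HMAC-signed token built with the standard library. Everything in this sketch (`SECRET`, `make_token`, the `user_id:expires:signature` layout) is an assumption for illustration. A real app would more likely use an established format such as JWT and load the secret from configuration.

```python
import base64
import hashlib
import hmac
import time
from typing import Optional

SECRET = b"change-me"  # assumption: loaded from config in a real app

def _sign(payload: str) -> str:
    return hmac.new(SECRET, payload.encode(), hashlib.sha256).hexdigest()

def make_token(user_id: str, ttl_s: int = 60, now: Optional[float] = None) -> str:
    """Issue a token for `user_id` that expires `ttl_s` seconds from now."""
    expires = int((time.time() if now is None else now) + ttl_s)
    payload = f"{user_id}:{expires}"
    return base64.urlsafe_b64encode(f"{payload}:{_sign(payload)}".encode()).decode()

def verify_token(token: str, now: Optional[float] = None) -> Optional[str]:
    """Return the user id for a valid, unexpired token, else None."""
    try:
        decoded = base64.urlsafe_b64decode(token.encode()).decode()
        user_id, expires, sig = decoded.rsplit(":", 2)
        expected = _sign(f"{user_id}:{expires}")
        # Constant-time comparison on bytes
        if not hmac.compare_digest(sig.encode(), expected.encode()):
            return None
        if int(expires) < (time.time() if now is None else now):
            return None
        return user_id
    except ValueError:  # bad base64, bad UTF-8, wrong field count, non-numeric expiry
        return None
```

The page would fetch a token from an authenticated endpoint and then open `new EventSource("/events?token=...")`. With a 60-second lifetime, a token leaked into a log is of little use.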
Backpressure
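If the client is slow, the generator naturally pauses: the socket buffer fills, the server's awaited ASGI send() call stops returning, and the generator is not asked for its next chunk until it does. FastAPI's default behavior is correct.
That only covers work done inside the generator. If a separate task produces items faster than the client reads them, put a bounded queue in between so the producer blocks instead of buffering without limit: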
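import asyncio

async def producer(q: asyncio.Queue):
    for item in source:  # `source` is a placeholder for your iterable of items
        await q.put(item)  # waits while the queue is full
    await q.put(None)  # sentinel: no more items

async def stream_endpoint():
    q = asyncio.Queue(maxsize=100)
    # Keep a reference so the task is not garbage-collected mid-run
    task = asyncio.create_task(producer(q))

    async def gen():
        try:
            while True:
                item = await q.get()
                if item is None:
                    break
                yield f"data: {item}\n\n"
        finally:
            task.cancel()  # stop the producer if the client leaves early

    return StreamingResponse(gen(), media_type="text/event-stream")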
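To check that the bound actually holds the producer back, the following stdlib-only sketch measures how far a fast producer gets ahead of a slow consumer. `measure_lead` is an illustrative name, and the `sleep` stands in for a slow client.

```python
import asyncio

async def measure_lead(maxsize: int = 2, total: int = 10) -> int:
    """Return the largest number of produced-but-unconsumed items observed."""
    q: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    produced = consumed = max_lead = 0

    async def producer():
        nonlocal produced, max_lead
        for i in range(total):
            await q.put(i)  # waits here whenever the queue is full
            produced += 1
            max_lead = max(max_lead, produced - consumed)
        await q.put(None)  # sentinel

    task = asyncio.create_task(producer())
    while True:
        item = await q.get()
        if item is None:
            break
        consumed += 1
        await asyncio.sleep(0.001)  # slow client
    await task
    return max_lead
```

With `maxsize=2` the lead never exceeds 2, however many items the source has. With an unbounded queue it would grow to `total`.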
Multi-tenant SSE
For per-user streams:
@app.get("/notifications")
async def notifications(request: Request, user: User = Depends(get_user)):
    async def gen():
        sub = redis.pubsub()
        await sub.subscribe(f"user:{user.id}")
        try:
            while True:
                if await request.is_disconnected():
                    break
                # Wait up to 15 s for a message; a timeout doubles as the keepalive tick
                msg = await sub.get_message(ignore_subscribe_messages=True, timeout=15)
                if msg:
                    yield f"data: {msg['data'].decode()}\n\n"
                else:
                    yield ": keepalive\n\n"
        finally:
            await sub.unsubscribe(f"user:{user.id}")
            await sub.close()

    return StreamingResponse(gen(), media_type="text/event-stream")
Pub/sub via Redis; clean up on disconnect.
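For tests or a single-process deployment, the same subscribe, yield, clean-up shape can be reproduced with an in-memory hub. This is a stand-in I am adding for illustration, not a replacement for Redis: `Hub` is a made-up class, and it cannot fan out across multiple workers because each process has its own memory.

```python
import asyncio
from collections import defaultdict

class Hub:
    """In-process pub/sub: one queue per active listener, grouped by channel."""

    def __init__(self):
        self._subs = defaultdict(set)  # channel name -> set of asyncio.Queue

    def publish(self, channel: str, message: str) -> int:
        """Deliver `message` to every listener on `channel`; return how many got it."""
        listeners = self._subs.get(channel, ())
        for q in listeners:
            q.put_nowait(message)
        return len(listeners)

    async def listen(self, channel: str):
        """Async generator of messages; unsubscribes itself when closed."""
        q: asyncio.Queue = asyncio.Queue()
        self._subs[channel].add(q)
        try:
            while True:
                yield await q.get()
        finally:
            self._subs[channel].discard(q)
```

An endpoint would iterate `hub.listen(f"user:{user.id}")` inside its generator. The `finally` block plays the same role as the `unsubscribe` call in the Redis version.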
Production gotchas
- X-Accel-Buffering: no: nginx buffers proxied responses by default; this header turns buffering off for the SSE response.
- Cloudflare: free tier may close idle long connections; Pro+ supports SSE properly.
- uvicorn --workers 4: each worker has its own memory; pub/sub via Redis to fan out.
- HTTP/2: SSE works on HTTP/1.1 and HTTP/2; use HTTP/2 to multiplex many SSE connections.
SSE vs WebSocket
| | SSE | WebSocket |
|---|---|---|
| Direction | Server → Client | Both |
| Reconnect | Built-in | Custom |
| Protocol | HTTP | WS upgrade |
| Auth | Same as HTTP | Custom |
| Browser support | Universal | Universal |
For LLM token streams, notifications, progress: SSE. For chat, multiplayer: WebSocket. See Django Channels for WebSocket comparison.
Common mistakes
1. Synchronous generators
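def gen(): instead of async def gen():. Starlette iterates a sync generator in its threadpool, so every chunk competes for a limited pool of worker threads and the generator cannot await anything. What blocks the event loop is a blocking call (time.sleep, a sync DB driver) inside an async def generator. Use async def with async libraries throughout.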
2. No keepalive
Proxies close connections that stay silent (nginx's default proxy_read_timeout is 60s), so a quiet stream drops and the client reconnects every minute. Send heartbeats.
3. Forgetting to close resources
Generator stops early; underlying DB cursor / pub/sub still open. Use try/finally to clean up.
4. No client-side reconnect logic for fetch-streaming
EventSource auto-reconnects; raw fetch + reader doesn’t. Build it.
5. Big initial payloads
Streaming endpoint returns 100KB before first chunk → user waits. Stream from the first byte.
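For mistake 4, the reconnect loop a non-EventSource client has to supply looks roughly like this. The sketch is in Python to match the rest of the post, though the same structure applies to fetch + reader in the browser. `connect` is a hypothetical callable you would write around your HTTP client: it takes the last seen event id and returns an async iterator of `(event_id, data)` pairs.

```python
import asyncio
from typing import AsyncIterator, Callable, Optional, Tuple

async def stream_with_reconnect(
    connect: Callable[[Optional[str]], AsyncIterator[Tuple[str, str]]],
    max_retries: int = 5,
    base_delay: float = 0.5,
) -> AsyncIterator[str]:
    """Yield event data, reconnecting with exponential backoff after a dropped connection."""
    last_id: Optional[str] = None
    failures = 0
    while True:
        try:
            async for event_id, data in connect(last_id):
                last_id = event_id  # remember where to resume from
                failures = 0        # a delivered event resets the backoff
                yield data
            return  # the server ended the stream cleanly
        except ConnectionError:
            failures += 1
            if failures > max_retries:
                raise
            await asyncio.sleep(base_delay * 2 ** (failures - 1))
```

Passing `last_id` back to the server mirrors what EventSource does with the `Last-Event-ID` header, so the server can skip events the client has already seen.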
Read this next
- FastAPI + Pydantic v2 + SQLAlchemy 2.0
- LLM Streaming Patterns 2026
- Django Channels & WebSockets 2026
- Scaling WebSockets in 2026
If you want my FastAPI streaming starter (SSE + auth + reconnect), it’s at rajpoot.dev.