Chapter 8: real-time and streaming. WebSockets, SSE, NDJSON, file streaming, broadcast patterns, and how to scale across multiple workers.
WebSocket basics
from fastapi import FastAPI, WebSocket, WebSocketDisconnect

app = FastAPI()

@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
    await websocket.accept()  # complete the WebSocket handshake
    try:
        while True:
            msg = await websocket.receive_json()
            await websocket.send_json({"echo": msg})
    except WebSocketDisconnect:
        pass  # client went away; release per-connection resources here
accept() completes the upgrade from HTTP to WebSocket. After that, both sides can call send_* / receive_* at any time. Catch WebSocketDisconnect to run cleanup when the client leaves.
Send / receive variants
- receive_text() / send_text(str)
- receive_bytes() / send_bytes(bytes)
- receive_json() / send_json(obj)
Pick the type that matches your client.
Auth on WebSockets
Browsers can’t send custom headers in new WebSocket(url). Common patterns:
Query token
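@app.websocket("/ws")
async def ws(websocket: WebSocket, token: str):
    user = await verify_token(token)
    if not user:
        # Closing before accept() rejects the handshake, so the client
        # sees a failed connection rather than close code 4401.
        await websocket.close(code=4401)
        return
    await websocket.accept()
    ...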
Cookie
If same-origin, cookies are sent on the upgrade request:
@app.websocket("/ws")
async def ws(websocket: WebSocket):
    session = websocket.cookies.get("session")
    user = await load_user_from_session(session)
    if not user:
        await websocket.close(code=4401)
        return
    await websocket.accept()
First message auth
@app.websocket("/ws")
async def ws(websocket: WebSocket):
    await websocket.accept()
    auth = await websocket.receive_json()
    user = await verify_token(auth["token"])
    if not user:
        await websocket.close(code=4401)
        return
    ...
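The snippets above call verify_token without showing it. As a rough idea of what such a check could look like, here is a minimal sketch using an HMAC-signed token. The names SECRET, sign_user_id and check_token are hypothetical, the token format is invented for illustration, and there is no expiry handling; a real app would more likely use an established scheme such as JWT and wrap the check in an async verify_token.

```python
import hashlib
import hmac
from typing import Optional

# Hypothetical shared secret; load it from configuration in a real app.
SECRET = b"change-me"


def sign_user_id(user_id: str) -> str:
    """Return a token of the form '<user_id>.<hex signature>'."""
    sig = hmac.new(SECRET, user_id.encode(), hashlib.sha256).hexdigest()
    return f"{user_id}.{sig}"


def check_token(token: str) -> Optional[str]:
    """Return the user id if the signature matches, otherwise None."""
    user_id, sep, sig = token.rpartition(".")
    if not sep or not user_id:
        return None  # no separator, or nothing before it
    expected = hmac.new(SECRET, user_id.encode(), hashlib.sha256).hexdigest()
    # compare_digest avoids leaking the mismatch position through timing.
    if hmac.compare_digest(sig.encode(), expected.encode()):
        return user_id
    return None
```

Whichever scheme you use, a token in the query string tends to end up in access logs, so keep it short-lived.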
Connection manager (single process)
from collections import defaultdict

class ConnectionManager:
    def __init__(self):
        # user_id -> set of open sockets (one user may have several tabs)
        self.active: dict[int, set[WebSocket]] = defaultdict(set)

    async def connect(self, user_id: int, ws: WebSocket):
        await ws.accept()
        self.active[user_id].add(ws)

    def disconnect(self, user_id: int, ws: WebSocket):
        self.active[user_id].discard(ws)
        if not self.active[user_id]:
            del self.active[user_id]

    async def send_to(self, user_id: int, msg: dict):
        # Copy the set: a disconnect during an await would otherwise
        # mutate it mid-iteration.
        for ws in list(self.active.get(user_id, ())):
            try:
                await ws.send_json(msg)
            except Exception:
                pass  # dead socket; its handler will call disconnect()

manager = ConnectionManager()

@app.websocket("/ws/{user_id}")
async def ws(websocket: WebSocket, user_id: int):
    await manager.connect(user_id, websocket)
    try:
        while True:
            msg = await websocket.receive_json()
            await manager.send_to(user_id, {"echo": msg})
    except WebSocketDisconnect:
        manager.disconnect(user_id, websocket)
This works for a single process. With multiple workers, each process holds its own manager and sees only its own connections, so route messages through Redis pub/sub.
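To exercise this kind of manager without a running server, you can swap the WebSocket for a small fake. The sketch below is an illustration only: FakeSocket and PruningManager are hypothetical names, and the variant shown also drops sockets whose send fails, which is one possible policy rather than the manager's required behaviour.

```python
import asyncio
from collections import defaultdict


class FakeSocket:
    """Stand-in for a WebSocket: records messages, or fails when 'dead'."""

    def __init__(self, dead: bool = False):
        self.dead = dead
        self.sent = []

    async def send_json(self, msg: dict) -> None:
        if self.dead:
            raise RuntimeError("connection lost")
        self.sent.append(msg)


class PruningManager:
    def __init__(self):
        self.active = defaultdict(set)  # user_id -> set of sockets

    def add(self, user_id: int, ws) -> None:
        self.active[user_id].add(ws)

    async def send_to(self, user_id: int, msg: dict) -> int:
        """Send to every socket of a user; return how many succeeded."""
        delivered = 0
        # Iterate over a snapshot so removals during the loop are safe.
        for ws in list(self.active.get(user_id, ())):
            try:
                await ws.send_json(msg)
                delivered += 1
            except Exception:
                self.active[user_id].discard(ws)  # prune the dead socket
        if user_id in self.active and not self.active[user_id]:
            del self.active[user_id]
        return delivered


async def demo():
    manager = PruningManager()
    live, dead = FakeSocket(), FakeSocket(dead=True)
    manager.add(1, live)
    manager.add(1, dead)
    delivered = await manager.send_to(1, {"hi": 1})
    return delivered, live.sent, len(manager.active[1])
```

Running asyncio.run(demo()) delivers to the live socket only and leaves a single socket registered for user 1.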
Multi-worker via Redis
import json
from collections import defaultdict

class DistributedManager:
    def __init__(self, redis_client):
        self.redis = redis_client  # assumed to be a redis.asyncio client
        self.local: dict[int, set[WebSocket]] = defaultdict(set)

    async def listen(self):
        # Run once per worker as a background task.
        pubsub = self.redis.pubsub()
        await pubsub.subscribe("ws_broadcast")
        async for msg in pubsub.listen():
            if msg["type"] == "message":
                payload = json.loads(msg["data"])
                user_id = payload["user_id"]
                for ws in list(self.local.get(user_id, ())):
                    try:
                        await ws.send_json(payload["message"])
                    except Exception:
                        pass  # one dead socket must not stop the listener

    async def publish(self, user_id: int, message: dict):
        await self.redis.publish(
            "ws_broadcast",
            json.dumps({"user_id": user_id, "message": message}),
        )
Each worker subscribes; publishes go to all workers; each delivers to its local connections.
For high-volume: dedicated pub/sub service (NATS, Redis Streams).
See Django Channels for a similar pattern in Django.
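The fan-out described above (every worker receives every publish, but only the worker holding the user's connection delivers) can be simulated in one process. This is a toy model, not Redis: InMemoryBroker and Worker are hypothetical stand-ins built on asyncio.Queue, meant only to show the routing logic.

```python
import asyncio
import json


class InMemoryBroker:
    """Toy stand-in for pub/sub: every subscriber gets every message."""

    def __init__(self):
        self.subscribers = []

    def subscribe(self) -> asyncio.Queue:
        queue = asyncio.Queue()
        self.subscribers.append(queue)
        return queue

    async def publish(self, raw: str) -> None:
        for queue in self.subscribers:
            await queue.put(raw)


class Worker:
    """One process: knows only the users connected to it."""

    def __init__(self, broker: InMemoryBroker, local_users):
        self.inbox = broker.subscribe()
        self.local_users = set(local_users)
        self.delivered = []

    async def drain(self) -> None:
        # Handle everything currently queued for this worker.
        while not self.inbox.empty():
            payload = json.loads(self.inbox.get_nowait())
            if payload["user_id"] in self.local_users:
                self.delivered.append(payload["message"])


async def demo():
    broker = InMemoryBroker()
    worker_a = Worker(broker, {1})  # user 1 is connected to worker A
    worker_b = Worker(broker, {2})  # user 2 is connected to worker B
    await broker.publish(json.dumps({"user_id": 2, "message": {"text": "hi"}}))
    await worker_a.drain()
    await worker_b.drain()
    return worker_a.delivered, worker_b.delivered
```

Both workers receive the message, but only worker B, which holds user 2's connection, delivers it.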
Heartbeats
Load balancers and proxies close connections that stay idle past their timeout, so send periodic traffic:
import asyncio

async def heartbeat(websocket: WebSocket):
    # Application-level ping: an ordinary text frame the client can ignore.
    while True:
        await asyncio.sleep(30)
        try:
            await websocket.send_text("ping")
        except Exception:
            return

@app.websocket("/ws")
async def ws(websocket: WebSocket):
    await websocket.accept()
    hb_task = asyncio.create_task(heartbeat(websocket))
    try:
        while True:
            msg = await websocket.receive_text()
            ...
    except WebSocketDisconnect:
        pass
    finally:
        hb_task.cancel()  # stop pinging once the connection is gone
Or rely on the client to ping.
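If the client is the one pinging, the server side usually wants a deadline on each receive so that a silent peer is noticed. A minimal sketch, assuming a hypothetical helper receive_with_deadline that wraps any awaitable receive function (for example websocket.receive_text) with asyncio.wait_for; the demo uses a queue in place of a socket:

```python
import asyncio


async def receive_with_deadline(receive, timeout: float):
    """Return the next message, or None if the peer stays silent too long."""
    try:
        return await asyncio.wait_for(receive(), timeout)
    except asyncio.TimeoutError:
        return None


async def demo():
    inbox = asyncio.Queue()  # stands in for the socket's receive side
    await inbox.put("pong")
    first = await receive_with_deadline(inbox.get, 0.05)
    second = await receive_with_deadline(inbox.get, 0.05)  # nothing arrives
    return first, second
```

In a handler, a None result would be the signal to close the connection and clean up.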
Server-Sent Events (SSE)
Easier than WebSocket for one-way streams:
import json
from fastapi import Request
from fastapi.responses import StreamingResponse

@app.get("/events")
async def events(request: Request):
    async def gen():
        while True:
            if await request.is_disconnected():
                break
            event = await get_next_event()
            yield f"data: {json.dumps(event)}\n\n"

    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        # Stop caches and nginx from buffering the stream.
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )
Client:
const es = new EventSource("/events");
es.onmessage = (e) => console.log(JSON.parse(e.data));
EventSource reconnects automatically and runs over plain HTTP. See FastAPI Streaming.
Keepalive on SSE
async def gen():
    while True:
        if await request.is_disconnected():
            break
        if has_event():
            yield f"data: {json.dumps(get_event())}\n\n"
        else:
            yield ": keepalive\n\n"
        await asyncio.sleep(15)
Comment lines (those starting with ":") are ignored by clients, but they count as traffic, so proxies keep the connection open.
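A polling loop with a fixed sleep can delay an event by up to the sleep interval. One alternative, sketched here under the assumption that events arrive on an asyncio.Queue, is to wait on the queue with a timeout: an event is sent as soon as it arrives, and a keepalive comment goes out only when the wait times out. The names sse_with_keepalive and stop are hypothetical.

```python
import asyncio
import json


async def sse_with_keepalive(queue: asyncio.Queue, interval: float,
                             stop: asyncio.Event):
    """Yield SSE frames: real events immediately, keepalives when idle."""
    while not stop.is_set():
        try:
            event = await asyncio.wait_for(queue.get(), timeout=interval)
        except asyncio.TimeoutError:
            yield ": keepalive\n\n"  # comment line, ignored by EventSource
        else:
            yield f"data: {json.dumps(event)}\n\n"


async def demo():
    queue = asyncio.Queue()
    stop = asyncio.Event()
    await queue.put({"n": 1})
    frames = []
    async for frame in sse_with_keepalive(queue, 0.05, stop):
        frames.append(frame)
        if len(frames) == 2:
            stop.set()  # end the stream after one event and one keepalive
    return frames
```

In a real endpoint the stop condition would typically be the request.is_disconnected() check instead of an Event.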
SSE reconnect / resume
async def gen():
    last_id = request.headers.get("last-event-id", "0")
    async for event in events_after(last_id):
        yield f"id: {event.id}\ndata: {json.dumps(event.payload)}\n\n"
When it reconnects, EventSource sends the last id it received in the Last-Event-ID header, so the server can resume from that point.
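Hand-built f-strings get error-prone once frames carry an id, an event name, or multi-line data, because each payload line needs its own "data:" field. A small formatter keeps that in one place. This is a sketch; format_sse is a hypothetical helper, not part of FastAPI.

```python
from typing import Optional


def format_sse(data: str, event: Optional[str] = None,
               event_id: Optional[str] = None) -> str:
    """Build one SSE frame, terminated by a blank line."""
    lines = []
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if event is not None:
        lines.append(f"event: {event}")
    # Each line of the payload needs its own "data:" field.
    for part in data.splitlines() or [""]:
        lines.append(f"data: {part}")
    return "\n".join(lines) + "\n\n"
```

For example, format_sse("a\nb", event_id="7") produces an id line followed by two data lines, which the browser joins back into "a\nb".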
NDJSON
For programmatic clients streaming JSON records:
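@app.get("/stream/items")
async def stream_items(db: AsyncSession = Depends(get_db)):
    async def gen():
        # AsyncSession.stream() is a coroutine; await it to get the result.
        # Selecting the table's columns yields plain rows, so _asdict()
        # returns column values instead of an ORM object.
        result = await db.stream(select(Item.__table__))
        async for row in result:
            yield json.dumps(row._asdict(), default=str) + "\n"

    return StreamingResponse(gen(), media_type="application/x-ndjson")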
Each line is a JSON object. Clients parse incrementally.
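On the client side, "parse incrementally" means buffering bytes until a newline shows up, since a network chunk can end in the middle of a record. A minimal sketch of such a parser; iter_ndjson is a hypothetical helper and the chunks would normally come from an HTTP client's streaming iterator.

```python
import json
from typing import Iterable, Iterator


def iter_ndjson(chunks: Iterable[bytes]) -> Iterator[dict]:
    """Yield one parsed object per line, across arbitrary chunk boundaries."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # Everything before the last newline is complete; keep the tail.
        *lines, buffer = buffer.split(b"\n")
        for line in lines:
            if line.strip():
                yield json.loads(line)
    if buffer.strip():
        yield json.loads(buffer)  # final record without a trailing newline
```

Feeding it [b'{"id": 1}\n{"id"', b': 2}\n'] yields two objects even though the second record is split across chunks.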
File streaming
from fastapi.responses import StreamingResponse

@app.get("/download/{name}")
async def download(name: str):
    # A plain (sync) generator is fine here: Starlette iterates it in a
    # thread pool, so the blocking reads stay off the event loop.
    def gen():
        with open(f"/files/{name}", "rb") as f:
            while chunk := f.read(8192):
                yield chunk

    return StreamingResponse(
        gen(),
        media_type="application/octet-stream",
        headers={"Content-Disposition": f'attachment; filename="{name}"'},
    )
For local files: prefer FileResponse (uses sendfile when possible).
For remote / generated: stream in chunks.
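Because the file name comes from the URL, it is worth confirming that the resolved path still sits inside the download directory before opening it. The sketch below pairs that check with a chunked reader; resolve_download and read_chunks are hypothetical helpers, and the base directory is whatever your app serves from.

```python
from pathlib import Path
from typing import Iterator


def resolve_download(base_dir: Path, name: str) -> Path:
    """Return the file's path, refusing anything outside base_dir."""
    base = base_dir.resolve()
    candidate = (base / name).resolve()
    # After resolving symlinks and "..", the file must still live under base.
    if base not in candidate.parents or not candidate.is_file():
        raise FileNotFoundError(name)
    return candidate


def read_chunks(path: Path, chunk_size: int = 8192) -> Iterator[bytes]:
    """Yield the file in fixed-size chunks so it never loads fully in memory."""
    with path.open("rb") as f:
        while chunk := f.read(chunk_size):
            yield chunk
```

In an endpoint, a FileNotFoundError from resolve_download would map to a 404 response.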
LLM token streaming
@app.get("/chat")
async def chat(prompt: str):
    async def gen():
        # `anthropic` is assumed to be an AsyncAnthropic client instance.
        async with anthropic.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text in stream.text_stream:
                yield f"data: {json.dumps({'token': text})}\n\n"
        yield "event: done\ndata: \n\n"

    return StreamingResponse(gen(), media_type="text/event-stream")
The user sees tokens as they arrive. See Anthropic API Best Practices.
Backpressure
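The framework handles backpressure for SSE and other streamed responses:
- Generator yields a chunk → ASGI server sends it → socket buffer fills → send blocks → generator pauses at the yield.
Don’t pre-fetch into your own unbounded buffer from a separate task ahead of the yield; that decouples the producer from the socket and breaks the chain.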
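The same pause-when-full behaviour can be seen in miniature with a bounded asyncio.Queue: put() blocks while the queue is full, so a fast producer can never run far ahead of a slow consumer. This is an analogy for the chain above, not how the ASGI server is implemented; producer, consumer and the queue size are hypothetical.

```python
import asyncio


async def producer(queue: asyncio.Queue, count: int, log: list) -> None:
    for i in range(count):
        await queue.put(i)  # blocks while the queue is full
        log.append(("put", i))
    await queue.put(None)  # sentinel: no more items


async def consumer(queue: asyncio.Queue, log: list) -> None:
    while (item := await queue.get()) is not None:
        await asyncio.sleep(0.01)  # simulate a slow client
        log.append(("got", item))


async def demo() -> list:
    queue = asyncio.Queue(maxsize=2)  # the bound is what creates backpressure
    log = []
    await asyncio.gather(producer(queue, 5, log), consumer(queue, log))
    return log
```

With maxsize=2 the producer is never more than three items ahead (two queued plus one in flight). With an unbounded queue it would finish immediately and hold everything in memory.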
Disconnect handling for long ops
Cancellation propagates if you await regularly:
async def stream(request: Request):
    async def gen():
        for i in range(1000):
            if await request.is_disconnected():
                # cleanup; release LLM call / DB cursor
                break
            yield f"data: {i}\n\n"
            await asyncio.sleep(1)

    return StreamingResponse(gen(), media_type="text/event-stream")
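Check is_disconnected() periodically. Otherwise the generator keeps running as a zombie after the client is gone, still holding its LLM call or DB cursor.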
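Cleanup is more robust in a try/finally inside the generator than in the disconnect branch alone, because finally also runs when the generator is closed or its task is cancelled. A minimal sketch with a hypothetical ticker generator, using a list in place of a real cursor or upstream call:

```python
import asyncio


async def ticker(released: list):
    """Endless SSE stream whose cleanup runs however the stream ends."""
    try:
        i = 0
        while True:
            yield f"data: {i}\n\n"
            i += 1
            await asyncio.sleep(0)
    finally:
        # Runs on normal exit, on aclose(), and on cancellation.
        released.append("cursor closed")


async def demo():
    released = []
    gen = ticker(released)
    frames = [await gen.__anext__() for _ in range(3)]
    await gen.aclose()  # what happens when the consumer stops early
    return frames, released
```

Here the consumer stops after three frames and the finally block still releases the resource.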
Production gotchas
- X-Accel-Buffering: no for nginx: disables proxy buffering.
- Cloudflare may close idle connections (notably on the free plan); SSE / WS limits vary by plan.
- AWS ALB / GCP LB support both with timeout config.
- HTTP/2 multiplexes; HTTP/1 needs separate connections per stream.
When to pick which
- SSE: server → client only (notifications, LLM, progress).
- WebSocket: bidi (chat, collaborative).
- NDJSON: programmatic streams (CLIs, SDKs).
- StreamingResponse: any chunked HTTP response.
See Streaming Patterns 2026.
Common mistakes
1. No auth on WS
Anyone can connect. Always verify before accept.
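2. Blocking calls inside an async generator
Blocking I/O in an async def generator stalls the event loop. Use async I/O there, or use a plain def generator, which StreamingResponse iterates in a thread pool.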
3. No keepalive
Idle connections get cut at the proxy timeout (often around 60s), and the clients then reconnect in a storm.
4. Storing state in connection-bound objects
Lost on reconnect. Store in DB / Redis.
5. Not checking Origin on WS
Browsers don’t apply CORS to WebSocket handshakes. Validate the Origin header yourself, especially with cookie auth.
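For the origin point, a handshake-time allowlist check can be as small as the sketch below. ALLOWED_ORIGINS and origin_allowed are hypothetical names and the domain is a placeholder; in a handler you would pass websocket.headers.get("origin") and close the socket when the check fails.

```python
from typing import Optional
from urllib.parse import urlsplit

# Hypothetical allowlist; use the origins your frontend is served from.
ALLOWED_ORIGINS = {"https://app.example.com"}


def origin_allowed(origin: Optional[str], allowed=ALLOWED_ORIGINS) -> bool:
    """Return True only for an Origin header on the allowlist."""
    if not origin:
        return False  # browsers always send Origin on WebSocket handshakes
    parts = urlsplit(origin)
    # Compare scheme and host[:port] only, case-insensitively.
    normalized = f"{parts.scheme}://{parts.netloc}".lower()
    return normalized in allowed
```

Non-browser clients usually send no Origin header, so rejecting a missing value is a policy choice that suits browser-only endpoints.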
What’s next
Chapter 9: Background tasks and integration with task queues.