Chapter 8: real-time and streaming. WebSockets, SSE, NDJSON, file streaming, broadcast patterns, and how to scale across multiple workers.

WebSocket basics

from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws")
async def ws_endpoint(websocket: WebSocket):
    await websocket.accept()
    try:
        while True:
            msg = await websocket.receive_json()
            await websocket.send_json({"echo": msg})
    except WebSocketDisconnect:
        pass

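accept() completes the HTTP upgrade handshake. After that, either side can send at any time through the send_* / receive_* methods. receive_* raises WebSocketDisconnect when the client goes away, so catch it and do your cleanup there.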

Send / receive variants

  • receive_text() / send_text(str).
  • receive_bytes() / send_bytes(bytes).
  • receive_json() / send_json(obj).

Pick the type that matches your client.

Auth on WebSockets

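Browsers can’t set custom headers in new WebSocket(url), so the usual Authorization header is not available. Note that calling close() before accept() rejects the handshake with HTTP 403, so the browser never sees the custom close code; 4401 only reaches the client when the socket was accepted first. Common patterns: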

Query token

@app.websocket("/ws")
async def ws(websocket: WebSocket, token: str):
    user = await verify_token(token)
    if not user:
        await websocket.close(code=4401)
        return
    await websocket.accept()
    ...

Session cookie

Browsers attach cookies to the upgrade request, so a same-origin page can reuse its session cookie:

@app.websocket("/ws")
async def ws(websocket: WebSocket):
    session = websocket.cookies.get("session")
    user = await load_user_from_session(session)
    if not user:
        await websocket.close(code=4401)
        return
    await websocket.accept()

First message auth

@app.websocket("/ws")
async def ws(websocket: WebSocket):
    await websocket.accept()
    auth = await websocket.receive_json()
    user = await verify_token(auth["token"])
    if not user:
        await websocket.close(code=4401)
        return
    ...
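
With first-message auth the socket is already accepted, so it helps to bound how long an unauthenticated client may stay connected. The sketch below is one way to do that with asyncio.wait_for. The helper name authenticate_first_message and the 5-second default are assumptions for illustration; receive_json and verify_token are passed in so the logic can be tried without a real socket.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


async def authenticate_first_message(
    receive_json: Callable[[], Awaitable[Any]],
    verify_token: Callable[[str], Awaitable[Optional[dict]]],
    timeout: float = 5.0,
) -> Optional[dict]:
    """Return the user for a valid first message, or None if it is late or malformed."""
    try:
        # Bound the wait so an unauthenticated socket cannot be held open forever.
        auth = await asyncio.wait_for(receive_json(), timeout=timeout)
    except asyncio.TimeoutError:
        return None
    # Reject anything that is not {"token": "<string>"} instead of raising KeyError.
    if not isinstance(auth, dict) or not isinstance(auth.get("token"), str):
        return None
    return await verify_token(auth["token"])
```

In the endpoint this would be called as authenticate_first_message(websocket.receive_json, verify_token), closing with code 4401 when it returns None.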

Connection manager (single process)

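from collections import defaultdict

class ConnectionManager:
    def __init__(self):
        # user_id -> open sockets (one user may have several tabs or devices)
        self.active: dict[int, set[WebSocket]] = defaultdict(set)

    async def connect(self, user_id: int, ws: WebSocket):
        await ws.accept()
        self.active[user_id].add(ws)

    def disconnect(self, user_id: int, ws: WebSocket):
        sockets = self.active.get(user_id)
        if sockets is None:
            return
        sockets.discard(ws)
        if not sockets:
            del self.active[user_id]

    async def send_to(self, user_id: int, msg: dict):
        # Iterate over a copy: disconnect() can mutate the set while we await.
        for ws in list(self.active.get(user_id, ())):
            try:
                await ws.send_json(msg)
            except Exception:
                # The socket is gone; drop it instead of failing the other sends.
                self.disconnect(user_id, ws)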

manager = ConnectionManager()

@app.websocket("/ws/{user_id}")
async def ws(websocket: WebSocket, user_id: int):
    await manager.connect(user_id, websocket)
    try:
        while True:
            msg = await websocket.receive_json()
            await manager.send_to(user_id, {"echo": msg})
    except WebSocketDisconnect:
        manager.disconnect(user_id, websocket)

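This works while every connection lives in one process. With multiple workers, a message produced in one process never reaches sockets held by another, so fan out through Redis pub/sub.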
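
When one user has many sockets, or a message goes to many users, sending one at a time lets a single slow client delay the rest. A minimal sketch of a concurrent send that also reports failed sockets follows. The helper name send_all and the JsonSender protocol are assumptions for illustration; any object with an async send_json method fits.

```python
import asyncio
from typing import Any, Iterable, Protocol


class JsonSender(Protocol):
    async def send_json(self, data: Any) -> None: ...


async def send_all(sockets: Iterable[JsonSender], msg: dict) -> list[JsonSender]:
    """Send msg to every socket concurrently; return the sockets whose send failed."""
    targets = list(sockets)  # snapshot: the live set may change while we await
    results = await asyncio.gather(
        *(ws.send_json(msg) for ws in targets),
        return_exceptions=True,  # one broken socket must not abort the others
    )
    return [ws for ws, res in zip(targets, results) if isinstance(res, Exception)]
```

The caller can then pass each returned socket to the manager's disconnect method so dead connections do not accumulate.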

Multi-worker via Redis

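import json
from collections import defaultdict

class DistributedManager:
    def __init__(self, redis_client):
        self.redis = redis_client  # assumed to be a redis.asyncio client
        self.local: dict[int, set[WebSocket]] = defaultdict(set)

    async def listen(self):
        # Run once per worker as a background task, e.g. started at app startup.
        pubsub = self.redis.pubsub()
        await pubsub.subscribe("ws_broadcast")
        async for msg in pubsub.listen():
            if msg["type"] != "message":
                continue  # skip subscribe confirmations
            payload = json.loads(msg["data"])
            user_id = payload["user_id"]
            # Copy the set: it can change while we await each send.
            for ws in list(self.local.get(user_id, ())):
                try:
                    await ws.send_json(payload["message"])
                except Exception:
                    self.local[user_id].discard(ws)

    async def publish(self, user_id: int, message: dict):
        await self.redis.publish("ws_broadcast", json.dumps({"user_id": user_id, "message": message}))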

Each worker subscribes; publishes go to all workers; each delivers to its local connections.
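
The fan-out described above can be tried without Redis. In the sketch below an in-memory bus stands in for the Redis channel and plain lists stand in for sockets; InMemoryBus, Worker, and demo are hypothetical names used only for this illustration.

```python
import asyncio
import json
from collections import defaultdict


class InMemoryBus:
    """Stand-in for a Redis channel: every subscriber receives every message."""

    def __init__(self) -> None:
        self.subscribers: list[asyncio.Queue] = []

    def subscribe(self) -> asyncio.Queue:
        q: asyncio.Queue = asyncio.Queue()
        self.subscribers.append(q)
        return q

    def publish(self, data: str) -> None:
        for q in self.subscribers:
            q.put_nowait(data)


class Worker:
    """One process: it only knows its own local connections."""

    def __init__(self, bus: InMemoryBus) -> None:
        self.inbox = bus.subscribe()
        # user_id -> outboxes (lists standing in for WebSocket objects)
        self.local: dict[int, list[list]] = defaultdict(list)

    async def deliver_one(self) -> None:
        payload = json.loads(await self.inbox.get())
        # .get() avoids creating empty entries for users connected elsewhere
        for outbox in self.local.get(payload["user_id"], []):
            outbox.append(payload["message"])


async def demo() -> tuple[list, list]:
    bus = InMemoryBus()
    w1, w2 = Worker(bus), Worker(bus)
    alice_on_w1: list = []
    bob_on_w2: list = []
    w1.local[1].append(alice_on_w1)
    w2.local[2].append(bob_on_w2)
    # Both workers receive the message; only w2 holds a connection for user 2.
    bus.publish(json.dumps({"user_id": 2, "message": {"text": "hi"}}))
    await asyncio.gather(w1.deliver_one(), w2.deliver_one())
    return alice_on_w1, bob_on_w2
```

Every worker pays the cost of decoding every message, which is why this pattern suits moderate volumes.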

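Redis pub/sub is fire-and-forget: a worker that is restarting misses whatever was published in the meantime. For high volume, or when delivery matters, consider a dedicated messaging layer such as NATS or Redis Streams.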

See Django Channels for a similar pattern in Django.

Heartbeats

Load balancers and proxies drop connections that stay idle past their timeout, so send something periodically:

import asyncio

async def heartbeat(websocket: WebSocket):
    while True:
        await asyncio.sleep(30)
        try:
            await websocket.send_text("ping")
        except Exception:
            return

@app.websocket("/ws")
async def ws(websocket: WebSocket):
    await websocket.accept()
    hb_task = asyncio.create_task(heartbeat(websocket))
    try:
        while True:
            msg = await websocket.receive_text()
            ...
    finally:
        hb_task.cancel()

Alternatively, have the client send pings on a timer and treat a connection that stays silent for too long as dead.

Server-Sent Events (SSE)

SSE is simpler than a WebSocket when data only flows from server to client:

import json

from fastapi import Request
from fastapi.responses import StreamingResponse

@app.get("/events")
async def events(request: Request):
    async def gen():
        while True:
            if await request.is_disconnected():
                break
            event = await get_next_event()
            yield f"data: {json.dumps(event)}\n\n"
    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

Client:

const es = new EventSource("/events");
es.onmessage = (e) => console.log(JSON.parse(e.data));

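EventSource reconnects automatically, and the stream is plain HTTP, so it passes through proxies and reuses existing cookies and auth. See FastAPI Streaming.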

Keepalive on SSE

async def gen():
    while True:
        if await request.is_disconnected():
            break
        if has_event():
            yield f"data: {json.dumps(get_event())}\n\n"
        else:
            yield ": keepalive\n\n"
        await asyncio.sleep(15)

Comment lines (starting with :) are ignored by EventSource clients but still count as traffic, so proxies keep the connection open.
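
The loop above sleeps after every iteration, so an event can wait up to 15 seconds before it is sent. One alternative, sketched here under the assumption that events arrive on an asyncio.Queue, is to wait on the queue with a timeout and emit the comment only when that timeout expires. The name sse_with_keepalive and the None sentinel are hypothetical.

```python
import asyncio
import json
from typing import AsyncIterator


async def sse_with_keepalive(
    queue: asyncio.Queue, keepalive_after: float = 15.0
) -> AsyncIterator[str]:
    """Yield queued events immediately; emit a comment only after an idle gap."""
    while True:
        try:
            event = await asyncio.wait_for(queue.get(), timeout=keepalive_after)
        except asyncio.TimeoutError:
            yield ": keepalive\n\n"  # SSE comment, ignored by EventSource
            continue
        if event is None:  # hypothetical sentinel: the producer is finished
            return
        yield f"data: {json.dumps(event)}\n\n"
```

The generator can be passed to StreamingResponse like the earlier ones, with media_type set to text/event-stream.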

SSE reconnect / resume

async def gen():
    last_id = request.headers.get("last-event-id", "0")
    async for event in events_after(last_id):
        yield f"id: {event.id}\ndata: {json.dumps(event.payload)}\n\n"

On reconnect, EventSource sends the last id it received in the Last-Event-ID header, and the server resumes from there.
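
Building the frames by hand with f-strings works until a payload contains a newline, which would end the data field early. A small formatter keeps the framing in one place. This is a sketch; format_sse and its parameter names are assumptions for illustration, not part of FastAPI.

```python
import json
from typing import Any, Optional


def format_sse(
    data: Any,
    event: Optional[str] = None,
    id: Optional[str] = None,
    retry_ms: Optional[int] = None,
) -> str:
    """Serialize one event in text/event-stream framing."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if id is not None:
        lines.append(f"id: {id}")
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")  # reconnect delay hint, in milliseconds
    payload = data if isinstance(data, str) else json.dumps(data)
    # One data: line per payload line; the client joins them with newlines.
    for part in payload.split("\n"):
        lines.append(f"data: {part}")
    return "\n".join(lines) + "\n\n"
```

For example, format_sse(event.payload, id=str(event.id)) would produce the same frame as the resume generator above.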

NDJSON

For programmatic clients streaming JSON records:

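@app.get("/stream/items")
async def stream_items(db: AsyncSession = Depends(get_db)):
    async def gen():
        # stream() is a coroutine; await it to get an async-iterable result.
        # Selecting the table (assuming Item is a declarative model) yields rows
        # of plain column values rather than ORM objects.
        result = await db.stream(select(Item.__table__))
        async for row in result:
            # default=str covers dates, UUIDs, and other non-JSON column types
            yield json.dumps(row._asdict(), default=str) + "\n"
    return StreamingResponse(gen(), media_type="application/x-ndjson")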

Each line is a JSON object. Clients parse incrementally.
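
On the client side, network chunks do not line up with record boundaries, so a parser has to buffer partial lines. A minimal sketch, assuming the client receives the body as an iterable of bytes chunks (iter_ndjson is a hypothetical helper name):

```python
import json
from typing import Any, Iterable, Iterator


def iter_ndjson(chunks: Iterable[bytes]) -> Iterator[Any]:
    """Yield one parsed object per complete line, buffering partial lines."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # Everything before the last newline is complete; keep the remainder.
        *lines, buffer = buffer.split(b"\n")
        for line in lines:
            if line.strip():
                yield json.loads(line)
    if buffer.strip():  # final record without a trailing newline
        yield json.loads(buffer)
```

Splitting on the newline byte is safe for UTF-8 because that byte never occurs inside a multi-byte character.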

File streaming

from fastapi.responses import StreamingResponse

@app.get("/download/{name}")
async def download(name: str):
    def gen():
        with open(f"/files/{name}", "rb") as f:
            while chunk := f.read(8192):
                yield chunk
    return StreamingResponse(
        gen(),
        media_type="application/octet-stream",
        # Quote the filename so names with spaces survive
        headers={"Content-Disposition": f'attachment; filename="{name}"'},
    )

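For local files, prefer FileResponse: it sets Content-Length, Last-Modified, and ETag for you, and can hand the file to the server for efficient sending where the server supports it.

For remote or generated content, stream in chunks as shown above.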
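
Because the file name comes from the URL, it is worth confirming that the resolved path stays inside the download directory before opening it. A minimal sketch using pathlib follows; resolve_download is a hypothetical helper, and the base directory is whatever your app serves from.

```python
from pathlib import Path
from typing import Optional


def resolve_download(base_dir: Path, name: str) -> Optional[Path]:
    """Return the file path if it stays inside base_dir and is a regular file."""
    base = base_dir.resolve()
    candidate = (base / name).resolve()  # collapses ".." and follows symlinks
    if not candidate.is_relative_to(base):  # Path.is_relative_to needs Python 3.9+
        return None
    return candidate if candidate.is_file() else None
```

An endpoint would return a 404 when this yields None and open or serve the returned path otherwise.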

LLM token streaming

from anthropic import AsyncAnthropic

client = AsyncAnthropic()  # reads ANTHROPIC_API_KEY from the environment

@app.get("/chat")
async def chat(prompt: str):
    async def gen():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=1024,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text in stream.text_stream:
                yield f"data: {json.dumps({'token': text})}\n\n"
        # Named event: tells the client the stream finished normally
        yield "event: done\ndata: \n\n"
    return StreamingResponse(gen(), media_type="text/event-stream")

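The user sees tokens as they arrive. The final done frame is a named event, so the client has to listen for it with es.addEventListener("done", ...) rather than onmessage. See Anthropic API Best Practices.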

Backpressure

The framework handles backpressure for SSE / streaming:

  • Generator yields chunk → ASGI sends → socket buffer → if full, send blocks → generator pauses.

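Don’t run a separate task that produces chunks into an unbounded buffer ahead of the yield. The generator then never pauses, and memory grows whenever the client is slow.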
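
If a producer task is unavoidable, a bounded queue keeps the chain intact: the producer suspends once the buffer is full and resumes as the consumer drains it. The sketch below shows that behavior in isolation; demo_bounded_queue and the maxsize of 2 are arbitrary choices for illustration.

```python
import asyncio


async def demo_bounded_queue() -> list[str]:
    log: list[str] = []
    queue: asyncio.Queue = asyncio.Queue(maxsize=2)  # at most 2 chunks buffered

    async def producer() -> None:
        for i in range(4):
            await queue.put(i)  # suspends here while the queue is full
            log.append(f"put {i}")
        await queue.put(None)  # sentinel: no more chunks

    task = asyncio.create_task(producer())
    await asyncio.sleep(0.01)  # no consumer yet: the producer runs until it blocks
    log.append("consumer starts")
    while (item := await queue.get()) is not None:
        log.append(f"got {item}")
    await task
    return log
```

The log begins with two puts followed by "consumer starts", which shows that the producer stopped at the limit instead of racing ahead.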

Disconnect handling for long ops

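A client disconnect reaches your generator at an await point, so a loop that awaits regularly can stop promptly. Polling is_disconnected() makes the cleanup path explicit: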

async def stream(request: Request):
    async def gen():
        for i in range(1000):
            if await request.is_disconnected():
                # cleanup; release LLM call / DB cursor
                break
            yield f"data: {i}\n\n"
            await asyncio.sleep(1)
    return StreamingResponse(gen(), media_type="text/event-stream")

Check is_disconnected() periodically. Otherwise the generator keeps running, and keeps holding its LLM call or DB cursor, after the client has gone.
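
Polling covers the loop itself; a try/finally inside the generator covers the other ways a stream can end, including cancellation and the generator being closed. This is a sketch with a list standing in for the real resource. The names ticking_stream, consume_two, and released are hypothetical.

```python
import asyncio
from typing import AsyncIterator

released: list[str] = []  # stands in for closing a DB cursor or an upstream call


async def ticking_stream() -> AsyncIterator[str]:
    try:
        i = 0
        while True:
            yield f"data: {i}\n\n"
            i += 1
            await asyncio.sleep(0.01)
    finally:
        # Runs on normal exit, on cancellation, and when the generator is closed.
        released.append("cursor closed")


async def consume_two() -> list[str]:
    gen = ticking_stream()
    chunks = [await gen.__anext__(), await gen.__anext__()]
    await gen.aclose()  # simulate the consumer going away mid-stream
    return chunks
```

Keep the finally block short and avoid yielding inside it, since the consumer is no longer reading at that point.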

Production gotchas

  • X-Accel-Buffering: no tells nginx not to buffer the response, so chunks reach the client as they are produced.
  • Cloudflare proxies SSE and WebSockets on every plan but closes connections that stay idle (about 100 seconds by default); send keepalives.
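  • AWS ALB and GCP load balancers support both; raise the idle timeout (ALB defaults to 60 seconds) or send keepalives more often than it.
  • Over HTTP/1.1 each stream holds its own connection, and browsers cap connections per origin at about six; HTTP/2 multiplexes streams over one connection.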

When to pick which

  • SSE: server → client only (notifications, LLM, progress).
  • WebSocket: bidirectional (chat, collaborative editing).
  • NDJSON: programmatic streams (CLIs, SDKs).
  • StreamingResponse: any chunked HTTP response.

See Streaming Patterns 2026.

Common mistakes

1. No auth on WS

Without a check, anyone can open a socket. Verify credentials before accept(), or accept and require a valid first message within a short timeout.

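2. Blocking work in a streaming generator

Starlette runs a plain def generator in a threadpool, so it does not block the event loop, but every chunk costs a thread hop. Blocking calls (time.sleep, sync DB or HTTP clients) inside an async def generator do block the loop. Prefer async def generators with async I/O.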

3. No keepalive

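Idle connections get cut at the proxy timeout (60 seconds is a common default), and every client reconnects at once. Send a heartbeat or SSE comment more often than the shortest timeout in the path.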

4. Storing state in connection-bound objects

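Anything kept on the connection object is lost on reconnect, and the reconnect may land on a different worker. Keep session state in the database or Redis.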

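5. Not checking Origin on WS

CORS does not apply to WebSockets: browsers send the Origin header but do not block cross-origin connections, and CORSMiddleware does not cover WebSocket routes. Validate Origin against an allowlist during the handshake, especially when you authenticate with cookies.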
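
One way to check the origin during the handshake is an exact match against an allowlist. The sketch below assumes you read the header yourself, for example with websocket.headers.get("origin"); origin_allowed and the example hosts are hypothetical.

```python
from typing import Iterable, Optional
from urllib.parse import urlsplit


def origin_allowed(origin: Optional[str], allowed: Iterable[str]) -> bool:
    """Exact-match the Origin header against an allowlist of scheme://host[:port]."""
    if not origin:
        return False  # non-browser clients omit Origin; decide this case explicitly
    parts = urlsplit(origin)
    if parts.scheme not in ("http", "https") or not parts.netloc:
        return False  # also rejects the literal "null" origin
    normalized = f"{parts.scheme}://{parts.netloc}".lower()
    return normalized in {a.lower().rstrip("/") for a in allowed}
```

Exact matching is deliberate: prefix or substring checks would accept look-alike hosts such as app.example.com.evil.net.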

What’s next

Chapter 9: Background tasks and integration with task queues.
