FastAPI’s async support and its streaming responses work well together: LLM token streams, progress updates, and live feeds all map onto an async generator. This post collects the patterns I keep reaching for.

StreamingResponse basics

import asyncio

from fastapi import FastAPI
from fastapi.responses import StreamingResponse

app = FastAPI()

async def gen():
    for i in range(10):
        yield f"chunk {i}\n"
        await asyncio.sleep(0.5)

@app.get("/stream")
async def stream():
    return StreamingResponse(gen(), media_type="text/plain")

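Each yielded chunk is sent to the client as soon as it is produced, provided no proxy in between buffers it. Because the generator is async, it only advances once the server has handed off the previous chunk, so the stream is backpressure-aware by default.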

Server-Sent Events

import json

@app.get("/events")
async def events():
    async def event_stream():
        for i in range(100):
            yield f"data: {json.dumps({'i': i})}\n\n"
            await asyncio.sleep(1)
    
    return StreamingResponse(
        event_stream(),
        media_type="text/event-stream",
        headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
    )

Client (browser):

const es = new EventSource("/events");
es.onmessage = (e) => console.log(JSON.parse(e.data));
es.onerror = () => console.warn("reconnecting...");

EventSource reconnects automatically after a dropped connection, so a few lines of client code get you resilient delivery. Cheap to implement; great UX.
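The data: framing above gets fiddly once payloads contain newlines or you want named events and ids. Here is a minimal sketch of a frame builder using only the standard library. The name sse_frame and its parameters are invented for this example and are not part of FastAPI.

```python
import json
from typing import Any, Optional


def sse_frame(
    data: Any,
    event: Optional[str] = None,
    event_id: Optional[str] = None,
    retry_ms: Optional[int] = None,
) -> str:
    """Serialize one Server-Sent Events frame, terminated by a blank line."""
    lines = []
    if event is not None:
        lines.append(f"event: {event}")
    if event_id is not None:
        lines.append(f"id: {event_id}")
    if retry_ms is not None:
        lines.append(f"retry: {retry_ms}")
    # Strings are sent as-is; anything else is JSON-encoded.
    payload = data if isinstance(data, str) else json.dumps(data)
    # A raw newline would end the field early, so every payload line gets
    # its own "data:" prefix; the client joins them back with "\n".
    for part in payload.split("\n"):
        lines.append(f"data: {part}")
    return "\n".join(lines) + "\n\n"
```

In the endpoint above, the yield line could then read yield sse_frame({'i': i}, event_id=str(i)).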

Keepalive

async def event_stream():
    while True:
        if has_event():
            yield f"data: {json.dumps(get_event())}\n\n"
        else:
            yield ": keepalive\n\n"
        await asyncio.sleep(15)

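Comment lines (those starting with :) are ignored by clients but keep proxies from timing out an idle connection. Note that the loop above polls: an event that arrives just after a check waits up to 15 seconds before it is sent.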
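If events reach the handler through an asyncio.Queue (an assumption here, since has_event() and get_event() are left undefined above), waiting on the queue with a timeout sends each event as soon as it arrives and emits a keepalive only after a quiet period. A sketch, where the function name and the None end-of-stream sentinel are conventions of this example:

```python
import asyncio
import json
from typing import AsyncIterator


async def event_stream_from_queue(
    queue: asyncio.Queue, keepalive_s: float = 15.0
) -> AsyncIterator[str]:
    """Yield SSE frames from `queue`; emit a comment frame after `keepalive_s` of silence."""
    while True:
        try:
            # Wake up as soon as an event is available, or when the timeout expires.
            event = await asyncio.wait_for(queue.get(), timeout=keepalive_s)
        except asyncio.TimeoutError:
            yield ": keepalive\n\n"
            continue
        if event is None:  # sentinel chosen for this sketch: end the stream
            return
        yield f"data: {json.dumps(event)}\n\n"
```

The timeout only bounds how long the connection stays silent; it no longer delays real events.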

LLM streaming

from anthropic import AsyncAnthropic

client = AsyncAnthropic()

@app.get("/chat/stream")
async def chat_stream(prompt: str):
    async def gen():
        async with client.messages.stream(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            messages=[{"role": "user", "content": prompt}],
        ) as stream:
            async for text in stream.text_stream:
                yield f"data: {json.dumps({'token': text})}\n\n"
        yield "event: done\ndata: \n\n"
    
    return StreamingResponse(gen(), media_type="text/event-stream")

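The user sees tokens appear as they are generated, which is critical for perceived latency in chat UIs. Two client-side details: onmessage only receives unnamed events, so listen for the final frame with addEventListener("done", ...), and call es.close() in that handler, because EventSource treats a finished stream as a dropped connection and reconnects, which re-runs the prompt. See LLM Streaming Patterns.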
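For a non-browser consumer of this endpoint, the frames have to be parsed by hand. The sketch below assumes exactly the framing used above (data: lines carrying a JSON object with a token key, then an event: done frame) and LF line endings. The names parse_sse and collect_tokens are invented for the example.

```python
import json
from typing import Iterable, Iterator, Tuple


def parse_sse(chunks: Iterable[str]) -> Iterator[Tuple[str, str]]:
    """Yield (event_name, data) pairs from text chunks that may split frames anywhere."""
    buffer = ""
    for chunk in chunks:
        buffer += chunk
        # A blank line ends a frame; whatever follows the last one is incomplete.
        while "\n\n" in buffer:
            raw, buffer = buffer.split("\n\n", 1)
            event, data_lines = "message", []
            for line in raw.split("\n"):
                if line.startswith(":"):
                    continue  # comment line, e.g. a keepalive
                field, _, value = line.partition(":")
                if value.startswith(" "):
                    value = value[1:]  # one optional space after the colon
                if field == "event":
                    event = value
                elif field == "data":
                    data_lines.append(value)
            if data_lines or event != "message":
                yield event, "\n".join(data_lines)


def collect_tokens(chunks: Iterable[str]) -> str:
    """Concatenate `token` payloads until the `done` event arrives."""
    text = []
    for event, data in parse_sse(chunks):
        if event == "done":
            break
        text.append(json.loads(data)["token"])
    return "".join(text)
```

Buffering until the blank line matters because network chunks do not line up with frame boundaries.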

NDJSON

For programmatic clients:

@app.get("/items.ndjson")
async def items_stream():
    async def gen():
        async for item in db.iter_items():
            yield json.dumps(item.dict()) + "\n"
    
    return StreamingResponse(gen(), media_type="application/x-ndjson")

One JSON object per line. The client can parse each line as it arrives instead of loading and decoding one large array.
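On the consuming side, the only subtlety is that a network chunk can end in the middle of a line. A minimal sketch of an incremental reader over raw byte chunks; iter_ndjson is a name made up for this example, and the chunks could come from any HTTP client that exposes the body as an iterator of bytes:

```python
import json
from typing import Any, Iterable, Iterator


def iter_ndjson(chunks: Iterable[bytes]) -> Iterator[Any]:
    """Yield one decoded object per complete line, buffering partial lines."""
    buffer = b""
    for chunk in chunks:
        buffer += chunk
        # Everything before the last newline is complete; the tail may not be.
        *lines, buffer = buffer.split(b"\n")
        for line in lines:
            if line.strip():
                yield json.loads(line)
    if buffer.strip():
        # The body ended without a trailing newline; parse what is left.
        yield json.loads(buffer)
```

Splitting on the newline byte before decoding is safe for UTF-8, since that byte never occurs inside a multi-byte character.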

Disconnection handling

from starlette.requests import Request

@app.get("/stream")
async def stream(request: Request):
    async def gen():
        try:
            for i in range(1000):
                if await request.is_disconnected():
                    print("client gone, stopping")
                    break
                yield f"data: {i}\n\n"
                await asyncio.sleep(1)
        except asyncio.CancelledError:
            print("cancelled")
            raise
    return StreamingResponse(gen(), media_type="text/event-stream")

Client closed → stop the work. Otherwise the generator keeps running with nobody listening, holding an LLM call or a DB cursor open.
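The cleanup half of this is plain Python: an async generator's finally block runs when the generator is closed before it is exhausted. A small sketch, independent of FastAPI, in which the log list stands in for a real cursor or upstream connection (both function names are illustrative):

```python
import asyncio
from typing import AsyncIterator, List


async def stream_rows(log: List[str]) -> AsyncIterator[str]:
    """Yield frames forever; record setup and cleanup so the caller can observe them."""
    log.append("open cursor")  # stands in for acquiring a DB cursor or LLM stream
    try:
        n = 0
        while True:
            yield f"data: {n}\n\n"
            n += 1
            await asyncio.sleep(0)
    finally:
        log.append("close cursor")  # runs on normal exit, aclose(), or cancellation


async def consume_two(log: List[str]) -> List[str]:
    """Read two frames, then walk away like a client that disconnects."""
    gen = stream_rows(log)
    frames = []
    try:
        async for frame in gen:
            frames.append(frame)
            if len(frames) == 2:
                break
    finally:
        await gen.aclose()  # closing the generator triggers its finally block
    return frames
```

The same try/finally shape inside gen() above is where an LLM stream or cursor would be released.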

Auth on SSE

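from fastapi import HTTPException, Query

@app.get("/events")
async def events(request: Request, token: str = Query(...)):
    user = verify_token(token)  # your own token check; returns None when invalid
    if not user:
        raise HTTPException(status_code=401, detail="Invalid token")
    # ...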

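The browser EventSource API cannot set custom headers such as Authorization, so pass the token as a query parameter or rely on a same-origin session cookie. Query strings tend to end up in access logs and browser history, so keep such tokens short-lived.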
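One way to keep a query-string token low-risk is to make it expire within a minute or so. The sketch below signs a user id and expiry with HMAC from the standard library. SECRET, issue_stream_token, and verify_stream_token are hypothetical names standing in for whatever verify_token does in your application, and a real deployment might use an established token format instead.

```python
import base64
import hashlib
import hmac
import time
from typing import Optional

SECRET = b"change-me"  # hypothetical; load from configuration in real code


def _sign(payload: bytes) -> str:
    return hmac.new(SECRET, payload, hashlib.sha256).hexdigest()


def issue_stream_token(user_id: str, ttl_s: int = 60, now: Optional[float] = None) -> str:
    """Return '<base64 payload>.<signature>', valid for `ttl_s` seconds."""
    issued = time.time() if now is None else now
    payload = f"{user_id}:{int(issued + ttl_s)}".encode()
    return base64.urlsafe_b64encode(payload).decode() + "." + _sign(payload)


def verify_stream_token(token: str, now: Optional[float] = None) -> Optional[str]:
    """Return the user id if the signature matches and the token has not expired."""
    try:
        encoded, signature = token.split(".", 1)
        payload = base64.urlsafe_b64decode(encoded.encode())
        user_id, expires = payload.decode().rsplit(":", 1)
        expires_at = int(expires)
    except (ValueError, UnicodeError):
        return None  # malformed token
    # Constant-time comparison, done on bytes so odd input cannot raise.
    if not hmac.compare_digest(signature.encode(), _sign(payload).encode()):
        return None
    current = time.time() if now is None else now
    if current >= expires_at:
        return None
    return user_id
```

The page would fetch such a token over a normal authenticated request and then open the EventSource with it.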

Backpressure

If the client is slow, the generator naturally pauses: the socket buffer fills, the server’s await send(...) call does not return until there is room, and the generator is not asked for its next chunk in the meantime. FastAPI’s default behavior is correct.

But: if a separate producer task generates data faster than the client reads it, put a bounded queue between the two so the producer blocks instead of buffering without limit:

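import asyncio

async def producer(q, source):
    # `source` is your iterable of items, defined elsewhere
    for item in source:
        await q.put(item)  # suspends while the queue is full
    await q.put(None)  # sentinel: end of stream

async def stream_endpoint():
    q = asyncio.Queue(maxsize=100)
    # Keep a reference so the task is not garbage-collected mid-run
    task = asyncio.create_task(producer(q, source))

    async def gen():
        try:
            while True:
                item = await q.get()
                if item is None:
                    break
                yield f"data: {item}\n\n"
        finally:
            task.cancel()  # stop producing if the client goes away early
    return StreamingResponse(gen(), media_type="text/event-stream")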
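To see the bound doing its job, the following self-contained sketch runs a fast producer against a deliberately slow consumer and records the largest backlog the queue ever held. The function name demo and the 10 ms delay are arbitrary choices for the illustration.

```python
import asyncio
from typing import List, Tuple


async def demo(maxsize: int = 3, items: int = 10) -> Tuple[int, List[int]]:
    """Return (peak backlog, items received) for a fast producer and slow consumer."""
    queue: asyncio.Queue = asyncio.Queue(maxsize=maxsize)
    peak = 0

    async def producer() -> None:
        nonlocal peak
        for i in range(items):
            await queue.put(i)  # suspends here whenever the queue is full
            peak = max(peak, queue.qsize())
        await queue.put(None)  # sentinel: no more items

    task = asyncio.create_task(producer())
    received: List[int] = []
    while (item := await queue.get()) is not None:
        received.append(item)
        await asyncio.sleep(0.01)  # simulate a slow client
    await task
    return peak, received
```

However far ahead the producer could run, the backlog never exceeds maxsize, which is what bounds memory per connection.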

Multi-tenant SSE

For per-user streams:

@app.get("/notifications")
async def notifications(request: Request, user: User = Depends(get_user)):
    async def gen():
        sub = redis.pubsub()
        await sub.subscribe(f"user:{user.id}")
        try:
            while True:
                if await request.is_disconnected(): break
                msg = await sub.get_message(ignore_subscribe_messages=True, timeout=15)
                if msg:
                    yield f"data: {msg['data'].decode()}\n\n"
                else:
                    yield ": keepalive\n\n"
        finally:
            await sub.unsubscribe(f"user:{user.id}")
            await sub.close()
    return StreamingResponse(gen(), media_type="text/event-stream")

Pub/sub via Redis; clean up on disconnect.

Production gotchas

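  • X-Accel-Buffering: no: nginx buffers proxied responses by default; this response header turns buffering off for the stream.
  • Cloudflare: the proxy may close long connections that sit idle, reportedly more of a problem on the free tier than on Pro+; keepalives help on any plan.
  • uvicorn --workers 4: each worker is a separate process with its own memory, so an event published in one worker never reaches clients connected to another. Fan out through Redis pub/sub.
  • HTTP/2: SSE works over both HTTP/1.1 and HTTP/2. Browsers cap HTTP/1.1 at about six connections per origin, so use HTTP/2 to multiplex many SSE streams.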

SSE vs WebSocket

Feature | SSE | WebSocket
Direction | Server → Client | Both
Reconnect | Built-in | Custom
Protocol | HTTP | WS upgrade
Auth | Same as HTTP | Custom
Browser support | Universal | Universal

For LLM token streams, notifications, progress: SSE. For chat, multiplayer: WebSocket. See Django Channels for WebSocket comparison.

Common mistakes

1. Synchronous generators

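Writing def gen(): instead of async def gen():. Starlette iterates a sync generator in a threadpool, so every open stream occupies a worker thread and you cannot await inside it. The event loop itself blocks when you make blocking calls (time.sleep, a sync DB driver) inside an async def generator. Use async generators with awaitable I/O.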

2. No keepalive

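Idle connections are commonly dropped after about 60 seconds (nginx’s proxy_read_timeout defaults to 60s), so a quiet stream makes the client reconnect every minute. Send heartbeats.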

3. Forgetting to close resources

Generator stops early; underlying DB cursor / pub/sub still open. Use try/finally to clean up.

4. No client-side reconnect logic for fetch-streaming

EventSource auto-reconnects; raw fetch + reader doesn’t. Build it.

5. Big initial payloads

Streaming endpoint returns 100KB before first chunk → user waits. Stream from the first byte.
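For mistake 4, a hand-rolled reconnect loop needs two things: a backoff between attempts and a way to resume where the stream left off. Here is a sketch for a non-browser Python client. connect is a hypothetical callable that opens the stream given the last seen event id, and it assumes a server that sets id: fields and honors a resume id, which the endpoints in this post do not do yet.

```python
import asyncio
from typing import AsyncIterator, Callable, Optional, Tuple

# Hypothetical transport hook: given the last seen event id (or None), open the
# stream and yield (event_id, data) pairs. It raises ConnectionError on a drop.
Connect = Callable[[Optional[str]], AsyncIterator[Tuple[str, str]]]


async def stream_with_reconnect(
    connect: Connect,
    max_retries: int = 5,
    base_delay_s: float = 0.5,
    max_delay_s: float = 10.0,
) -> AsyncIterator[str]:
    """Yield event data, reconnecting with exponential backoff after drops."""
    last_id: Optional[str] = None
    failures = 0
    while True:
        try:
            async for event_id, data in connect(last_id):
                last_id = event_id
                failures = 0  # a delivered event resets the backoff
                yield data
            return  # the server ended the stream cleanly
        except ConnectionError:
            failures += 1
            if failures > max_retries:
                raise
            # 0.5s, 1s, 2s, ... capped at max_delay_s
            delay = min(max_delay_s, base_delay_s * 2 ** (failures - 1))
            await asyncio.sleep(delay)
```

Passing the last id back on reconnect is what lets the server skip events the client has already seen.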

Read this next

If you want my FastAPI streaming starter (SSE + auth + reconnect), it’s at rajpoot.dev.
