Cheatsheet for real-time features. For the long-form version, see Textbook Ch 8.

WebSocket basics

from fastapi import WebSocket, WebSocketDisconnect

@app.websocket("/ws")
async def ws(websocket: WebSocket):
    """Echo every JSON message back to the client until it disconnects.

    Each received payload is wrapped as ``{"echo": <payload>}`` and sent back
    on the same socket.
    """
    await websocket.accept()
    try:
        while True:
            payload = await websocket.receive_json()
            reply = {"echo": payload}
            await websocket.send_json(reply)
    except WebSocketDisconnect:
        # The client went away; a plain echo handler has nothing to clean up.
        return

Send / receive variants

# Text messages: receive_text() pairs with send_text(s).
await websocket.receive_text(); await websocket.send_text(s)
# Binary messages: receive_bytes() pairs with send_bytes(b).
await websocket.receive_bytes(); await websocket.send_bytes(b)
# JSON messages: receive_json() pairs with send_json(obj).
await websocket.receive_json(); await websocket.send_json(obj)

Auth patterns

Query token

@app.websocket("/ws")
async def ws(websocket: WebSocket, token: str):
    """Authenticate the connection from its ``token`` parameter.

    The token is verified before the handshake is accepted; a falsy result
    from ``verify_token`` rejects the connection.
    """
    user = await verify_token(token)
    rejected = not user
    if rejected:
        # NOTE(review): this close happens before accept(). Per the ASGI
        # WebSocket spec the server then refuses the handshake (typically an
        # HTTP 403), so the client may never observe code 4401 -- confirm
        # whether the custom code is relied on.
        await websocket.close(code=4401)
        return
    await websocket.accept()
# Cookie / session auth
@app.websocket("/ws")
async def ws(websocket: WebSocket):
    """Authenticate the connection from its ``session`` cookie.

    The cookie value is resolved to a user before the handshake is accepted;
    a falsy result from ``load_user_from_session`` rejects the connection.
    """
    session_id = websocket.cookies.get("session")
    user = await load_user_from_session(session_id)
    rejected = not user
    if rejected:
        # NOTE(review): this close happens before accept(). Per the ASGI
        # WebSocket spec the server then refuses the handshake (typically an
        # HTTP 403), so the client may never observe code 4401 -- confirm
        # whether the custom code is relied on.
        await websocket.close(code=4401)
        return
    await websocket.accept()

First-message auth

# Fragment of a WebSocket handler body: the socket is accepted first, then the
# first JSON message is read and its "token" field is verified.
await websocket.accept()
auth = await websocket.receive_json()
# NOTE(review): a first message without a "token" key raises KeyError here.
user = await verify_token(auth["token"])
if not user:
    # The socket is already accepted, so this is a regular close with code 4401.
    await websocket.close(code=4401); return

Connection manager (single process)

class Mgr:
    """Track the open WebSockets of each user within a single process.

    ``active`` maps a user id to the set of sockets currently registered for
    that user. Entries are removed once their set becomes empty.
    """

    def __init__(self):
        self.active: dict[int, set[WebSocket]] = defaultdict(set)

    async def connect(self, uid, ws):
        """Accept ``ws`` and register it under ``uid``."""
        await ws.accept()
        self.active[uid].add(ws)

    def disconnect(self, uid, ws):
        """Unregister ``ws`` from ``uid``; a no-op if it is not registered."""
        # Use .get() so an unknown uid does not create a defaultdict entry.
        sockets = self.active.get(uid)
        if sockets is None:
            return
        sockets.discard(ws)
        if not sockets:
            del self.active[uid]

    async def send_to(self, uid, msg):
        """Send ``msg`` as JSON to every socket registered for ``uid``.

        Delivery is best-effort: a socket whose send fails is skipped and the
        remaining sockets are still served.
        """
        # Iterate over a snapshot. Each send awaits, so another task may call
        # connect()/disconnect() meanwhile; iterating the live set would then
        # raise "RuntimeError: Set changed size during iteration".
        for ws in tuple(self.active.get(uid, ())):
            try:
                await ws.send_json(msg)
            except Exception:
                # Deliberately swallowed: one broken peer must not stop
                # delivery to the others. Unregistering is left to the
                # socket's own handler via disconnect().
                pass


mgr = Mgr()

@app.websocket("/ws/{uid}")
async def ws(websocket: WebSocket, uid: int):
    """Register the socket under ``uid`` and echo its messages to that user.

    Every JSON message received is sent, wrapped as ``{"echo": <msg>}``, to
    all sockets the manager holds for ``uid``.
    """
    await mgr.connect(uid, websocket)
    try:
        while True:
            msg = await websocket.receive_json()
            await mgr.send_to(uid, {"echo": msg})
    except WebSocketDisconnect:
        # Normal client departure; cleanup happens in ``finally``.
        pass
    finally:
        # Unregister on every exit path. Previously only WebSocketDisconnect
        # triggered cleanup, so any other error (for example a malformed JSON
        # payload) left a dead socket registered in the manager.
        mgr.disconnect(uid, websocket)

Multi-worker fanout (Redis pub/sub)

class DistMgr:
    """Fan messages out to locally held sockets through Redis pub/sub.

    ``publish`` writes ``{"uid": ..., "msg": ...}`` as JSON to the
    ``ws_broadcast`` channel; ``listen`` consumes that channel and forwards
    each message to the sockets stored in ``local`` for the target uid.
    """

    def __init__(self, redis):
        self.redis = redis
        self.local: dict[int, set[WebSocket]] = defaultdict(set)

    async def listen(self):
        """Forward every ``ws_broadcast`` message to the matching local sockets.

        Runs until the subscription's message stream ends. Delivery is
        best-effort per socket, matching ``Mgr.send_to``.
        """
        sub = self.redis.pubsub()
        await sub.subscribe("ws_broadcast")
        async for msg in sub.listen():
            # Skip subscribe confirmations and other non-payload events.
            if msg["type"] != "message":
                continue
            p = json.loads(msg["data"])
            # Snapshot the set: each send awaits, and the set may be mutated
            # by other tasks meanwhile. .get() also avoids creating
            # defaultdict entries for uids with no local sockets.
            for ws in tuple(self.local.get(p["uid"], ())):
                try:
                    await ws.send_json(p["msg"])
                except Exception:
                    # One broken socket must not terminate the listener and
                    # silence fanout for every other connection held here.
                    pass

    async def publish(self, uid, msg):
        """Publish ``msg`` for ``uid`` on the ``ws_broadcast`` channel."""
        await self.redis.publish("ws_broadcast", json.dumps({"uid": uid, "msg": msg}))

Heartbeats

async def heartbeat(websocket: WebSocket, interval: float = 30):
    """Send a ``"ping"`` text message on ``websocket`` every ``interval`` seconds.

    Args:
        websocket: The socket to ping.
        interval: Seconds to wait before each ping. Defaults to 30, the
            previously hard-coded value.

    Returns:
        None, once a send fails. Until then the coroutine loops forever and is
        expected to be cancelled by its owner.
    """
    while True:
        await asyncio.sleep(interval)
        try:
            await websocket.send_text("ping")
        except Exception:
            # A failed send ends the heartbeat quietly instead of raising
            # inside a background task.
            return

@app.websocket("/ws")
async def ws(websocket: WebSocket):
    """Serve a socket while a background task pings it via ``heartbeat``."""
    await websocket.accept()
    pinger = asyncio.create_task(heartbeat(websocket))
    try:
        async for _message in websocket.iter_json():
            ...
    finally:
        # Stop the heartbeat task however the receive loop ended.
        pinger.cancel()

SSE (server → client)

@app.get("/events")
async def events(request: Request):
    """Stream events to the client as Server-Sent Events.

    Each event is JSON-encoded into a single ``data:`` field. The response
    headers turn off caching and set ``X-Accel-Buffering: no``.
    """

    async def gen():
        # The disconnect check runs before each wait for the next event.
        while not await request.is_disconnected():
            evt = await get_next_event()
            yield f"data: {json.dumps(evt)}\n\n"

    sse_headers = {"Cache-Control": "no-cache", "X-Accel-Buffering": "no"}
    return StreamingResponse(
        gen(),
        media_type="text/event-stream",
        headers=sse_headers,
    )

SSE keepalive

async def gen():
    """Yield pending events as SSE ``data:`` frames, with idle keepalives.

    While events are available they are emitted back to back. When none is
    pending, an SSE comment line (``: keepalive``) is emitted and the
    generator waits 15 seconds before checking again. The loop ends once the
    client has disconnected.
    """
    while True:
        if await request.is_disconnected():
            break
        if has_event():
            yield f"data: {get()}\n\n"
            # Skip the idle wait after a real event. Sleeping here as well
            # capped delivery at one event per 15 s and let a backlog build.
            continue
        yield ": keepalive\n\n"
        await asyncio.sleep(15)

SSE resume

async def gen():
    """Replay events after the id in the ``last-event-id`` request header.

    When the header is absent, ``"0"`` is used as the starting id. Every
    frame carries the event's own id followed by its JSON-encoded payload.
    """
    resume_from = request.headers.get("last-event-id", "0")
    async for evt in events_after(resume_from):
        event_id = evt.id
        body = json.dumps(evt.payload)
        yield f"id: {event_id}\ndata: {body}\n\n"

NDJSON

@app.get("/items.ndjson")
async def items(db=Depends(get_db)):
    """Stream the ``Item`` query result as newline-delimited JSON.

    Each row is emitted as one JSON object followed by a newline.
    """

    async def gen():
        # NOTE(review): if ``db`` is a SQLAlchemy AsyncSession, ``stream()`` is
        # a coroutine and must be awaited before iterating -- confirm against
        # what ``get_db`` yields.
        async for r in db.stream(select(Item)):
            # ``r._mapping`` is a read-only mapping view, not a dict, and
            # json.dumps() rejects it with TypeError; copy it into a dict.
            yield json.dumps(dict(r._mapping)) + "\n"

    return StreamingResponse(gen(), media_type="application/x-ndjson")

LLM token streaming

@app.get("/chat")
async def chat(prompt: str):
    """Stream the model's reply to ``prompt`` as Server-Sent Events.

    Every text chunk becomes one ``data:`` frame holding ``{"token": <text>}``;
    a final ``done`` event is emitted after the stream completes.
    """

    async def gen():
        request_args = {
            "model": "claude-sonnet-4-6",
            "max_tokens": 1024,
            "messages": [{"role": "user", "content": prompt}],
        }
        async with anthropic.messages.stream(**request_args) as stream:
            async for text in stream.text_stream:
                chunk = json.dumps({"token": text})
                yield f"data: {chunk}\n\n"
        # Emitted only after the model stream has been fully consumed.
        yield "event: done\ndata: \n\n"

    return StreamingResponse(gen(), media_type="text/event-stream")

Read this next

If you want my real-time fanout reference (FastAPI + Redis pub/sub), it’s at rajpoot.dev.


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev.