Cheatsheet for real-time. Long-form: Textbook Ch 8 .
WebSocket basics
from fastapi import WebSocket, WebSocketDisconnect
@app.websocket("/ws")
async def ws(websocket: WebSocket):
await websocket.accept()
try:
while True:
msg = await websocket.receive_json()
await websocket.send_json({"echo": msg})
except WebSocketDisconnect:
pass
Send / receive variants
await websocket.receive_text(); await websocket.send_text(s)
await websocket.receive_bytes(); await websocket.send_bytes(b)
await websocket.receive_json(); await websocket.send_json(obj)
Auth patterns
Query token
@app.websocket("/ws")
async def ws(websocket: WebSocket, token: str):
user = await verify_token(token)
if not user:
await websocket.close(code=4401)
return
await websocket.accept()
Cookie
@app.websocket("/ws")
async def ws(websocket: WebSocket):
sid = websocket.cookies.get("session")
user = await load_user_from_session(sid)
if not user:
await websocket.close(code=4401)
return
await websocket.accept()
First-message auth
await websocket.accept()
auth = await websocket.receive_json()
user = await verify_token(auth["token"])
if not user:
await websocket.close(code=4401); return
Connection manager (single process)
class Mgr:
def __init__(self):
self.active: dict[int, set[WebSocket]] = defaultdict(set)
async def connect(self, uid, ws):
await ws.accept(); self.active[uid].add(ws)
def disconnect(self, uid, ws):
self.active[uid].discard(ws)
if not self.active[uid]: del self.active[uid]
async def send_to(self, uid, msg):
for ws in self.active.get(uid, []):
try: await ws.send_json(msg)
except Exception: pass
mgr = Mgr()
@app.websocket("/ws/{uid}")
async def ws(websocket: WebSocket, uid: int):
await mgr.connect(uid, websocket)
try:
while True:
msg = await websocket.receive_json()
await mgr.send_to(uid, {"echo": msg})
except WebSocketDisconnect:
mgr.disconnect(uid, websocket)
Multi-worker fanout (Redis pub/sub)
class DistMgr:
def __init__(self, redis):
self.redis = redis
self.local: dict[int, set[WebSocket]] = defaultdict(set)
async def listen(self):
sub = self.redis.pubsub()
await sub.subscribe("ws_broadcast")
async for msg in sub.listen():
if msg["type"] != "message": continue
p = json.loads(msg["data"])
for ws in self.local.get(p["uid"], []):
await ws.send_json(p["msg"])
async def publish(self, uid, msg):
await self.redis.publish("ws_broadcast", json.dumps({"uid": uid, "msg": msg}))
Heartbeats
async def heartbeat(websocket: WebSocket):
while True:
await asyncio.sleep(30)
try: await websocket.send_text("ping")
except Exception: return
@app.websocket("/ws")
async def ws(websocket: WebSocket):
await websocket.accept()
hb = asyncio.create_task(heartbeat(websocket))
try:
async for _ in websocket.iter_json():
...
finally:
hb.cancel()
SSE (server → client)
@app.get("/events")
async def events(request: Request):
async def gen():
while True:
if await request.is_disconnected(): break
evt = await get_next_event()
yield f"data: {json.dumps(evt)}\n\n"
return StreamingResponse(
gen(),
media_type="text/event-stream",
headers={"Cache-Control": "no-cache", "X-Accel-Buffering": "no"},
)
SSE keepalive
async def gen():
while True:
if await request.is_disconnected(): break
if has_event(): yield f"data: {get()}\n\n"
else: yield ": keepalive\n\n"
await asyncio.sleep(15)
SSE resume
async def gen():
last_id = request.headers.get("last-event-id", "0")
async for evt in events_after(last_id):
yield f"id: {evt.id}\ndata: {json.dumps(evt.payload)}\n\n"
NDJSON
@app.get("/items.ndjson")
async def items(db = Depends(get_db)):
async def gen():
async for r in db.stream(select(Item)):
yield json.dumps(r._mapping) + "\n"
return StreamingResponse(gen(), media_type="application/x-ndjson")
LLM token streaming
@app.get("/chat")
async def chat(prompt: str):
async def gen():
async with anthropic.messages.stream(
model="claude-sonnet-4-6",
max_tokens=1024,
messages=[{"role": "user", "content": prompt}],
) as stream:
async for text in stream.text_stream:
yield f"data: {json.dumps({'token': text})}\n\n"
yield "event: done\ndata: \n\n"
return StreamingResponse(gen(), media_type="text/event-stream")
Read this next
If you want my real-time fanout reference (FastAPI + Redis pub/sub), it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .