Cheatsheet for LLM integration in FastAPI. Pair with Anthropic API Best Practices.

Anthropic streaming (Claude)

import anthropic, json
# Module-level async Anthropic client, constructed with no explicit arguments;
# the Claude snippets in this cheatsheet refer to it as `client`.
client = anthropic.AsyncAnthropic()

@app.post("/chat")
async def chat(req: ChatIn):
    # Streams a Claude reply to the caller as Server-Sent Events: one `data:`
    # frame per text delta, then a closing `done` event carrying token usage.
    # (Comments rather than a docstring, so the route's generated API
    # description is left untouched.)

    async def sse_events():
        # The system prompt block is tagged with ephemeral cache_control.
        request_kwargs = dict(
            model="claude-sonnet-4-6",
            max_tokens=2048,
            messages=req.messages,
            system=[{"type": "text", "text": SYS, "cache_control": {"type": "ephemeral"}}],
        )
        async with client.messages.stream(**request_kwargs) as stream:
            async for piece in stream.text_stream:
                token_payload = json.dumps({"token": piece})
                yield f"data: {token_payload}\n\n"

            # Once the text is exhausted, report the final message's usage.
            final_message = await stream.get_final_message()
            usage_payload = json.dumps({"usage": final_message.usage.model_dump()})
            yield f"event: done\ndata: {usage_payload}\n\n"

    return StreamingResponse(sse_events(), media_type="text/event-stream")

OpenAI streaming (chat completion)

from openai import AsyncOpenAI
# Async OpenAI client, constructed with no explicit arguments.
# NOTE(review): this rebinds the name `client` that the Claude snippets also
# use — presumably each section is meant to be copied on its own.
client = AsyncOpenAI()

@app.post("/chat")
async def chat(req: ChatIn):
    # Streams an OpenAI chat completion to the caller as Server-Sent Events:
    # one `data:` frame per non-empty content delta, then a final `done` event
    # with an empty data field.
    async def gen():
        stream = await client.chat.completions.create(
            model="gpt-4.1-mini",
            messages=req.messages,
            stream=True,
        )
        async for chunk in stream:
            # A chunk may arrive with an empty `choices` list (for example the
            # usage-only chunk sent when stream_options.include_usage is on);
            # indexing [0] on it would raise IndexError and kill the stream.
            if not chunk.choices:
                continue
            d = chunk.choices[0].delta.content
            # Deltas without text (role-only, tool-call, final) are skipped.
            if d:
                yield f"data: {json.dumps({'token': d})}\n\n"
        yield "event: done\ndata: \n\n"
    return StreamingResponse(gen(), media_type="text/event-stream")

vLLM (OpenAI-compatible)

# Reuse the OpenAI client class, pointed at a vLLM server's /v1 endpoint.
# NOTE(review): "EMPTY" is presumably a placeholder key for a server that does
# not check credentials — confirm against your vLLM deployment.
client = AsyncOpenAI(base_url="http://vllm:8000/v1", api_key="EMPTY")

# Same OpenAI streaming pattern; just point at vLLM.

Tool calling (Claude)

async def run_tools(messages, tools, max_iters=10):
    """Run a Claude tool-use loop until the model stops requesting tools.

    Each iteration sends the conversation, appends the assistant reply to
    ``messages``, executes every ``tool_use`` block through ``dispatch`` and
    appends the collected ``tool_result`` blocks as the next user turn.

    Args:
        messages: Conversation so far. Mutated in place: assistant replies and
            tool results are appended to this list.
        tools: Tool definitions passed through to ``client.messages.create``.
        max_iters: Upper bound on model calls before giving up.

    Returns:
        The first response whose ``stop_reason`` is not ``"tool_use"``.

    Raises:
        RuntimeError: If the model is still requesting tools after
            ``max_iters`` calls.
    """
    for _ in range(max_iters):
        resp = await client.messages.create(
            model="claude-sonnet-4-6",
            max_tokens=4096,
            messages=messages,
            tools=tools,
        )
        messages.append({"role": "assistant", "content": resp.content})
        # Stop on anything other than a tool request. Checking only for
        # "end_turn" let "max_tokens" / "stop_sequence" fall through and
        # append a user turn with empty content, which makes the next
        # request invalid.
        if resp.stop_reason != "tool_use":
            return resp
        results = []
        for block in resp.content:
            if block.type != "tool_use":
                continue
            try:
                out = await dispatch(block.name, block.input)
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": json.dumps(out)})
            except Exception as e:
                # Deliberately broad: any tool failure (including output that
                # is not JSON-serializable) is reported back to the model as
                # an error result instead of aborting the loop.
                results.append({"type": "tool_result", "tool_use_id": block.id, "content": str(e), "is_error": True})
        messages.append({"role": "user", "content": results})
    raise RuntimeError("max iters")

Tool dispatch from Pydantic schemas

from pydantic import BaseModel

class GetWeather(BaseModel):
    # Argument schema for the "get_weather" tool. Documented with comments
    # rather than a docstring so the generated JSON schema stays unchanged.
    city: str  # required
    units: Literal["c", "f"] = "c"  # only "c" or "f" accepted; defaults to "c" (presumably Celsius/Fahrenheit)

# Registry mapping a tool name to a (Pydantic schema, handler) pair; `dispatch`
# awaits the handler, so handlers must be awaitable. The trailing `...` is a
# placeholder for further entries, not literal code.
TOOLS = {"get_weather": (GetWeather, get_weather_fn), ...}

async def dispatch(name, args):
    """Validate ``args`` against the tool's schema and await its handler.

    Looks ``name`` up in ``TOOLS`` (an unknown name raises ``KeyError``),
    builds the schema instance with ``model_validate`` and returns whatever
    the handler returns.
    """
    model_cls, handler = TOOLS[name]
    validated_args = model_cls.model_validate(args)
    outcome = await handler(validated_args)
    return outcome

# Tool definitions in the shape passed as `tools=`: the input schema is
# generated from the same Pydantic model that `dispatch` validates against.
tools_for_claude = [
    {"name": "get_weather", "description": "...", "input_schema": GetWeather.model_json_schema()},
]

Cancel on disconnect

async def gen():
    """Yield SSE token frames, stopping early once the client has gone away.

    Before each frame the request is polled for disconnection; on disconnect
    the generator returns, which exits the ``async with`` block around the
    upstream stream. Task cancellation is re-raised unchanged.
    """
    try:
        async with client.messages.stream(...) as upstream:
            async for delta in upstream.text_stream:
                client_gone = await request.is_disconnected()
                if client_gone:
                    return
                frame_body = json.dumps({"token": delta})
                yield f"data: {frame_body}\n\n"
    except asyncio.CancelledError:
        # Client disconnected; hook for any extra cleanup, then propagate.
        raise

Saves tokens and GPU time when the user closes the tab.

Retry with backoff

from tenacity import retry, retry_if_exception_type, wait_exponential, stop_after_attempt

@retry(
    retry=retry_if_exception_type(
        (
            anthropic.RateLimitError,
            anthropic.APIConnectionError,
            anthropic.InternalServerError,
        )
    ),
    wait=wait_exponential(min=2, max=60),
    stop=stop_after_attempt(5),
)
async def call(messages):
    """Call the Messages API, retrying transient failures with backoff.

    Retries only on rate-limit, connection and internal-server errors, waits
    exponentially between attempts (bounded by min=2 and max=60) and gives up
    after 5 attempts. The request arguments are elided (``...``) in this
    snippet, so ``messages`` is not yet wired into the call.
    """
    response = await client.messages.create(...)
    return response

Token counting

# Ask the API how many input tokens the given messages would use for this
# model, then print that number. `[...]` stands in for the real message list;
# the `await` must run inside an async function.
count = await client.messages.count_tokens(model="claude-sonnet-4-6", messages=[...])
print(count.input_tokens)

Cost tracking

async def log_call(resp, feature):
    """Record one response's token usage under the given feature label.

    Reads the input, cache-read, cache-creation and output token counts from
    ``resp.usage`` and forwards them to ``metrics.record``. Falsy cache
    counters (such as ``None``) are recorded as 0.
    """
    usage = resp.usage
    token_counts = {
        "input_tokens": usage.input_tokens,
        "cache_read": usage.cache_read_input_tokens or 0,
        "cache_create": usage.cache_creation_input_tokens or 0,
        "output_tokens": usage.output_tokens,
    }
    await metrics.record(feature=feature, **token_counts)

Routing (cheap → premium)

async def route(query: str):
    # Sketch of cheap-to-premium routing: classify the query first, then pick
    # the model based on the reported complexity.
    # NOTE(review): `haiku` is not defined in this snippet, and
    # `call(model=..., ...)` is placeholder pseudo-code (a positional `...`
    # after a keyword argument does not parse) — fill in the real arguments.
    classification = await haiku.classify(query)
    if classification.complexity == "trivial":
        # Queries classified as trivial go to the Haiku model.
        return await call(model="claude-haiku-4-5", ...)
    # Everything else goes to the Sonnet model.
    return await call(model="claude-sonnet-4-6", ...)

Structured output via tool_choice

from pydantic import BaseModel

class Answer(BaseModel):
    # Shape of the structured reply; its JSON schema is used as the input
    # schema of the forced "respond" tool. Documented with comments rather
    # than a docstring so the generated schema stays unchanged.
    summary: str
    sentiment: Literal["pos", "neg", "neutral"]  # one of three fixed labels
    confidence: float  # no range enforced here — presumably 0..1, confirm

# Offer a single tool whose input schema is Answer's JSON schema and force the
# model to call it via tool_choice, so the answer arrives as tool input.
resp = await client.messages.create(
    model="claude-sonnet-4-6",
    max_tokens=512,
    tools=[{"name": "respond", "input_schema": Answer.model_json_schema()}],
    tool_choice={"type": "tool", "name": "respond"},
    messages=[{"role": "user", "content": prompt}],
)
# Validate the tool input against Answer. If several tool_use blocks were
# present the last one would win; if none were, `result` stays unbound.
for block in resp.content:
    if block.type == "tool_use":
        result = Answer.model_validate(block.input)

Read this next

If you want my LLM integration starter (router + caching + streaming + tools), it’s at rajpoot.dev.


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev.