Production LLM app cheatsheet.
Architecture
client
↓
edge (auth, rate limit)
↓
app (Next.js / FastAPI)
├── LLM API (OpenAI/Anthropic)
├── Vector DB (Qdrant)
├── Database (Postgres)
├── Cache (Redis)
└── Queue (workers for batch)
Stack picks
- Frontend: Next.js + Vercel AI SDK + Tailwind + shadcn/ui.
- Backend: FastAPI or Next route handlers.
- LLM: Anthropic Claude (default), OpenAI GPT-5, with fallback.
- Vector DB: Qdrant (self-host or managed).
- DB: Postgres + pgvector for simple setups; a separate Qdrant instance at scale.
- Cache: Redis (Upstash for serverless).
- Embeddings: text-embedding-3-small or BGE local.
- Auth: Auth.js, Clerk, or custom JWT.
- Observability: LangSmith / Helicone / Langfuse.
- Deploy: Vercel (frontend) + Fly.io / Railway / GCP for stateful.
API design
POST   /chat                    stream response
GET    /sessions                list user sessions
POST   /sessions                create session
DELETE /sessions/:id            delete session
POST   /messages/:id/feedback   thumbs up/down
POST   /upload                  file inputs
Auth + rate limit
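from fastapi import Depends, HTTPException

@app.post("/chat")
async def chat(req, user=Depends(get_user)):
    # Reject once the daily budget is used up (>= so the cap itself is not exceeded).
    if user.tokens_today >= user.tier.daily_limit:
        raise HTTPException(status_code=429, detail="Daily limit reached")
    response = await llm_call(req.messages)
    # Assumes llm_call normalizes usage to a single total; persist the counter afterwards.
    user.tokens_today += response.usage.total
    return response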
Use tiered limits: free / pro / enterprise.
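The tiered limits above could be expressed as a small lookup table. A minimal sketch: the tier names come from the text, while the token numbers and the `remaining_tokens` helper are hypothetical placeholders, not recommendations.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Tier:
    name: str
    daily_token_limit: int  # hypothetical numbers, tune per product


# Illustrative values only; the cheatsheet does not specify limits.
TIERS = {
    "free": Tier("free", 50_000),
    "pro": Tier("pro", 1_000_000),
    "enterprise": Tier("enterprise", 20_000_000),
}


def remaining_tokens(tier_name: str, tokens_today: int) -> int:
    """Return how many tokens the user may still spend today (never negative)."""
    tier = TIERS.get(tier_name, TIERS["free"])  # unknown tiers fall back to free
    return max(0, tier.daily_token_limit - tokens_today)
```

Falling back to the free tier for unknown names is a design choice that fails closed: a typo in a tier column never grants a larger budget.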
Cost guardrails
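from datetime import date

class CostLimiter:
    def __init__(self, daily_cap_usd):
        self.cap = daily_cap_usd

    async def check(self, user_id):
        # get_usage is an app-level helper returning today's spend in USD.
        usage = await get_usage(user_id, date.today())
        if usage >= self.cap:
            raise BudgetExceeded()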
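A budget check needs a per-call cost to add up. One way to derive it from token counts is sketched below. The model name and prices in `PRICES` are hypothetical placeholders; look up real prices from your provider before relying on this.

```python
from decimal import Decimal

# Hypothetical USD prices per million tokens: (input, output).
PRICES = {
    "example-model": (Decimal("3.00"), Decimal("15.00")),
}


def cost_usd(model: str, tokens_in: int, tokens_out: int) -> Decimal:
    """Compute the cost of one call from its input and output token counts."""
    price_in, price_out = PRICES[model]
    return (price_in * tokens_in + price_out * tokens_out) / Decimal(1_000_000)
```

Decimal avoids float rounding drift when many small per-call costs are summed into a daily total.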
Caching
Levels:
- Semantic cache (LLM responses for similar queries).
- Embedding cache (per text).
- RAG retrieval cache.
- CDN for static UI.
import hashlib
@redis_cache(ttl=3600, key=lambda q: hashlib.md5(q.encode()).hexdigest())
async def cached_embed(text):
    return await embed(text)
Fallback / retry
async def llm_with_fallback(messages):
    try:
        return await anthropic.messages.create(
            model="claude-opus-4-7", messages=messages, max_tokens=1024
        )
    except (RateLimitError, APIError):
        # Primary provider failed; note the two SDKs return different response shapes.
        return await openai.chat.completions.create(model="gpt-5", messages=messages)
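The snippet above covers fallback but not retry. A retry step with exponential backoff and jitter might look like the following sketch. `retry_with_backoff` and its parameters are hypothetical names, not part of either SDK (both official clients also have their own built-in retry settings).

```python
import asyncio
import random


async def retry_with_backoff(call, *, attempts=3, base_delay=0.5, retry_on=(Exception,)):
    """Await call() up to `attempts` times, sleeping longer after each failure."""
    for attempt in range(attempts):
        try:
            return await call()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error to the caller
            # Exponential backoff plus jitter so clients do not retry in lockstep.
            delay = base_delay * (2 ** attempt) + random.uniform(0, base_delay)
            await asyncio.sleep(delay)
```

Usage would wrap the primary call, for example `await retry_with_backoff(lambda: llm_call(messages), retry_on=(RateLimitError,))`, and fall back to the second provider only once the retries are exhausted.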
Streaming + cancellation
async def stream():
    try:
        async with client.messages.stream(...) as s:
            async for text in s.text_stream:
                yield text
    except asyncio.CancelledError:
        log.info("client_disconnected")
        raise  # re-raise so the task is actually cancelled
Tool use safety
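SAFE_TOOLS = {"search_web", "read_file"}
DANGEROUS = {"send_email", "execute_code"}

async def run_tool(name, args, user):
    # Allowlist: reject any tool name the model invents.
    if name not in SAFE_TOOLS | DANGEROUS:
        raise ValueError(f"Unknown tool: {name}")
    if name in DANGEROUS and not user.confirmed_action:
        raise NeedsConfirmation(name, args)
    return await tools[name](**args)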
Memory architecture
- Short-term: conversation context (last N messages).
- Mid-term: session summary (after threshold).
- Long-term: vector DB of facts.
- Working: scratchpad files.
Multi-tenant
Separate by user_id everywhere:
- DB rows filtered by user.
- Vector DB filtered by user_id in payload.
- Rate limits per user.
- Cost tracking per user.
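Row-level filtering is easiest to enforce when every data-access helper takes `user_id` as a required argument. The sketch below uses SQLite only so it runs without a server; the cheatsheet's stack is Postgres, and the helper names are hypothetical.

```python
import sqlite3


def list_sessions(conn: sqlite3.Connection, user_id: str) -> list[tuple]:
    """Return only the sessions owned by user_id; the filter is not optional."""
    cur = conn.execute(
        "SELECT id, title FROM sessions WHERE user_id = ? ORDER BY id",
        (user_id,),
    )
    return cur.fetchall()


def get_session(conn: sqlite3.Connection, user_id: str, session_id: str):
    """Fetch one session, treating another tenant's row as not found."""
    cur = conn.execute(
        "SELECT id, title FROM sessions WHERE id = ? AND user_id = ?",
        (session_id, user_id),
    )
    return cur.fetchone()
```

Looking up by `id` alone and checking ownership afterwards is the usual source of cross-tenant leaks; putting `user_id` in the WHERE clause removes that step entirely.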
Database schema (Postgres)
CREATE TABLE users (
    id UUID PRIMARY KEY,
    email TEXT UNIQUE,
    tier TEXT DEFAULT 'free',
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE sessions (
    id UUID PRIMARY KEY,
    user_id UUID REFERENCES users(id),
    title TEXT,
    created_at TIMESTAMPTZ DEFAULT NOW(),
    updated_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE TABLE messages (
    id UUID PRIMARY KEY,
    session_id UUID REFERENCES sessions(id),
    role TEXT NOT NULL,
    content TEXT NOT NULL,
    tokens_in INT,
    tokens_out INT,
    model TEXT,
    cost_usd NUMERIC,
    rating SMALLINT,
    metadata JSONB,
    created_at TIMESTAMPTZ DEFAULT NOW()
);

CREATE INDEX ON messages (session_id, created_at);
Dockerfile
FROM python:3.13-slim AS builder
WORKDIR /app
RUN pip install uv
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev
FROM python:3.13-slim
WORKDIR /app
RUN useradd -u 1000 -m app
COPY --from=builder --chown=app:app /app/.venv ./.venv
COPY --chown=app:app . .
USER app
ENV PATH=/app/.venv/bin:$PATH
EXPOSE 8000
CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "4"]
Env config
from pydantic_settings import BaseSettings

class Settings(BaseSettings):
    anthropic_api_key: str
    openai_api_key: str
    database_url: str
    redis_url: str
    qdrant_url: str
    qdrant_api_key: str
    daily_budget_usd: float = 100.0
    free_tier_messages: int = 50
    sentry_dsn: str | None = None

settings = Settings()
Health checks
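# Liveness: the process is up.
@app.get("/health")
async def health():
    return {"ok": True}

# Readiness: dependencies are reachable; point the load balancer here.
@app.get("/health/ready")
async def ready():
    await db.execute("SELECT 1")
    await redis.ping()
    return {"ok": True}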
Monitoring
- Per-endpoint p50/p95/p99 latency.
- LLM call latency separately.
- Token usage / cost.
- Error rate by error type.
- Cache hit rate.
- Active users.
- User feedback ratio.
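For the latency metrics above, percentiles are normally computed by the observability backend, but the arithmetic is simple enough to sketch. This uses the nearest-rank method; `percentile` and `latency_summary` are hypothetical helper names.

```python
import math


def percentile(samples: list[float], p: float) -> float:
    """Nearest-rank percentile: the smallest sample with at least p% of values at or below it."""
    if not samples:
        raise ValueError("no samples")
    ordered = sorted(samples)
    rank = max(1, math.ceil(p * len(ordered) / 100))
    return ordered[rank - 1]


def latency_summary(samples_ms: list[float]) -> dict[str, float]:
    """Summarize request latencies as p50/p95/p99."""
    return {f"p{p}": percentile(samples_ms, p) for p in (50, 95, 99)}
```

Tracking LLM call latency as its own series, as the list suggests, keeps a slow provider from being mistaken for a slow endpoint.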
Eval in prod
Sample 5% of conversations → LLM-judge → track quality. Alert on regression.
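The 5% sample can be made deterministic by hashing the conversation ID, so a conversation is either always in the eval set or never. A minimal sketch; `in_eval_sample` is a hypothetical helper, and the judge and alerting steps are out of scope here.

```python
import hashlib


def in_eval_sample(conversation_id: str, rate: float = 0.05) -> bool:
    """Map the ID to a stable value in [0, 1) and keep it if it falls below rate."""
    digest = hashlib.sha256(conversation_id.encode()).digest()
    bucket = int.from_bytes(digest[:8], "big") / 2**64
    return bucket < rate
```

Compared with `random.random() < 0.05`, hashing gives the same decision on every worker and on every replay, which makes regressions easier to reproduce.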
CI/CD
- run: pytest
- run: python eval.py --baseline baseline.json
- run: docker build .
- run: docker push
- run: ssh deploy@server "docker compose pull && docker compose up -d"
Disaster scenarios
- Anthropic down → fallback to OpenAI.
- Cost spike → auto-disable / alert.
- Prompt injection → output classifier.
- DB lost → restore from backup.
- Vector DB lost → re-embed from source.
Pre-launch checklist
- Rate limits per user.
- Cost ceiling per user.
- Auth + tiered limits.
- Multi-provider fallback.
- Streaming + cancellation.
- Feedback collection.
- Eval baseline.
- Observability.
- PII redaction in logs.
- Tool use sandboxed.
- Backups configured.
- Health checks wired to LB.
- CI runs evals.
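For the PII redaction item in the checklist above, one low-effort starting point is a logging filter that scrubs messages before they are emitted. The sketch below only handles email addresses with a simple regex; the pattern and the `RedactingFilter` name are assumptions, and real PII coverage (names, phone numbers, IDs) needs more than this.

```python
import logging
import re

# Deliberately simple pattern; it will not catch every valid address.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")


def redact(text: str) -> str:
    """Replace anything that looks like an email address."""
    return EMAIL_RE.sub("[REDACTED_EMAIL]", text)


class RedactingFilter(logging.Filter):
    def filter(self, record: logging.LogRecord) -> bool:
        # Format the message first so values passed as args are scrubbed too.
        record.msg = redact(record.getMessage())
        record.args = ()
        return True  # never drop the record, only rewrite it
```

Attach it with `handler.addFilter(RedactingFilter())` on each handler so that records from all loggers pass through it.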
Read this next
That’s 20 AI/LLM cheatsheets. Next category: Redis.