Chapter 11: how to know what's happening in a production FastAPI app. This chapter covers structured logging, distributed tracing, and metrics, each set up with discipline.
Structured logging with structlog
import logging
import os
import sys

import structlog

# Assumption: PROD is derived from an environment variable; adapt this to your own settings object.
PROD = os.getenv("ENV", "dev") == "prod"

logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.INFO)

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,  # pulls in values bound per request
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        # The renderer must come last: it turns the event dict into the final output.
        structlog.processors.JSONRenderer() if PROD else structlog.dev.ConsoleRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    cache_logger_on_first_use=True,
)

log = structlog.get_logger()
JSON in prod, pretty console output in dev. See Python Logging.
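To illustrate why JSON lines with key-value fields are easier to query than formatted strings, here is a small standard-library sketch. It does not use structlog; `render_event` is a hypothetical helper that only mimics the shape of a JSON-rendered log line.

```python
import json


def render_event(event: str, **fields) -> str:
    """Render a fixed event name plus key-value fields as one JSON line."""
    return json.dumps({"event": event, **fields}, sort_keys=True)


lines = [
    render_event("order_paid", user_id=1, amount=20),
    render_event("order_paid", user_id=2, amount=35),
    render_event("login_failed", user_id=1),
]

# Because every line is JSON, filtering by field is a plain dictionary lookup.
parsed = [json.loads(line) for line in lines]
paid_by_user_1 = [
    record
    for record in parsed
    if record["event"] == "order_paid" and record["user_id"] == 1
]
print(paid_by_user_1)
```

The event name stays constant while the variable data lives in fields, which is the property a log backend needs in order to filter and aggregate.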
Request ID middleware
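import uuid

import structlog
from fastapi import Request

@app.middleware("http")
async def request_id_middleware(request: Request, call_next):
    # Reuse the caller's ID when present so the request can be followed across services.
    rid = request.headers.get("x-request-id") or str(uuid.uuid4())
    structlog.contextvars.bind_contextvars(request_id=rid, path=request.url.path, method=request.method)
    try:
        response = await call_next(request)
        response.headers["x-request-id"] = rid
        return response
    finally:
        # Clear bound values so they cannot leak into another request.
        structlog.contextvars.clear_contextvars()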
Now every log line in this request includes request_id automatically. No manual passing.
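The "no manual passing" behaviour comes from Python's `contextvars` module, which `structlog.contextvars` builds on. The sketch below uses only the standard library to show that two concurrent tasks each see their own request ID; `request_id_var`, `log`, and `handle` are illustrative names, not part of structlog or FastAPI.

```python
import asyncio
import contextvars

# One context variable; each asyncio task works on its own copy of the context.
request_id_var = contextvars.ContextVar("request_id", default="-")


def log(message: str) -> str:
    """Prefix a message with whatever request ID is bound in the current context."""
    return f"[{request_id_var.get()}] {message}"


async def handle(rid: str) -> list:
    request_id_var.set(rid)
    first = log("start")
    await asyncio.sleep(0)  # yield so the other task runs in between
    second = log("end")
    return [first, second]


async def main() -> list:
    # gather() wraps each coroutine in a task, so the bindings stay isolated.
    return await asyncio.gather(handle("req-a"), handle("req-b"))


results = asyncio.run(main())
print(results)
```

Neither task overwrites the other's ID even though they interleave, and the value bound inside a task does not leak back to the caller.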
Per-request user context
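@app.middleware("http")
async def user_context_middleware(request: Request, call_next):
    # request.state.user is expected to have been set by the auth middleware.
    if user := getattr(request.state, "user", None):
        structlog.contextvars.bind_contextvars(user_id=str(user.id), tenant_id=user.tenant_id)
    return await call_next(request)
Order matters: the auth middleware must run before this one. Starlette runs the most recently added middleware first, so register this middleware before the auth middleware.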
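The ordering rule can be modelled without a web framework. The following is a toy model, not Starlette code: it wraps handlers in registration order, which is how the last-registered middleware ends up outermost and therefore runs first. All names here are hypothetical.

```python
from typing import Callable, List

Handler = Callable[[List[str]], List[str]]


def endpoint(trace: List[str]) -> List[str]:
    trace.append("endpoint")
    return trace


def make_middleware(name: str) -> Callable[[Handler], Handler]:
    """Build a middleware that records its name, then calls the wrapped handler."""
    def wrap(inner: Handler) -> Handler:
        def handler(trace: List[str]) -> List[str]:
            trace.append(name)
            return inner(trace)
        return handler
    return wrap


def build_stack(registered: List[str]) -> Handler:
    """Wrap in registration order, so the last registered ends up outermost."""
    app: Handler = endpoint
    for name in registered:
        app = make_middleware(name)(app)
    return app


# Registration order: user_context first, then auth, then request_id.
stack = build_stack(["user_context", "auth", "request_id"])
order = stack([])
print(order)
```

Under this model the request ID middleware runs first, auth second, and the user context middleware last, which is the order the logging setup needs.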
OpenTelemetry tracing
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor

# With no arguments, OTLPSpanExporter targets the default local collector endpoint
# (or OTEL_EXPORTER_OTLP_ENDPOINT if that environment variable is set).
provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)

# Instrument the app itself, outgoing httpx calls, and asyncpg queries.
FastAPIInstrumentor.instrument_app(app)
HTTPXClientInstrumentor().instrument()
AsyncPGInstrumentor().instrument()
Auto-instrumentation gives you spans for HTTP / DB / external calls.
See Distributed Tracing.
Custom spans
tracer = trace.get_tracer(__name__)

async def checkout(user_id: int):
    with tracer.start_as_current_span("checkout") as span:
        span.set_attribute("user_id", str(user_id))
        with tracer.start_as_current_span("validate_cart"):
            cart = await load_cart(user_id)
        with tracer.start_as_current_span("charge"):
            payment = await charge_user(user_id, cart.total)
        span.set_attribute("payment.id", payment.id)
        return payment
For business operations that aren’t auto-instrumented.
Trace + log correlation
def add_trace_ids(_, __, event_dict):
    span = trace.get_current_span()
    ctx = span.get_span_context()
    # trace_id is 0 when there is no active span; skip the fields in that case.
    if ctx.trace_id:
        event_dict["trace_id"] = format(ctx.trace_id, "032x")
        event_dict["span_id"] = format(ctx.span_id, "016x")
    return event_dict

# Add add_trace_ids to the structlog processors list, before the renderer.
Now logs include trace IDs. Click from Loki to Tempo and back.
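The format specs matter because OpenTelemetry holds the IDs as integers, while the W3C Trace Context format writes them as zero-padded lowercase hex: 32 characters for the trace ID and 16 for the span ID. A small standalone check, using the example IDs from the W3C specification (`format_ids` is an illustrative helper):

```python
def format_ids(trace_id: int, span_id: int) -> dict:
    """Render integer IDs as fixed-width lowercase hex strings."""
    return {
        "trace_id": format(trace_id, "032x"),
        "span_id": format(span_id, "016x"),
    }


ids = format_ids(0x4BF92F3577B34DA6A3CE929D0E0E4736, 0x00F067AA0BA902B7)
print(ids)

# Zero padding keeps the width fixed even for small values.
print(format(1, "016x"))
```

Without the padding, a span ID with leading zero bytes would render shorter than 16 characters and fail to match the ID stored by the tracing backend.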
Prometheus metrics
import time

from fastapi import Request, Response
from prometheus_client import CONTENT_TYPE_LATEST, Counter, Histogram, generate_latest

REQUEST_COUNT = Counter("http_requests_total", "Total requests", ["method", "path", "status"])
REQUEST_LATENCY = Histogram("http_request_duration_seconds", "Request latency", ["method", "path"])

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start = time.perf_counter()  # monotonic clock, unaffected by system time changes
    status_code = 500  # recorded if call_next raises
    try:
        response = await call_next(request)
        status_code = response.status_code
        return response
    finally:
        duration = time.perf_counter() - start
        route = request.scope.get("route")
        # Unmatched requests have no route; use a fixed label rather than the raw URL.
        path = route.path if route else "unmatched"
        REQUEST_COUNT.labels(request.method, path, str(status_code)).inc()
        REQUEST_LATENCY.labels(request.method, path).observe(duration)

@app.get("/metrics", include_in_schema=False)
async def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)
Use the templated route path (/users/{id}), not the concrete URL path (/users/42); otherwise every distinct URL creates its own time series.
For a more complete solution, use prometheus-fastapi-instrumentator:
from prometheus_fastapi_instrumentator import Instrumentator
Instrumentator().instrument(app).expose(app)
Custom metrics
LOGIN_ATTEMPTS = Counter("auth_login_attempts_total", "Login attempts", ["result"])

@app.post("/token")
async def login(...):
    try:
        user = await authenticate(...)
        LOGIN_ATTEMPTS.labels("success").inc()
    except InvalidCredentials:
        LOGIN_ATTEMPTS.labels("failure").inc()
        raise
For business KPIs.
Health and readiness
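from fastapi import Depends
from fastapi.responses import JSONResponse
from sqlalchemy import text
from sqlalchemy.ext.asyncio import AsyncSession

@app.get("/healthz", include_in_schema=False)
async def health():
    return {"status": "ok"}

@app.get("/ready", include_in_schema=False)
async def ready(db: AsyncSession = Depends(get_db)):
    try:
        await db.execute(text("SELECT 1"))
    except Exception:
        return JSONResponse({"status": "not ready"}, status_code=503)
    return {"status": "ready"}
Liveness answers only whether the process is up. Readiness answers whether its dependencies are reachable, so the app can take traffic.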
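A readiness probe should also fail fast when a dependency hangs rather than refuses. The sketch below shows one way to run several checks concurrently with a per-check timeout, using only `asyncio`. The check functions (`fake_db_ok`, `fake_cache_down`, `fake_slow_queue`) are hypothetical stand-ins for real dependency pings.

```python
import asyncio
from typing import Awaitable, Callable, Dict

Check = Callable[[], Awaitable[None]]


async def check_with_timeout(check: Check, timeout: float) -> bool:
    """Return True if the check finishes without error inside the timeout."""
    try:
        await asyncio.wait_for(check(), timeout=timeout)
        return True
    except Exception:  # includes the timeout error raised by wait_for
        return False


async def readiness(checks: Dict[str, Check], timeout: float = 1.0) -> Dict[str, bool]:
    """Run all checks concurrently and report each result by name."""
    names = list(checks)
    results = await asyncio.gather(
        *(check_with_timeout(checks[name], timeout) for name in names)
    )
    return dict(zip(names, results))


# Hypothetical dependency checks used only for this illustration.
async def fake_db_ok() -> None:
    return None


async def fake_cache_down() -> None:
    raise ConnectionError("cache unreachable")


async def fake_slow_queue() -> None:
    await asyncio.sleep(1)


report = asyncio.run(
    readiness(
        {"db": fake_db_ok, "cache": fake_cache_down, "queue": fake_slow_queue},
        timeout=0.05,
    )
)
is_ready = all(report.values())
print(report, is_ready)
```

In an endpoint, a false `is_ready` would map to the 503 response shown above; reporting per-dependency results makes it clearer which one is failing.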
Sentry / error tracking
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration

sentry_sdk.init(
    dsn=settings.sentry_dsn,
    integrations=[FastApiIntegration()],
    traces_sample_rate=0.1,
    environment=settings.env,
    release=settings.version,
)
Captures unhandled exceptions; provides traces. Free / paid tiers.
Audit logging
For compliance / debugging:
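from datetime import datetime, timezone

from fastapi import Depends, HTTPException
from sqlalchemy import insert

async def audit_log(db, actor_id, action, target, before=None, after=None):
    # Uses the caller's session so the audit row commits together with the change.
    await db.execute(
        insert(AuditLog).values(
            actor_id=actor_id, action=action, target=target,
            before=before, after=after,
            # datetime.utcnow() is naive and deprecated since Python 3.12.
            occurred_at=datetime.now(timezone.utc),
        )
    )

@app.put("/users/{id}")
async def update_user(id: int, data: UserUpdate, current: User = Depends(get_current_user), db = Depends(get_db)):
    user = await db.get(User, id)
    if user is None:
        raise HTTPException(status_code=404)
    before = user.dict()  # assumes the User model exposes a dict() serializer
    update_fields(user, data)
    after = user.dict()
    await audit_log(db, current.id, "user.update", {"type": "user", "id": id}, before, after)
    await db.commit()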
The audit table is append-only: rows are never updated or deleted. See Postgres JSONB for storage.
What to log
- Every request: method, path, status, duration, request_id, user_id.
- Every error: exception, stack trace.
- Every external call: target, duration, success/failure.
- Auth events: login success/failure, password changes.
- Sensitive operations: data export, account deletion.
What NOT to log:
- PII (emails, names): redact it.
- Passwords and tokens: never.
- Full request/response bodies by default: sample them.
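One way to enforce the redaction rules is a structlog processor, which is a plain callable taking `(logger, method_name, event_dict)`, the same shape as `add_trace_ids` above. The sketch below is a minimal version under stated assumptions: the key list and the email pattern are illustrative, not exhaustive, and `redact_sensitive` is a hypothetical name.

```python
import re

# Assumptions for illustration: a simple email pattern and a short list of secret keys.
EMAIL_RE = re.compile(r"[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}")
SECRET_KEYS = {"password", "token", "authorization"}


def redact_sensitive(logger, method_name, event_dict):
    """Mask secret fields entirely and scrub emails out of string values."""
    for key, value in list(event_dict.items()):
        if key.lower() in SECRET_KEYS:
            event_dict[key] = "[REDACTED]"
        elif isinstance(value, str):
            event_dict[key] = EMAIL_RE.sub("[EMAIL]", value)
    return event_dict


cleaned = redact_sensitive(
    None,
    "info",
    {"event": "signup from ada@example.com", "password": "hunter2", "attempt": 3},
)
print(cleaned)
```

Such a processor would sit in the `processors` list before the renderer, so that nothing unredacted reaches the output.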
Sampling
For high-volume endpoints, sample.
import random

def should_sample(request):
    # Never log health checks; keep roughly 10% of everything else.
    if request.url.path == "/healthz":
        return False
    return random.random() < 0.1
Don't log every health check. Sample non-essential traffic.
Cardinality control
For metrics labels:
- method, path (templated), status: fine.
- user_id, request_id: too high; the series count explodes.
Move per-user / per-request data to traces / logs.
See Observability Cost.
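The arithmetic behind the explosion is simple: the worst-case number of time series for one metric is the product of the distinct values of each label. A quick sketch with made-up but plausible numbers (the cardinalities are assumptions for illustration):

```python
from math import prod


def series_count(label_cardinalities: dict) -> int:
    """Upper bound on time series: the product of distinct values per label."""
    return prod(label_cardinalities.values())


safe = series_count({"method": 5, "path": 40, "status": 10})
with_user = series_count({"method": 5, "path": 40, "status": 10, "user_id": 100_000})
print(safe, with_user)
```

Adding a single `user_id` label turns a couple of thousand series into hundreds of millions in the worst case, which is why per-user data belongs in logs and traces.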
Dashboards
Standard Grafana dashboards per service:
- Request rate by endpoint.
- Error rate (4xx, 5xx).
- Latency p50 / p95 / p99.
- DB pool usage.
- External API call success/failure.
Define SLOs on these. See SLOs and Error Budgets.
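To see why dashboards show p50 / p95 / p99 rather than an average, the sketch below computes nearest-rank percentiles over a small set of made-up latencies. This is a plain-Python illustration; a metrics backend such as Prometheus estimates quantiles from histogram buckets instead of raw samples.

```python
import math
from statistics import mean


def percentile(samples, p):
    """Nearest-rank percentile: the smallest value with at least p% of samples at or below it."""
    if not samples:
        raise ValueError("samples must not be empty")
    ordered = sorted(samples)
    rank = max(1, math.ceil(p / 100 * len(ordered)))
    return ordered[rank - 1]


# Made-up latencies in milliseconds: mostly fast, with two slow outliers.
latencies_ms = [12, 15, 11, 14, 13, 250, 16, 12, 13, 900]

p50 = percentile(latencies_ms, 50)
p95 = percentile(latencies_ms, 95)
average = mean(latencies_ms)
print(p50, p95, average)
```

The median shows the typical request is fast, the p95 exposes the slow tail, and the mean lands on a value that no real request experienced.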
Alerts
- High error rate (>1% sustained).
- High latency (p99 > SLO).
- Pod restarts.
- DB connections near max.
- Background queue depth growing.
Page only on user-impacting issues; open tickets for slow-burn problems.
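The word "sustained" in the first alert is what separates a page from noise. As a rough sketch of the idea (in practice this would live in an alert rule with a hold duration, not in application code), the function below pages only when every one of the most recent evaluation windows exceeds the threshold. The window counts are made up.

```python
def error_rate(errors: int, total: int) -> float:
    return errors / total if total else 0.0


def should_page(windows, threshold=0.01, sustained=3):
    """windows: (errors, total) per evaluation window, oldest first."""
    if len(windows) < sustained:
        return False
    recent = windows[-sustained:]
    return all(error_rate(errors, total) > threshold for errors, total in recent)


# A single bad window followed by recovery should not page.
spike = [(0, 1000), (50, 1000), (2, 1000)]
# Several consecutive bad windows should.
outage = [(5, 1000), (30, 1000), (25, 1000), (40, 1000)]

print(should_page(spike), should_page(outage))
```

A one-window spike stays a ticket or a dashboard blip; only the sustained breach wakes someone up.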
Common mistakes
1. Logs without structure
logger.info(f"user {id} did {x}") produces millions of unique strings that cannot be queried by field. Log a fixed event name with key-value fields instead.
2. PII in logs
Email addresses and names end up in log storage. Redact them in a structlog processor.
3. No request_id
Can’t correlate across services or even within one service.
4. High-cardinality metrics
Using user_id as a label creates one series per user, which leads to series explosion and an expensive observability bill.
5. Sampling errors
If you sample 10% of requests and an error fires once, it is probably not in your sample. Always keep 100% of errors.
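A variant of `should_sample` that follows this rule could look like the sketch below. It assumes "error" means a 5xx status, which is a choice made for this illustration, and it takes the random source as a parameter (`rng`, a hypothetical argument) so the behaviour can be tested deterministically.

```python
import random


def should_log(path: str, status_code: int, sample_rate: float = 0.1, rng=random.random) -> bool:
    """Keep every server error, drop health checks, sample the rest."""
    if status_code >= 500:  # assumption: 5xx counts as an error worth keeping
        return True
    if path == "/healthz":
        return False
    return rng() < sample_rate


print(should_log("/orders", 500, rng=lambda: 0.99))   # error: always kept
print(should_log("/healthz", 200, rng=lambda: 0.0))   # health check: dropped
print(should_log("/orders", 200, rng=lambda: 0.05))   # inside the 10% sample
print(should_log("/orders", 200, rng=lambda: 0.5))    # outside the sample
```

Checking the status before the sampling decision is the important part: the error path never reaches the random draw.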
What’s next
Chapter 12: Deployment, scaling, production.