Chapter 11: how to know what’s happening in a production FastAPI app. Structured logging, distributed tracing, metrics. Each set up with discipline.

Structured logging with structlog

import structlog
import logging

logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.INFO)

structlog.configure(
    processors=[
        structlog.contextvars.merge_contextvars,
        structlog.processors.add_log_level,
        structlog.processors.TimeStamper(fmt="iso"),
        structlog.processors.StackInfoRenderer(),
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer() if PROD else structlog.dev.ConsoleRenderer(),
    ],
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
    cache_logger_on_first_use=True,
)

log = structlog.get_logger()

JSON in prod, pretty in dev. See Python Logging .

Request ID middleware

@app.middleware("http")
async def request_id_middleware(request: Request, call_next):
    rid = request.headers.get("x-request-id") or str(uuid.uuid4())
    structlog.contextvars.bind_contextvars(request_id=rid, path=request.url.path, method=request.method)
    try:
        response = await call_next(request)
        response.headers["x-request-id"] = rid
        return response
    finally:
        structlog.contextvars.clear_contextvars()

Now every log line in this request includes request_id automatically. No manual passing.

Per-request user context

@app.middleware("http")
async def user_context_middleware(request: Request, call_next):
    if user := getattr(request.state, "user", None):
        structlog.contextvars.bind_contextvars(user_id=str(user.id), tenant_id=user.tenant_id)
    return await call_next(request)

Order matters: ensure auth middleware runs before this.

OpenTelemetry tracing

from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor

provider = TracerProvider()
provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter()))
trace.set_tracer_provider(provider)

FastAPIInstrumentor.instrument_app(app)
HTTPXClientInstrumentor().instrument()
AsyncPGInstrumentor().instrument()

Auto-instrumentation gives you spans for HTTP / DB / external calls.

See Distributed Tracing .

Custom spans

tracer = trace.get_tracer(__name__)

async def checkout(user_id: int):
    with tracer.start_as_current_span("checkout") as span:
        span.set_attribute("user_id", str(user_id))
        
        with tracer.start_as_current_span("validate_cart"):
            cart = await load_cart(user_id)
        
        with tracer.start_as_current_span("charge"):
            payment = await charge_user(user_id, cart.total)
            span.set_attribute("payment.id", payment.id)
        
        return payment

For business operations that aren’t auto-instrumented.

Trace + log correlation

def add_trace_ids(_, __, event_dict):
    span = trace.get_current_span()
    ctx = span.get_span_context()
    if ctx.trace_id:
        event_dict["trace_id"] = format(ctx.trace_id, "032x")
        event_dict["span_id"] = format(ctx.span_id, "016x")
    return event_dict

# Add to structlog processors

Now logs include trace IDs. Click from Loki to Tempo and back.

Prometheus metrics

from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST

REQUEST_COUNT = Counter("http_requests_total", "Total requests", ["method", "path", "status"])
REQUEST_LATENCY = Histogram("http_request_duration_seconds", "Request latency", ["method", "path"])

@app.middleware("http")
async def metrics_middleware(request: Request, call_next):
    start = time.time()
    response = await call_next(request)
    duration = time.time() - start
    
    path = request.scope.get("route").path if request.scope.get("route") else request.url.path
    REQUEST_COUNT.labels(request.method, path, response.status_code).inc()
    REQUEST_LATENCY.labels(request.method, path).observe(duration)
    
    return response

@app.get("/metrics", include_in_schema=False)
async def metrics():
    return Response(generate_latest(), media_type=CONTENT_TYPE_LATEST)

Use route path (templated) not URL path (concrete) — avoids cardinality explosion.

For a more complete solution: prometheus-fastapi-instrumentator:

from prometheus_fastapi_instrumentator import Instrumentator

Instrumentator().instrument(app).expose(app)

Custom metrics

LOGIN_ATTEMPTS = Counter("auth_login_attempts_total", "Login attempts", ["result"])

@app.post("/token")
async def login(...):
    try:
        user = await authenticate(...)
        LOGIN_ATTEMPTS.labels("success").inc()
    except InvalidCredentials:
        LOGIN_ATTEMPTS.labels("failure").inc()
        raise

For business KPIs.

Health and readiness

@app.get("/healthz", include_in_schema=False)
async def health():
    return {"status": "ok"}

@app.get("/ready", include_in_schema=False)
async def ready(db: AsyncSession = Depends(get_db)):
    try:
        await db.execute(text("SELECT 1"))
    except Exception:
        return JSONResponse({"status": "not ready"}, status_code=503)
    return {"status": "ready"}

Liveness: just up. Readiness: deps reachable.

Sentry / error tracking

import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration

sentry_sdk.init(
    dsn=settings.sentry_dsn,
    integrations=[FastApiIntegration()],
    traces_sample_rate=0.1,
    environment=settings.env,
    release=settings.version,
)

Captures unhandled exceptions; provides traces. Free / paid tiers.

Audit logging

For compliance / debugging:

async def audit_log(actor_id, action, target, before=None, after=None):
    await db.execute(
        insert(AuditLog).values(
            actor_id=actor_id, action=action, target=target,
            before=before, after=after, occurred_at=datetime.utcnow(),
        )
    )

@app.put("/users/{id}")
async def update_user(id: int, data: UserUpdate, current: User = Depends(get_current_user), db = Depends(get_db)):
    user = await db.get(User, id)
    before = user.dict()
    update_fields(user, data)
    after = user.dict()
    await audit_log(current.id, "user.update", {"type": "user", "id": id}, before, after)
    await db.commit()

Append-only; never modified. See Postgres JSONB for storage.

What to log

  • Every request: method, path, status, duration, request_id, user_id.
  • Every error: exception, stack trace.
  • Every external call: target, duration, success/failure.
  • Auth events: login success/failure, password changes.
  • Sensitive operations: data export, account deletion.

What NOT:

  • PII in logs (emails, names) — redact.
  • Passwords / tokens — never.
  • Full request/response bodies by default — sample.

Sampling

For high-volume: sample.

def should_sample(request):
    if request.url.path == "/healthz": return False
    if random.random() < 0.1: return True
    return False

Don’t log every health check. Sample non-essential.

Cardinality control

For metrics labels:

  • Method, path (templated), status — fine.
  • User_id, request_id — too high; explodes series count.

Move per-user / per-request data to traces / logs.

See Observability Cost .

Dashboards

Standard Grafana dashboards per service:

  • Request rate by endpoint.
  • Error rate (4xx, 5xx).
  • Latency p50 / p95 / p99.
  • DB pool usage.
  • External API call success/failure.

Define SLOs on these. See SLOs and Error Budgets .

Alerts

  • High error rate (>1% sustained).
  • High latency (p99 > SLO).
  • Pod restarts.
  • DB connections near max.
  • Background queue depth growing.

Page only on user-impacting issues. Tickets for slow-burn.

Common mistakes

1. Logs without structure

logger.info(f"user {id} did {x}") — millions of unique strings; can’t query.

2. PII in logs

Email addresses; names. Redact at the processor.

3. No request_id

Can’t correlate across services or even within one service.

4. High-cardinality metrics

user_id as label → series explosion → expensive observability bill.

5. Sampling errors

Sample 10% of requests; an error fires once; not in your sample. Always sample 100% of errors.

What’s next

Chapter 12: Deployment, scaling, production.

Read this next


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .