Cheatsheet for tying logs / traces / metrics together across the stack.
structlog setup
import logging, sys, structlog
from opentelemetry import trace
def add_trace_ids(_, __, ed):
    """structlog processor: copy the current OpenTelemetry IDs onto the event.

    The first two positional arguments are ignored. ``ed`` is the event dict;
    it is mutated in place and returned. When the current span context has a
    falsy ``trace_id`` the dict is returned untouched.
    """
    span_context = trace.get_current_span().get_span_context()
    if not span_context.trace_id:
        return ed
    # Fixed-width lowercase hex: 32 chars for the trace ID, 16 for the span ID.
    ed["trace_id"] = f"{span_context.trace_id:032x}"
    ed["span_id"] = f"{span_context.span_id:016x}"
    return ed
# stdlib logging: bare messages on stdout at INFO and above.
logging.basicConfig(format="%(message)s", stream=sys.stdout, level=logging.INFO)

# Processors that run in every environment, in this order.
_shared_processors = [
    structlog.contextvars.merge_contextvars,
    structlog.processors.add_log_level,
    structlog.processors.TimeStamper(fmt="iso"),
    add_trace_ids,
]

if PROD:
    # JSON output needs the traceback flattened into a string field first.
    _render_processors = [
        structlog.processors.format_exc_info,
        structlog.processors.JSONRenderer(),
    ]
else:
    # ConsoleRenderer formats ``exc_info`` itself. Running format_exc_info
    # before it replaces the exception with a plain string, which disables the
    # renderer's pretty tracebacks and makes it emit a warning.
    _render_processors = [structlog.dev.ConsoleRenderer()]

structlog.configure(
    processors=[*_shared_processors, *_render_processors],
    # Drop anything below INFO before the processor chain runs.
    wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
)
log = structlog.get_logger()
JSON in prod; pretty in dev.
Request ID + user context middleware
@app.middleware("http")
async def context_mw(request: Request, call_next):
    """Bind per-request logging context and echo the request ID back.

    Uses the incoming ``x-request-id`` header when present (and non-empty),
    otherwise generates a UUID4. The request ID, path and method are bound as
    structlog context variables, plus ``user_id`` when ``request.state.user``
    is set and truthy. The context is cleared once the downstream call
    finishes, whether it succeeded or raised.
    """
    incoming_id = request.headers.get("x-request-id")
    rid = incoming_id if incoming_id else str(uuid.uuid4())

    structlog.contextvars.bind_contextvars(
        request_id=rid, path=request.url.path, method=request.method
    )

    user = getattr(request.state, "user", None)
    if user:
        structlog.contextvars.bind_contextvars(user_id=str(user.id))

    try:
        response = await call_next(request)
        # Return the ID to the caller so client and server logs can be joined.
        response.headers["x-request-id"] = rid
        return response
    finally:
        structlog.contextvars.clear_contextvars()
OTEL setup
from opentelemetry import trace
from opentelemetry.sdk.trace import TracerProvider
from opentelemetry.sdk.trace.export import BatchSpanProcessor
from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter
from opentelemetry.instrumentation.fastapi import FastAPIInstrumentor
from opentelemetry.instrumentation.httpx import HTTPXClientInstrumentor
from opentelemetry.instrumentation.asyncpg import AsyncPGInstrumentor
from opentelemetry.instrumentation.sqlalchemy import SQLAlchemyInstrumentor
# Tracer provider with a batching OTLP/gRPC exporter.
# NOTE(review): OTLPSpanExporter() is built with no arguments; presumably the
# endpoint comes from defaults or environment variables — confirm.
provider = TracerProvider()
span_processor = BatchSpanProcessor(OTLPSpanExporter())
provider.add_span_processor(span_processor)
trace.set_tracer_provider(provider)

# Instrument the web app, outbound HTTP clients and the database layers.
FastAPIInstrumentor.instrument_app(app)
HTTPXClientInstrumentor().instrument()
AsyncPGInstrumentor().instrument()
# The SQLAlchemy instrumentor is handed the sync engine behind the async one.
SQLAlchemyInstrumentor().instrument(engine=engine.sync_engine)
Auto-instruments HTTP / DB / outbound calls.
Custom spans
tracer = trace.get_tracer(__name__)
@app.post("/checkout")
async def checkout(...):
with tracer.start_as_current_span("checkout") as span:
span.set_attribute("user_id", str(user.id))
with tracer.start_as_current_span("validate_cart"):
cart = await load_cart(...)
with tracer.start_as_current_span("charge"):
payment = await charge(...)
span.set_attribute("payment_id", payment.id)
return payment
Slow query log
# Statements slower than this many seconds are logged as slow queries.
_SLOW_QUERY_SECONDS = 0.5


@event.listens_for(engine.sync_engine, "before_cursor_execute")
def before(conn, cur, stmt, params, ctx, executemany):
    """Stamp the statement's start time on its execution context."""
    # perf_counter() is monotonic, so the measured duration cannot go negative
    # or spike when the wall clock is adjusted (NTP step, manual change).
    ctx._t = time.perf_counter()


@event.listens_for(engine.sync_engine, "after_cursor_execute")
def after(conn, cur, stmt, params, ctx, executemany):
    """Log a ``slow_query`` warning when the statement exceeded the threshold.

    The SQL text is truncated to 500 characters; the duration is reported in
    milliseconds.
    """
    dur = time.perf_counter() - ctx._t
    if dur > _SLOW_QUERY_SECONDS:
        log.warning("slow_query", sql=stmt[:500], ms=dur * 1000)
App-side; complements DB-side auto_explain / pg_stat_statements.
Prometheus metrics
from prometheus_client import Counter, Histogram, generate_latest, CONTENT_TYPE_LATEST
# Request counter, labelled by HTTP method, route path and response status.
REQ = Counter(
    "http_requests_total",
    "",
    labelnames=["method", "path", "status"],
)
# Request latency histogram in seconds, labelled by HTTP method and route path.
LAT = Histogram(
    "http_request_duration_seconds",
    "",
    labelnames=["method", "path"],
)
def _record_request(req, status, dur):
    """Update the request counter and latency histogram for one request."""
    route = req.scope.get("route")
    # Label with the route template only. Falling back to the raw URL for
    # unmatched requests (404s, scanners) would create one time series per
    # distinct path, so those are folded into a single fixed label.
    p = route.path if route else "<unmatched>"
    REQ.labels(req.method, p, status).inc()
    LAT.labels(req.method, p).observe(dur)


@app.middleware("http")
async def metrics(req, call_next):
    """Record request count and latency for every request.

    Requests whose handler raises are recorded with status 500 and the
    exception is re-raised unchanged.
    """
    # Monotonic clock: immune to wall-clock adjustments.
    started = time.perf_counter()
    try:
        resp = await call_next(req)
    except Exception:
        # Without this branch failed requests would be missing from the metrics.
        _record_request(req, 500, time.perf_counter() - started)
        raise
    _record_request(req, resp.status_code, time.perf_counter() - started)
    return resp
@app.get("/metrics", include_in_schema=False)
async def metrics_():
    """Serve the current Prometheus metrics; hidden from the API schema."""
    payload = generate_latest()
    return Response(payload, media_type=CONTENT_TYPE_LATEST)
Cardinality: use templated route path (/users/{id}), not raw URL.
DB pool metrics
from prometheus_client import Gauge
# Gauges for the DB connection pool; values are set by pool_reporter().
pool_size_g = Gauge(name="db_pool_size", documentation="")
pool_in_use_g = Gauge(name="db_pool_checked_out", documentation="")
async def pool_reporter():
    """Publish DB pool size and checked-out connection count every 10 seconds.

    Runs forever; intended to be started as a background task.
    """
    while True:
        pool = engine.pool
        pool_size_g.set(pool.size())
        # The SQLAlchemy pool method is ``checkedout()``. ``checked_out()``
        # does not exist and would raise AttributeError on the first iteration.
        pool_in_use_g.set(pool.checkedout())
        await asyncio.sleep(10)
# Start in lifespan
Health / readiness
@app.get("/healthz", include_in_schema=False)
async def healthz():
    """Liveness probe: always answers ``{"status": "ok"}`` without touching dependencies."""
    return {"status": "ok"}
@app.get("/ready", include_in_schema=False)
async def ready(db = Depends(get_db)):
    """Readiness probe: report ready only if a trivial DB query succeeds.

    Returns ``{"status": "ready"}`` on success, or a 503 JSON response with
    ``{"status": "not ready"}`` when the query raises.
    """
    try:
        await db.execute(text("SELECT 1"))
    except Exception:
        # Deliberately broad: any failure means "not ready". Log it with the
        # traceback so the cause is visible instead of silently swallowed.
        log.warning("readiness_check_failed", exc_info=True)
        return JSONResponse({"status": "not ready"}, status_code=503)
    return {"status": "ready"}
Sentry
import sentry_sdk
from sentry_sdk.integrations.fastapi import FastApiIntegration
# Initialise Sentry from application settings.
sentry_sdk.init(
    dsn=settings.sentry_dsn,
    # Tag events with the deployment environment and release version.
    environment=settings.env,
    release=settings.version,
    integrations=[FastApiIntegration()],
    # Applies to performance traces only: 10% are sampled.
    traces_sample_rate=0.1,
)
Audit log table
class AuditLog(Base):
    """ORM model for one row of the ``audit_log`` table."""

    __tablename__ = "audit_log"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Nullable: an entry may have no actor.
    actor_id: Mapped[int | None] = mapped_column()
    action: Mapped[str] = mapped_column()
    # audit() fills these from the target's class name and str(target.id).
    target_type: Mapped[str] = mapped_column()
    target_id: Mapped[str] = mapped_column()
    # Optional JSONB payloads; presumably state snapshots around the change.
    before: Mapped[dict | None] = mapped_column(JSONB)
    after: Mapped[dict | None] = mapped_column(JSONB)
    # Timestamp assigned by the database at insert time.
    occurred_at: Mapped[datetime] = mapped_column(server_default=func.now())
async def audit(db, actor_id, action, target, before=None, after=None):
    """Add an AuditLog entry for ``target`` to the session.

    The entry is only added to ``db``; this function does not flush or commit.
    ``target_type`` is the target's class name and ``target_id`` is
    ``str(target.id)``.
    """
    entry = AuditLog(
        actor_id=actor_id,
        action=action,
        target_type=type(target).__name__,
        target_id=str(target.id),
        before=before,
        after=after,
    )
    db.add(entry)
Dashboards
Standard panels per service:
- Request rate (RPS) per endpoint.
- Error rate (4xx, 5xx) per endpoint.
- p50 / p95 / p99 latency.
- DB pool used / available.
- Slow query count.
- DB connections.
- Replication lag.
- ARQ / Celery queue depth.
Common mistakes
- High-cardinality labels (user_id, request_id) on metrics.
- DEBUG logs in prod — volume.
- PII in logs.
- No trace ID in log lines — can’t correlate.
- Sampling error events — every error you sample away is gone for good; sample traces, not errors.
Read this next
Cheatsheets complete. Pair with:
If you want my full observability template (OTEL + Prometheus + Loki), it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .