The FastAPI stack has stabilized in 2026, and it’s the most pleasant way to build a Python API today. Pydantic v2 moved its validation core to Rust and can validate up to 50× faster than v1, depending on the workload. SQLAlchemy 2.0 has first-class async and a much cleaner ORM. FastAPI glues them together with type-driven everything.

This post is the layout I reach for on day one of a new service. It scales past one file without rewriting. Every decision below is justified — copy the parts that fit, ignore the rest.

The shape of the app

app/
├── __init__.py
├── main.py              # FastAPI() + lifespan
├── settings.py          # Pydantic settings
├── deps.py              # shared dependencies
├── errors.py            # exception handlers
├── logging.py           # structured logging
├── db/
│   ├── __init__.py
│   ├── base.py          # DeclarativeBase, naming convention
│   ├── session.py       # async engine + sessionmaker
│   └── models/
│       └── user.py
├── routers/
│   ├── users.py
│   └── items.py
├── schemas/             # Pydantic request/response models
│   ├── user.py
│   └── item.py
├── services/            # business logic, no FastAPI imports
│   └── users.py
└── alembic/             # migrations

Three layers: routers (HTTP), services (business), db/models (storage). Schemas are the contract. Dependencies wire them together.

Settings — pydantic-settings

# app/settings.py
from functools import lru_cache
from pydantic import PostgresDsn, RedisDsn
from pydantic_settings import BaseSettings, SettingsConfigDict


class Settings(BaseSettings):
    model_config = SettingsConfigDict(env_file=".env", env_prefix="APP_")

    env: str = "dev"                       # dev | staging | prod
    database_url: PostgresDsn
    redis_url: RedisDsn | None = None
    jwt_secret: str
    log_level: str = "INFO"


@lru_cache(maxsize=1)
def get_settings() -> Settings:
    return Settings()

Why lru_cache: read once, share. Why env_prefix="APP_": keeps your envs from colliding with PATH or PYTHONHOME. Why typed URLs: Pydantic validates the format on startup, not at the first DB call.
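
One consequence of the lru_cache choice is worth seeing once: the environment is read on the first call only, so a test that changes a variable has to clear the cache. The sketch below uses a stdlib dataclass as a stand-in for Settings so it runs without pydantic-settings. DemoSettings and get_demo_settings are hypothetical names, not part of the app.

```python
import os
from dataclasses import dataclass
from functools import lru_cache


@dataclass(frozen=True)
class DemoSettings:
    """Stand-in for the pydantic Settings class (hypothetical, stdlib only)."""
    log_level: str


@lru_cache(maxsize=1)
def get_demo_settings() -> DemoSettings:
    # Reads the environment on the first call only; later calls hit the cache.
    return DemoSettings(log_level=os.environ.get("APP_LOG_LEVEL", "INFO"))


os.environ["APP_LOG_LEVEL"] = "DEBUG"
first = get_demo_settings()

os.environ["APP_LOG_LEVEL"] = "ERROR"   # changed after the first read
second = get_demo_settings()            # still the cached object

get_demo_settings.cache_clear()         # what a test fixture would do
third = get_demo_settings()             # re-reads the environment
```

The same cache_clear() call works on the real get_settings(), since lru_cache adds it to any function it wraps.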

Database — async SQLAlchemy 2.0

# app/db/base.py
from sqlalchemy import MetaData
from sqlalchemy.orm import DeclarativeBase

# Naming convention so Alembic generates predictable migration names.
NAMING_CONVENTION = {
    "ix":  "ix_%(column_0_label)s",
    "uq":  "uq_%(table_name)s_%(column_0_name)s",
    "ck":  "ck_%(table_name)s_%(constraint_name)s",
    "fk":  "fk_%(table_name)s_%(column_0_name)s_%(referred_table_name)s",
    "pk":  "pk_%(table_name)s",
}


class Base(DeclarativeBase):
    metadata = MetaData(naming_convention=NAMING_CONVENTION)

# app/db/session.py
from sqlalchemy.ext.asyncio import async_sessionmaker, create_async_engine

from app.settings import get_settings

settings = get_settings()

engine = create_async_engine(
    str(settings.database_url),
    pool_size=10,
    max_overflow=10,
    pool_pre_ping=True,           # reconnects after stale connections
    echo=False,                   # set True in dev when debugging SQL
)

SessionLocal = async_sessionmaker(engine, expire_on_commit=False)

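expire_on_commit=False is the right default for async. With the default (True), every loaded attribute is expired on commit and the next access triggers an implicit refresh query. In async code that implicit I/O happens outside an await, so instead of a quiet re-fetch you get a MissingGreenlet error. It is a deadly trap.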

A model

# app/db/models/user.py
from datetime import datetime
from sqlalchemy import String, DateTime, func
from sqlalchemy.orm import Mapped, mapped_column

from app.db.base import Base


class User(Base):
    __tablename__ = "users"

    id:         Mapped[int]      = mapped_column(primary_key=True)
    email:      Mapped[str]      = mapped_column(String(255), unique=True, index=True)
    full_name:  Mapped[str]      = mapped_column(String(120))
    is_active:  Mapped[bool]     = mapped_column(default=True)
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), server_default=func.now())

Mapped[T] + mapped_column is the SQLAlchemy 2.0 way. Type checker happy. IDE autocomplete works. No more Column(...).

Schemas — Pydantic v2

# app/schemas/user.py
from pydantic import BaseModel, EmailStr, ConfigDict


class UserBase(BaseModel):
    email: EmailStr
    full_name: str


class UserCreate(UserBase):
    password: str


class UserOut(UserBase):
    id: int
    is_active: bool

    model_config = ConfigDict(from_attributes=True)    # was orm_mode in v1

Pydantic v2 highlights you’ll use:

  • model_config = ConfigDict(from_attributes=True) — read SQLAlchemy objects directly.
  • EmailStr, HttpUrl, IPvAnyAddress — free validation.
  • Annotated[int, Field(gt=0, le=100)] — typed constraints.
  • model_validator(mode="after") — cross-field validation.

Dependencies

# app/deps.py
from typing import Annotated, AsyncGenerator
from fastapi import Depends
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.session import SessionLocal


async def get_db() -> AsyncGenerator[AsyncSession, None]:
    async with SessionLocal() as session:
        yield session


# Type alias keeps signatures short.
DBSession = Annotated[AsyncSession, Depends(get_db)]

Why async with: the session is closed even on exceptions. Why a type alias: every router function would otherwise repeat db: AsyncSession = Depends(get_db) like it’s 2018. With DBSession, you write db: DBSession.

A service

Business logic lives here, not in routers. Services know nothing about HTTP.

# app/services/users.py
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.db.models.user import User
from app.errors import NotFoundError, ConflictError
from app.schemas.user import UserCreate


async def get_user(db: AsyncSession, user_id: int) -> User:
    user = await db.get(User, user_id)
    if user is None:
        raise NotFoundError("user", user_id)
    return user


async def create_user(db: AsyncSession, payload: UserCreate) -> User:
    existing = await db.scalar(select(User).where(User.email == payload.email))
    if existing:
        raise ConflictError(f"email {payload.email} already used")

    user = User(email=payload.email, full_name=payload.full_name)
    # hash password elsewhere
    db.add(user)
    await db.flush()
    await db.refresh(user)
    return user

Routers commit; services flush. That keeps transactions controlled at the request boundary.
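
To see why the split matters, picture a request that calls two services. The sketch below uses a fake session that only records calls, so it runs with the stdlib alone. FakeSession, create_welcome_item, and signup_handler are hypothetical stand-ins, not code from the app.

```python
import asyncio


class FakeSession:
    """Records calls; a stand-in for AsyncSession (hypothetical)."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def flush(self) -> None:
        self.calls.append("flush")

    async def commit(self) -> None:
        self.calls.append("commit")

    async def rollback(self) -> None:
        self.calls.append("rollback")


async def create_user(db: FakeSession) -> None:
    await db.flush()          # service: send SQL, stay inside the transaction


async def create_welcome_item(db: FakeSession, fail: bool) -> None:
    if fail:
        raise RuntimeError("second step failed")
    await db.flush()


async def signup_handler(db: FakeSession, fail: bool = False) -> str:
    """Plays the router's role: one commit after every service call succeeded."""
    try:
        await create_user(db)
        await create_welcome_item(db, fail)
        await db.commit()
        return "committed"
    except RuntimeError:
        await db.rollback()   # nothing from either service is persisted
        return "rolled back"


def run(fail: bool) -> list[str]:
    db = FakeSession()
    asyncio.run(signup_handler(db, fail))
    return db.calls
```

If create_user had committed on its own, the failure in the second step would leave a user row behind. With the real get_db dependency, a session that closes without a commit rolls back by itself, so the explicit rollback here mainly makes the call log visible.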

A router

# app/routers/users.py
from fastapi import APIRouter, status

from app.deps import DBSession
from app.schemas.user import UserCreate, UserOut
from app.services import users as svc

router = APIRouter(prefix="/users", tags=["users"])


@router.post("", response_model=UserOut, status_code=status.HTTP_201_CREATED)
async def create_user(payload: UserCreate, db: DBSession) -> UserOut:
    user = await svc.create_user(db, payload)
    await db.commit()
    return UserOut.model_validate(user)


@router.get("/{user_id}", response_model=UserOut)
async def read_user(user_id: int, db: DBSession) -> UserOut:
    user = await svc.get_user(db, user_id)
    return UserOut.model_validate(user)

Notice:

  • response_model=UserOut — FastAPI uses the schema for OpenAPI and as a serialization filter. It strips fields the model has but UserOut doesn’t.
  • The router commits the transaction. The service shouldn’t commit; it doesn’t know if it’s the only thing happening in this request.
  • model_validate (v2) instead of from_orm (v1).

Errors that don’t make you cry

# app/errors.py
class AppError(Exception):
    status_code = 500
    code = "internal_error"

    def __init__(self, message: str, **extra):
        super().__init__(message)
        self.message = message
        self.extra = extra


class NotFoundError(AppError):
    status_code = 404
    code = "not_found"

    def __init__(self, what: str, ident):
        super().__init__(f"{what} {ident} not found", what=what, ident=ident)


class ConflictError(AppError):
    status_code = 409
    code = "conflict"

# app/main.py
from fastapi import FastAPI, Request
from fastapi.responses import JSONResponse

from app.errors import AppError


app = FastAPI()


@app.exception_handler(AppError)
async def handle_app_error(request: Request, exc: AppError):
    return JSONResponse(
        status_code=exc.status_code,
        content={"error": exc.code, "message": exc.message, **exc.extra},
    )

Now every error has a stable shape your clients can rely on. {"error": "not_found", "message": "...", ...} — easier to handle than fishing through HTTP status alone.
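
Here is what that payload looks like for a concrete error. The sketch repeats the two exception classes so it runs on its own, and to_response is a hypothetical helper that mirrors the handler without the JSONResponse wrapper.

```python
class AppError(Exception):
    status_code = 500
    code = "internal_error"

    def __init__(self, message: str, **extra):
        super().__init__(message)
        self.message = message
        self.extra = extra


class NotFoundError(AppError):
    status_code = 404
    code = "not_found"

    def __init__(self, what: str, ident):
        super().__init__(f"{what} {ident} not found", what=what, ident=ident)


def to_response(exc: AppError) -> tuple[int, dict]:
    """Same mapping as the exception handler, returned as plain data."""
    return exc.status_code, {"error": exc.code, "message": exc.message, **exc.extra}


status, body = to_response(NotFoundError("user", 42))
# status is 404 and body carries error, message, what, and ident
```

One thing to watch: extra is unpacked last, so an extra key named error would overwrite the code. Keeping extra keys domain-specific (what, ident) avoids that.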

Logging that’s actually useful

# app/logging.py
import json
import logging
import sys

from fastapi import Request


class JsonFormatter(logging.Formatter):
    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "ts": int(record.created * 1000),   # when the event was logged, in ms
            "level": record.levelname,
            "logger": record.name,
            "msg": record.getMessage(),
        }
        if hasattr(record, "request_id"):
            payload["request_id"] = record.request_id
        if record.exc_info:
            payload["exc"] = self.formatException(record.exc_info)
        return json.dumps(payload)


def configure_logging(level: str = "INFO"):
    handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(JsonFormatter())
    root = logging.getLogger()
    root.handlers = [handler]
    root.setLevel(level)

Plus a request middleware that adds a request_id:

# app/main.py
import uuid
from contextvars import ContextVar

request_id_var: ContextVar[str] = ContextVar("request_id")


@app.middleware("http")
async def request_id_middleware(request: Request, call_next):
    rid = request.headers.get("x-request-id") or uuid.uuid4().hex
    token = request_id_var.set(rid)
    try:
        response = await call_next(request)
        response.headers["x-request-id"] = rid
        return response
    finally:
        request_id_var.reset(token)

Structured logs + a request id you can grep for is 90% of observability for free.
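
One gap in the snippets as written: the formatter looks for record.request_id, but the middleware only sets a ContextVar, so nothing copies the id onto the log record. A logging filter is one way to bridge the two. This is a stdlib-only sketch. RequestIdFilter, ListHandler, and TinyJsonFormatter are hypothetical names, and request_id_var is redefined here so the block runs by itself.

```python
import json
import logging
from contextvars import ContextVar

# Stand-in for the request_id_var defined next to the middleware.
request_id_var: ContextVar[str] = ContextVar("request_id")


class RequestIdFilter(logging.Filter):
    """Copies the current request id (if any) onto each log record."""

    def filter(self, record: logging.LogRecord) -> bool:
        rid = request_id_var.get(None)   # None outside a request
        if rid is not None:
            record.request_id = rid
        return True                      # never drop the record


class TinyJsonFormatter(logging.Formatter):
    """Cut-down version of JsonFormatter, enough to show the field."""

    def format(self, record: logging.LogRecord) -> str:
        payload = {"msg": record.getMessage()}
        if hasattr(record, "request_id"):
            payload["request_id"] = record.request_id
        return json.dumps(payload)


class ListHandler(logging.Handler):
    """Collects formatted lines in memory so the effect is easy to inspect."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: list[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(self.format(record))


handler = ListHandler()
handler.setFormatter(TinyJsonFormatter())
handler.addFilter(RequestIdFilter())     # on the handler, so every logger gets it

log = logging.getLogger("demo.request_id")
log.addHandler(handler)
log.setLevel(logging.INFO)
log.propagate = False

log.info("outside any request")
token = request_id_var.set("abc123")     # what the middleware does per request
log.info("inside a request")
request_id_var.reset(token)
```

In the app, the equivalent would be a single handler.addFilter(RequestIdFilter()) line inside configure_logging, with the filter importing the same request_id_var the middleware sets.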

Lifespan — the right way

# app/main.py
from contextlib import asynccontextmanager

import httpx
from fastapi import FastAPI

from app.db.session import engine
from app.logging import configure_logging
from app.settings import get_settings


@asynccontextmanager
async def lifespan(app: FastAPI):
    s = get_settings()
    configure_logging(s.log_level)

    app.state.http = httpx.AsyncClient(timeout=10.0)
    yield
    await app.state.http.aclose()
    await engine.dispose()


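# This replaces the bare FastAPI() from the errors section: keep one app instance
# and register the exception handler and middleware on it.
app = FastAPI(lifespan=lifespan)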

lifespan is the modern replacement for the deprecated on_event("startup") and on_event("shutdown") hooks. Open shared resources once, close them cleanly on shutdown.

Alembic with async

uv add alembic
alembic init -t async app/alembic

Then in app/alembic/env.py (the directory shown in the tree above):

from app.db.base import Base
from app.db.models import user        # noqa: F401  (import every model module so metadata is populated)
target_metadata = Base.metadata

Generate:

alembic revision --autogenerate -m "create users"
alembic upgrade head

The naming_convention we set on Base.metadata makes autogenerated migrations stable across environments — they’re not littered with hash-named constraints that diff every time.

Testing — async, fast, isolated

# tests/conftest.py
import pytest_asyncio
from httpx import AsyncClient, ASGITransport

from app.main import app
from app.db.session import engine
from app.db.base import Base


@pytest_asyncio.fixture
async def db_schema():
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.create_all)
    yield
    async with engine.begin() as conn:
        await conn.run_sync(Base.metadata.drop_all)


@pytest_asyncio.fixture
async def client(db_schema):
    async with AsyncClient(transport=ASGITransport(app=app), base_url="http://test") as c:
        yield c

# tests/test_users.py
import pytest


@pytest.mark.asyncio   # not needed if pytest-asyncio runs with asyncio_mode = auto
async def test_create_user(client):
    r = await client.post("/users", json={"email": "a@example.com", "full_name": "A B", "password": "secret"})
    assert r.status_code == 201
    assert r.json()["email"] == "a@example.com"

ASGITransport runs your app in-process with no network. Tests in milliseconds. Pair with a Postgres test container if you want real DB behavior in CI.

Performance basics

  • Async DB drivers only. asyncpg for Postgres, asyncmy for MySQL. Mixing sync and async = event-loop blocking = throughput dies.
  • Pool size: typical FastAPI service does well at pool_size=10, max_overflow=10. Tune based on pg_stat_activity.
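  • uvloop + httptools when running with Uvicorn (both come with uvicorn[standard]). Often good for a throughput gain in the 20% range with no code changes, though the number depends on the workload.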
  • Don’t await inside loops unless you have to. asyncio.gather for fan-out.
  • N+1 queries are still a thing. Use selectinload / joinedload from SQLAlchemy when you need related data.
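
from sqlalchemy import select
from sqlalchemy.orm import selectinload

# Assumes User defines a `posts` relationship (not shown in the model above).
users = await db.scalars(
    select(User).options(selectinload(User.posts))
)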
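
The asyncio.gather bullet is the one that pays off fastest, so here is a small timing sketch. fetch_profile is a hypothetical I/O call, and the sleep stands in for network latency.

```python
import asyncio
import time


async def fetch_profile(user_id: int) -> dict:
    """Hypothetical I/O call; the sleep stands in for network latency."""
    await asyncio.sleep(0.05)
    return {"id": user_id}


async def fetch_sequential(ids: list[int]) -> list[dict]:
    # One await per iteration: total time is roughly len(ids) * latency.
    return [await fetch_profile(i) for i in ids]


async def fetch_concurrent(ids: list[int]) -> list[dict]:
    # All calls in flight at once; results come back in input order.
    return await asyncio.gather(*(fetch_profile(i) for i in ids))


async def timed(fn, ids):
    start = time.perf_counter()
    result = await fn(ids)
    return result, time.perf_counter() - start


ids = [1, 2, 3, 4, 5]
seq_result, seq_time = asyncio.run(timed(fetch_sequential, ids))
con_result, con_time = asyncio.run(timed(fetch_concurrent, ids))
```

This fits outbound HTTP calls through the shared httpx client. It does not fit database queries on a single session: an AsyncSession is not safe for concurrent use, so fanning out queries needs one session per task.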

Deploying

FROM python:3.13-slim
WORKDIR /app

RUN pip install --no-cache-dir uv
COPY pyproject.toml uv.lock ./
RUN uv sync --frozen --no-dev

COPY . .
# --no-sync: run in the environment built above instead of re-syncing at startup
CMD ["uv", "run", "--no-sync", "uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000", "--workers", "2"]

In Kubernetes, set CPU/memory requests honestly. Use a readiness probe that hits a /healthz that also pings the DB — a service that can’t reach Postgres should not be in rotation.
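
The core of such a probe can be sketched without any framework. This is a minimal version, assuming a ping coroutine that raises when the database is unreachable. The readiness helper, the three ping functions, and the one-second timeout are hypothetical choices for the example.

```python
import asyncio
from typing import Awaitable, Callable


async def readiness(
    ping_db: Callable[[], Awaitable[None]],
    timeout: float = 1.0,
) -> tuple[int, dict]:
    """Return (status_code, body) for a /healthz route.

    ping_db is any coroutine function that raises if the database is
    unreachable; in the app it would run something like SELECT 1.
    """
    try:
        await asyncio.wait_for(ping_db(), timeout=timeout)
    except Exception as exc:  # includes the timeout raised by wait_for
        return 503, {"status": "unavailable", "reason": type(exc).__name__}
    return 200, {"status": "ok"}


async def healthy_ping() -> None:
    await asyncio.sleep(0)


async def hanging_ping() -> None:
    await asyncio.sleep(10)      # simulates a database that never answers


async def broken_ping() -> None:
    raise ConnectionError("connection refused")
```

In the service, the route would pass a ping that executes SELECT 1 on a session and return the pair as a JSONResponse. The timeout matters as much as the ping: a probe that hangs on a dead connection tells Kubernetes nothing.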

Read this next

If you want a starter template that ships all of this with linting, CI, and docker-compose, it’s at rajpoot.dev.

