Chapter 6: how to authenticate and authorize requests in FastAPI. Passwords, JWT, OAuth2 / OIDC, sessions, passkeys, scopes, RBAC. Each pattern with real code.

Password hashing

from passlib.context import CryptContext

# Shared passlib hashing context used by hash_password / verify_password;
# argon2 is the only scheme configured here.
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")

def hash_password(plain: str) -> str:
    """Return the hash of ``plain`` produced by the shared ``pwd_context``."""
    hashed = pwd_context.hash(plain)
    return hashed

def verify_password(plain: str, hashed: str) -> bool:
    """Check ``plain`` against a stored hash.

    Returns:
        True when the password matches, False otherwise.
    """
    # Accounts without a local password (e.g. created through an external
    # identity provider) have nothing to verify against; treat that as a
    # failed check instead of letting the hashing library raise.
    if not hashed:
        return False
    return pwd_context.verify(plain, hashed)

Use argon2 by default in 2026. bcrypt acceptable. Never store plaintext.

OAuth2 password flow

from datetime import datetime, timedelta, timezone

import jwt
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm

# Signing key and algorithm shared by jwt.encode / jwt.decode below.
SECRET_KEY = settings.secret_key
ALGORITHM = "HS256"
# Lifetime, in minutes, of the access tokens issued by /token.
ACCESS_TOKEN_EXPIRE_MINUTES = 30

# Bearer-token dependency; tokenUrl names the login route declared below.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")

@app.post("/token")
async def login(form: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(get_db)):
    """Exchange email + password for a short-lived bearer JWT.

    Args:
        form: OAuth2 password-flow form; ``username`` carries the email.
        db: Database session used to look the user up.

    Returns:
        ``{"access_token": <jwt>, "token_type": "bearer"}``.

    Raises:
        HTTPException: 401 with a generic message when the email is unknown
            or the password does not match.
    """
    user = await db.scalar(select(User).where(User.email == form.username))
    if user is None:
        # Spend about as long as a real verification would, so response time
        # does not reveal whether the email is registered.
        pwd_context.dummy_verify()
    if user is None or not verify_password(form.password, user.hashed_password):
        raise HTTPException(401, "Invalid credentials", headers={"WWW-Authenticate": "Bearer"})

    # Timezone-aware "now"; datetime.utcnow() is deprecated since Python 3.12.
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    token = jwt.encode({"sub": str(user.id), "exp": expire}, SECRET_KEY, algorithm=ALGORITHM)
    return {"access_token": token, "token_type": "bearer"}

async def get_current_user(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)) -> User:
    """Resolve the bearer token to the ``User`` it was issued for.

    Args:
        token: Raw JWT extracted by ``oauth2_scheme``.
        db: Database session used to load the user.

    Returns:
        The ``User`` whose id is stored in the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token fails validation, carries no usable
            ``sub`` claim, or refers to a user that no longer exists.
    """
    auth_headers = {"WWW-Authenticate": "Bearer"}
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # A missing or non-numeric "sub" must be rejected as a bad token,
        # not escape as KeyError/ValueError and surface as a 500.
        user_id = int(payload["sub"])
    except (jwt.PyJWTError, KeyError, TypeError, ValueError):
        raise HTTPException(401, "Invalid token", headers=auth_headers) from None
    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(401, "User not found", headers=auth_headers)
    return user

OAuth2PasswordBearer(tokenUrl="token") registers the scheme in OpenAPI; Swagger’s “Authorize” button works.

Refresh tokens

Short-lived access + long-lived refresh:

class RefreshToken(Base):
    """One issued refresh token, stored server-side so it can be revoked."""

    __tablename__ = "refresh_tokens"
    id: Mapped[int] = mapped_column(primary_key=True)
    # NOTE(review): no ForeignKey to the users table is declared here — confirm intended.
    user_id: Mapped[int]
    # The /refresh handler looks records up by comparing this column to the
    # submitted token value.
    token: Mapped[str] = mapped_column(unique=True)
    # Declared as a plain datetime; no timezone option is visible on the column.
    expires_at: Mapped[datetime]
    revoked: Mapped[bool] = mapped_column(default=False)

@app.post("/refresh")
async def refresh(req: RefreshRequest, db: AsyncSession = Depends(get_db)):
    """Issue a new access token for a valid, unexpired, unrevoked refresh token.

    Args:
        req: Request body carrying ``refresh_token``.
        db: Database session used to look the token record up.

    Returns:
        ``{"access_token": <jwt>, "token_type": "bearer"}``.

    Raises:
        HTTPException: 401 when no matching live refresh token exists.
    """
    # datetime.utcnow() is deprecated since Python 3.12. expires_at is declared
    # without timezone information, so compare against a naive UTC value.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    rec = await db.scalar(select(RefreshToken).where(
        RefreshToken.token == req.refresh_token,
        RefreshToken.revoked.is_(False),
        RefreshToken.expires_at > now,
    ))
    if not rec:
        raise HTTPException(401)
    new_access = make_access_token(rec.user_id)
    return {"access_token": new_access, "token_type": "bearer"}

Typical lifetimes: access tokens around 15 minutes (the login example above uses 30), refresh tokens around 30 days. See JWT vs Session Cookies.

Sessions (alternative to JWT)

from itsdangerous import URLSafeSerializer

# NOTE(review): the session handlers shown below store an opaque random id and
# never reference this serializer — confirm whether it is still needed.
serializer = URLSafeSerializer(SECRET_KEY)

@app.post("/login")
async def login_session(form: OAuth2PasswordRequestForm = Depends(), response: Response = None, db: AsyncSession = Depends(get_db)):
    """Authenticate with email + password and start a server-side session.

    A random session id is stored in Redis (mapped to the user id, 30-day
    expiry) and returned to the browser in an HttpOnly, Secure cookie.

    Raises:
        HTTPException: 401 when authentication does not yield a user.
    """
    user = await authenticate(form.username, form.password, db)
    # authenticate() is defined elsewhere; guard in case it signals failure by
    # returning None, so a bad login is a 401 rather than an AttributeError.
    if user is None:
        raise HTTPException(401, "Invalid credentials")
    session_id = secrets.token_urlsafe(32)
    await redis.set(f"sess:{session_id}", str(user.id), ex=86400 * 30)
    response.set_cookie("session", session_id, httponly=True, secure=True, samesite="lax")
    return {"ok": True}

async def get_current_user_from_session(session: str = Cookie(...), db: AsyncSession = Depends(get_db)) -> User:
    """Resolve the ``session`` cookie to the logged-in ``User``.

    Raises:
        HTTPException: 401 when the session id is unknown to Redis or the
            user it points to no longer exists.
    """
    user_id = await redis.get(f"sess:{session}")
    if not user_id:
        raise HTTPException(401)
    user = await db.get(User, int(user_id))
    # A session can outlive its user (e.g. the account was deleted); never
    # hand None to handlers that were promised a User.
    if user is None:
        raise HTTPException(401)
    return user

Simpler for first-party web apps. HttpOnly cookie. Server-side state allows revocation.

OIDC (Google / GitHub / Microsoft / Auth0)

from authlib.integrations.starlette_client import OAuth

# Authlib client registry; the provider registered below is reachable as
# oauth.google in the route handlers.
oauth = OAuth()
oauth.register(
    name="google",
    # Provider endpoints are taken from Google's OIDC discovery document.
    server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
    client_id=settings.google_client_id,
    client_secret=settings.google_client_secret,
    client_kwargs={"scope": "openid email profile"},
)

@app.get("/auth/google")
async def google_login(request: Request):
    """Start the Google sign-in flow by redirecting to the provider."""
    # The callback route is resolved by its registered name.
    callback_url = request.url_for("google_callback")
    redirect = await oauth.google.authorize_redirect(request, callback_url)
    return redirect

@app.get("/auth/google/callback", name="google_callback")
async def google_callback(request: Request, db: AsyncSession = Depends(get_db)):
    """Finish the Google sign-in flow and map the identity to a local user.

    Raises:
        HTTPException: 401 when the provider response carries no verified
            email address.
    """
    token = await oauth.google.authorize_access_token(request)
    # "userinfo" may be absent from the token response; treat that the same
    # as a missing email rather than crashing on None["email"].
    user_info = token.get("userinfo") or {}
    email = user_info.get("email")
    # Accounts are matched by email, so only accept addresses the provider
    # has verified.
    if not email or not user_info.get("email_verified"):
        raise HTTPException(401, "Google account has no verified email")
    # The display name is optional in the claims; fall back to the email.
    user = await find_or_create_user(db, email=email, name=user_info.get("name") or email)
    # issue your own session/JWT
    return RedirectResponse("/")

Authlib handles OIDC flows. Issue your own session after; don’t pass Google’s tokens around your app.

API keys

from fastapi.security import APIKeyHeader

# Dependency that reads the API key from the X-API-Key request header.
api_key_header = APIKeyHeader(name="X-API-Key")

async def get_api_key(key: str = Depends(api_key_header), db: AsyncSession = Depends(get_db)) -> ApiKey:
    """Return the non-revoked ``ApiKey`` record matching the presented key.

    The lookup compares the hash of the presented key with the stored hash.

    Raises:
        HTTPException: 401 when no matching, non-revoked key exists.
    """
    lookup = select(ApiKey).where(ApiKey.key_hash == hash_key(key), ApiKey.revoked == False)
    found = await db.scalar(lookup)
    if found:
        return found
    raise HTTPException(401, "Invalid API key")

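Hash API keys at rest and compare hashes. Unlike passwords, API keys are long random strings, so a fast deterministic hash (SHA-256 or HMAC-SHA-256) is sufficient — and necessary here, because the lookup above finds the record by hash equality, which a salted password hash cannot support.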

Scopes

# Scope-aware variant of the bearer scheme; the mapping is scope name ->
# human-readable description.
oauth2_scheme = OAuth2PasswordBearer(
    tokenUrl="token",
    scopes={"read": "Read items", "write": "Create / update", "admin": "Admin"},
)

# Sketch of a token endpoint that embeds the granted scopes in the JWT payload.
# The `...` placeholders are elisions in this outline, not runnable code.
@app.post("/token")
async def login(form: OAuth2PasswordRequestForm = Depends()):
    user = await authenticate(form.username, form.password)
    granted = ...  # determine based on user
    # The "scopes" claim written here is what get_user_with_scope reads back.
    token = jwt.encode({"sub": str(user.id), "scopes": granted, ...}, ...)
    return {"access_token": token, "token_type": "bearer"}

from fastapi import Security
from fastapi.security import SecurityScopes

async def get_user_with_scope(
    security_scopes: SecurityScopes,
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
):
    """Resolve the bearer token to a user and enforce the required scopes.

    Args:
        security_scopes: Scopes demanded by the route via ``Security(...)``.
        token: Raw JWT extracted by ``oauth2_scheme``.
        db: Database session used to load the user.

    Returns:
        The user identified by the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token is invalid or its user is gone;
            403 when the token lacks a required scope.
    """
    if security_scopes.scopes:
        challenge = 'Bearer scope="' + " ".join(security_scopes.scopes) + '"'
    else:
        challenge = "Bearer"
    auth_headers = {"WWW-Authenticate": challenge}

    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # "sub" is written as a string by the token endpoint; convert it back
        # to the integer primary key before the lookup.
        user_id = int(payload["sub"])
    except (jwt.PyJWTError, KeyError, TypeError, ValueError):
        raise HTTPException(401, "Invalid token", headers=auth_headers) from None

    granted = payload.get("scopes", [])
    for needed in security_scopes.scopes:
        if needed not in granted:
            raise HTTPException(403, f"missing scope: {needed}", headers=auth_headers)

    user = await db.get(User, user_id)
    if user is None:
        raise HTTPException(401, "User not found", headers=auth_headers)
    return user

# Route that requires the "write" scope; the scope check runs as a route-level
# dependency. Signature and body are elided in this sketch.
@app.post("/items", dependencies=[Security(get_user_with_scope, scopes=["write"])])
async def create_item(...): ...

OAuth2 scopes integrated. Swagger UI shows them in “Authorize.”

RBAC

class Role(str, Enum):
    """Closed set of user roles; members are also plain strings."""

    USER = "user"
    EDITOR = "editor"
    ADMIN = "admin"

def require_role(*roles: Role):
    """Build a dependency that admits only users holding one of ``roles``.

    The returned dependency yields the current user on success and raises
    ``HTTPException(403)`` otherwise.
    """
    def role_guard(user: User = Depends(get_current_user)):
        if user.role in roles:
            return user
        raise HTTPException(403)

    return role_guard

# Only users whose role is EDITOR or ADMIN get past the dependency; the body
# is elided in this sketch.
@app.delete("/posts/{id}")
async def delete_post(id: int, user: User = Depends(require_role(Role.EDITOR, Role.ADMIN))):
    ...

Role check as a dep. Composable.

Resource-level authorization

Beyond role: “user can edit this specific post.”

async def get_editable_post(id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> Post:
    """Load post ``id`` and return it only if the current user may edit it.

    Raises:
        HTTPException: 404 when the post does not exist; 403 when the user is
            neither the post's author nor an admin.
    """
    post = await db.get(Post, id)
    if not post:
        raise HTTPException(404)
    # Authors may edit their own posts; admins may edit any post.
    may_edit = post.author_id == user.id or user.is_admin
    if may_edit:
        return post
    raise HTTPException(403)

# The authorization dependency supplies an already-checked Post; the body is
# elided in this sketch.
@app.patch("/posts/{id}")
async def update_post(data: PostUpdate, post: Post = Depends(get_editable_post)):
    # Parameters without a default must precede those with one; the reverse
    # order is a SyntaxError. FastAPI binds parameters by name, so the order
    # does not affect the request contract.
    ...

Authorization at the dep layer; handler stays clean.

Passkeys (WebAuthn)

For modern, phishing-resistant auth:

from webauthn import generate_registration_options, verify_registration_response, generate_authentication_options, verify_authentication_response

@app.post("/passkey/register/begin")
async def begin_register(user: User = Depends(get_current_user)):
    """Start passkey registration for the signed-in user.

    Returns:
        The WebAuthn registration options as a JSON response, ready to be
        passed to the browser's credential-creation call.
    """
    from webauthn import options_to_json

    options = generate_registration_options(
        rp_id="example.com",
        rp_name="Example",
        # py-webauthn expects the user handle as bytes, not str.
        user_id=str(user.id).encode("utf-8"),
        user_name=user.email,
    )
    # NOTE: options.challenge must be kept server-side (e.g. in the session
    # store) so the completion step can supply it as the expected challenge.
    # The options contain raw bytes; options_to_json produces the
    # base64url-encoded JSON form the browser API expects.
    return Response(content=options_to_json(options), media_type="application/json")

@app.post("/passkey/register/complete")
async def complete_register(req: RegistrationCompleteRequest, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
    """Verify the browser's registration response and store the new passkey.

    Returns:
        ``{"ok": True}`` once the credential has been persisted.
    """
    # Arguments elided in this sketch: pass the credential from ``req`` plus
    # the expected challenge, RP id and origin for this registration.
    verification = verify_registration_response(...)
    db.add(
        Passkey(
            user_id=user.id,
            credential_id=verification.credential_id,
            # The verified result exposes the key as ``credential_public_key``.
            public_key=verification.credential_public_key,
            # Add further columns your model defines here
            # (e.g. the signature counter from ``verification.sign_count``).
        )
    )
    await db.commit()
    return {"ok": True}

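This uses the py-webauthn library. Pair registration with a username + passkey login flow built on generate_authentication_options / verify_authentication_response. See Authentication 2026.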

CSRF

For session-based auth on forms:

from starlette_csrf import CSRFMiddleware

# Install CSRF protection app-wide; the middleware is given SECRET_KEY as its secret.
app.add_middleware(CSRFMiddleware, secret=SECRET_KEY)

Or implement double-submit cookie. SameSite=Lax cookies block most CSRF; CSRF token covers the rest.

Pure JSON APIs that authenticate with bearer tokens are not vulnerable to CSRF: browsers never attach the Authorization header automatically, and a cross-origin page cannot set it unless your CORS policy allows that origin.

Rate limiting auth endpoints

from slowapi import Limiter
# Rate-limit key: the client's IP address. request.client can be None (for
# example under some test clients), so fall back to a shared bucket instead
# of raising AttributeError.
# NOTE: slowapi only enforces limits once the limiter is attached to the app
# (app.state.limiter) and a RateLimitExceeded handler is registered.
limiter = Limiter(key_func=lambda req: req.client.host if req.client else "unknown")

# The route decorator stays outermost, and slowapi requires the endpoint to
# accept an explicit `request` parameter. Body elided in this sketch.
@app.post("/token")
@limiter.limit("5/minute")
async def login(request: Request, form: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(get_db)): ...

Brute-force protection. Pair with account lockout after N failures.

Password reset

@app.post("/forgot-password")
async def forgot_password(req: ForgotPasswordRequest, db: AsyncSession = Depends(get_db)):
    """Email a one-hour password-reset link if the address belongs to a user.

    The response body is the same whether or not the email is registered.
    """
    same_response = {"ok": True}
    account = await db.scalar(select(User).where(User.email == req.email))
    if not account:
        # Unknown address: answer exactly as in the success case.
        return same_response

    reset_token = secrets.token_urlsafe(32)
    # The token maps to the user id and expires after 3600 seconds.
    await redis.set(f"pwreset:{reset_token}", str(account.id), ex=3600)
    await send_email(account.email, f"Reset link: https://app/reset?token={reset_token}")
    return same_response

@app.post("/reset-password")
async def reset_password(req: ResetPasswordRequest, db: AsyncSession = Depends(get_db)):
    """Set a new password using a single-use reset token.

    Raises:
        HTTPException: 400 when the token is unknown or expired, or the
            account it was issued for no longer exists.
    """
    # getdel reads and deletes in one step, so a token works at most once.
    user_id = await redis.getdel(f"pwreset:{req.token}")
    if not user_id:
        raise HTTPException(400, "Invalid/expired token")
    user = await db.get(User, int(user_id))
    # The account may have been deleted after the token was issued; report
    # that as a bad token instead of failing on None.hashed_password.
    if user is None:
        raise HTTPException(400, "Invalid/expired token")
    user.hashed_password = hash_password(req.new_password)
    await db.commit()
    return {"ok": True}

Don’t reveal whether email exists. Single-use tokens via getdel.

Email verification

@app.post("/verify-email")
async def verify_email(req: VerifyRequest, db: AsyncSession = Depends(get_db)):
    """Mark a user's email as verified using a single-use token.

    Raises:
        HTTPException: 400 when the token is unknown or expired, or the
            account it was issued for no longer exists.
    """
    # getdel reads and deletes in one step, so a token works at most once.
    user_id = await redis.getdel(f"verify:{req.token}")
    if not user_id:
        raise HTTPException(400)
    user = await db.get(User, int(user_id))
    # The account may have been deleted after the token was issued.
    if user is None:
        raise HTTPException(400)
    user.email_verified = True
    await db.commit()
    return {"ok": True}

Same pattern: single-use token; redis-backed.

Secure headers

from starlette.middleware.cors import CORSMiddleware

# CORS policy: a single explicit origin is allowed, with credentials; all
# methods and request headers are permitted for that origin.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["https://yourdomain.com"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)

@app.middleware("http")
async def security_headers(request, call_next):
    """Add a fixed set of security headers to every response."""
    fixed_headers = (
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "DENY"),
        ("Strict-Transport-Security", "max-age=63072000; includeSubDomains; preload"),
    )
    response = await call_next(request)
    for header_name, header_value in fixed_headers:
        response.headers[header_name] = header_value
    return response

Standard security headers.

Common mistakes

1. Plaintext passwords / weak hashing

Argon2 / bcrypt only. Not SHA / MD5.

2. Long-lived JWTs without revocation

Stolen JWT works for hours/days. Short-lived access + refresh.

3. Mixing scopes and RBAC inconsistently

Pick one model per service. Document it.

4. CSRF for bearer-token APIs

Unnecessary; adds complexity for no benefit. CSRF for session cookies only.

5. Verbose auth errors

“Invalid email” vs “Invalid password” → enumerable user list. “Invalid credentials” generically.

What’s next

Chapter 7: Async, concurrency, and the threadpool.

Read this next


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev.