Chapter 6: how to authenticate and authorize requests in FastAPI. Passwords, JWT, OAuth2 / OIDC, sessions, passkeys, scopes, RBAC. Each pattern with real code.
Password hashing
from passlib.context import CryptContext
# Shared passlib context used by the helpers below; argon2 is the only configured scheme.
pwd_context = CryptContext(schemes=["argon2"], deprecated="auto")
def hash_password(plain: str) -> str:
    """Return the hash of *plain* produced by the module-level ``pwd_context``."""
    hashed = pwd_context.hash(plain)
    return hashed
def verify_password(plain: str, hashed: str) -> bool:
    """Check *plain* against the stored *hashed* value via ``pwd_context``."""
    matches = pwd_context.verify(plain, hashed)
    return matches
Use argon2 by default in 2026. bcrypt acceptable. Never store plaintext.
OAuth2 password flow
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from datetime import datetime, timedelta
import jwt
# Key used to sign and verify the JWTs below; read from application settings.
SECRET_KEY = settings.secret_key
# Signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"
# Access-token lifetime in minutes (added to the issue time in login()).
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# Bearer-token dependency; tokenUrl names the login route defined below.
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
@app.post("/token")
async def login(form: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(get_db)):
    """Exchange an email + password for a signed JWT access token.

    The OAuth2 form's ``username`` field carries the user's email.

    Returns:
        ``{"access_token": <jwt>, "token_type": "bearer"}``.

    Raises:
        HTTPException: 401 with a generic message when the email is unknown
            or the password does not match (no hint about which one failed).
    """
    # Local import: the file-level import only brings in datetime and timedelta.
    from datetime import timezone

    user = await db.scalar(select(User).where(User.email == form.username))
    if not user or not verify_password(form.password, user.hashed_password):
        raise HTTPException(401, "Invalid credentials", headers={"WWW-Authenticate": "Bearer"})
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a naive value.
    expire = datetime.now(timezone.utc) + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
    token = jwt.encode({"sub": str(user.id), "exp": expire}, SECRET_KEY, algorithm=ALGORITHM)
    return {"access_token": token, "token_type": "bearer"}
async def get_current_user(token: str = Depends(oauth2_scheme), db: AsyncSession = Depends(get_db)) -> User:
    """Resolve the bearer token to a ``User`` row.

    Raises:
        HTTPException: 401 when the token fails verification, has no usable
            integer ``sub`` claim, or refers to a user that no longer exists.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # A validly signed token can still lack "sub" or carry a non-integer one;
        # treat that as an invalid token rather than letting it surface as a 500.
        user_id = int(payload["sub"])
    except (jwt.PyJWTError, KeyError, TypeError, ValueError):
        raise HTTPException(401, "Invalid token", headers={"WWW-Authenticate": "Bearer"}) from None
    user = await db.get(User, user_id)
    if not user:
        raise HTTPException(401, "User not found", headers={"WWW-Authenticate": "Bearer"})
    return user
OAuth2PasswordBearer(tokenUrl="token") registers the scheme in OpenAPI; Swagger’s “Authorize” button works.
Refresh tokens
Short-lived access + long-lived refresh:
class RefreshToken(Base):
    """ORM model for one issued refresh token (table ``refresh_tokens``)."""

    __tablename__ = "refresh_tokens"

    # Surrogate primary key.
    id: Mapped[int] = mapped_column(primary_key=True)
    # Owner of the token.
    user_id: Mapped[int] = mapped_column()
    # The token string presented by the client; unique across the table.
    token: Mapped[str] = mapped_column(unique=True)
    # Tokens are only accepted while this is in the future.
    expires_at: Mapped[datetime] = mapped_column()
    # Set to True to invalidate the token before it expires.
    revoked: Mapped[bool] = mapped_column(default=False)
@app.post("/refresh")
async def refresh(req: RefreshRequest, db: AsyncSession = Depends(get_db)):
    """Issue a new access token for a valid, unrevoked, unexpired refresh token.

    Raises:
        HTTPException: 401 when no matching refresh-token row is found.
    """
    now = datetime.utcnow()
    lookup = select(RefreshToken).where(
        RefreshToken.token == req.refresh_token,
        RefreshToken.revoked == False,  # noqa: E712 - SQL expression, not a Python comparison
        RefreshToken.expires_at > now,
    )
    rec = await db.scalar(lookup)
    if rec:
        return {"access_token": make_access_token(rec.user_id), "token_type": "bearer"}
    raise HTTPException(401)
Keep access tokens short-lived (15–30 minutes; the login example above uses 30) and refresh tokens long-lived (e.g. 30 days). See JWT vs Session Cookies.
Sessions (alternative to JWT)
from itsdangerous import URLSafeSerializer
# Signed serializer keyed with SECRET_KEY.
# NOTE(review): not referenced by the session handlers shown below, which store
# an opaque random id in Redis instead — confirm whether it is still needed.
serializer = URLSafeSerializer(SECRET_KEY)
@app.post("/login")
async def login_session(form: OAuth2PasswordRequestForm = Depends(), response: Response = None, db: AsyncSession = Depends(get_db)):
    """Log a user in with a server-side session.

    Stores ``sess:<random id> -> user id`` in Redis for 30 days and hands the
    id to the browser in an HttpOnly, Secure, SameSite=Lax cookie.
    """
    user = await authenticate(form.username, form.password, db)
    session_id = secrets.token_urlsafe(32)
    redis_key = f"sess:{session_id}"
    ttl_seconds = 86400 * 30
    await redis.set(redis_key, str(user.id), ex=ttl_seconds)
    response.set_cookie(
        "session",
        session_id,
        httponly=True,
        secure=True,
        samesite="lax",
    )
    return {"ok": True}
async def get_current_user_from_session(session: str = Cookie(...), db: AsyncSession = Depends(get_db)) -> User:
    """Resolve the ``session`` cookie to a ``User`` row via Redis.

    Raises:
        HTTPException: 401 when the session id is unknown or expired, or when
            the session points at a user that no longer exists.
    """
    user_id = await redis.get(f"sess:{session}")
    if not user_id:
        raise HTTPException(401)
    user = await db.get(User, int(user_id))
    # A live session can outlast its user row; never return None as the "current user".
    if user is None:
        raise HTTPException(401)
    return user
Simpler for first-party web apps. HttpOnly cookie. Server-side state allows revocation.
OIDC (Google / GitHub / Microsoft / Auth0)
from authlib.integrations.starlette_client import OAuth
# Authlib client registry with a single provider, "google".
oauth = OAuth()
oauth.register(
name="google",
# Discovery document URL for the provider's OIDC configuration.
server_metadata_url="https://accounts.google.com/.well-known/openid-configuration",
client_id=settings.google_client_id,
client_secret=settings.google_client_secret,
# Scopes requested at sign-in.
client_kwargs={"scope": "openid email profile"},
)
@app.get("/auth/google")
async def google_login(request: Request):
    """Redirect the browser to Google, naming our callback route as the return URL."""
    callback_url = request.url_for("google_callback")
    redirect = await oauth.google.authorize_redirect(request, callback_url)
    return redirect
@app.get("/auth/google/callback", name="google_callback")
async def google_callback(request: Request, db: AsyncSession = Depends(get_db)):
    """Finish the Google sign-in: exchange the code, then find or create the local user.

    Raises:
        HTTPException: 401 when the token exchange fails or the response
            carries no usable ``userinfo`` / email.
    """
    # Local import: same module the file already imports OAuth from.
    from authlib.integrations.starlette_client import OAuthError

    try:
        token = await oauth.google.authorize_access_token(request)
    except OAuthError:
        # Denied consent, bad state, or an expired code: a client error, not a 500.
        raise HTTPException(401, "Google sign-in failed") from None
    user_info = token.get("userinfo")
    if not user_info or not user_info.get("email"):
        raise HTTPException(401, "Google sign-in failed")
    # "name" is optional in the claims; fall back to an empty string instead of a KeyError.
    user = await find_or_create_user(db, email=user_info["email"], name=user_info.get("name", ""))
    # issue your own session/JWT
    return RedirectResponse("/")
Authlib handles OIDC flows. Issue your own session after; don’t pass Google’s tokens around your app.
API keys
from fastapi.security import APIKeyHeader
# Dependency that reads the API key from the X-API-Key request header.
api_key_header = APIKeyHeader(name="X-API-Key")
async def get_api_key(key: str = Depends(api_key_header), db: AsyncSession = Depends(get_db)) -> ApiKey:
    """Look up the unrevoked ``ApiKey`` row whose stored hash matches the presented key.

    Raises:
        HTTPException: 401 when no such row exists.
    """
    presented_hash = hash_key(key)
    query = select(ApiKey).where(
        ApiKey.key_hash == presented_hash,
        ApiKey.revoked == False,  # noqa: E712 - SQL expression, not a Python comparison
    )
    rec = await db.scalar(query)
    if rec:
        return rec
    raise HTTPException(401, "Invalid API key")
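Hash API keys at rest and compare hashes, never the raw key. Unlike passwords, API keys are long random strings, so a fast hash such as SHA-256 is sufficient here; the slow-hash rule (argon2 / bcrypt) applies to human-chosen passwords.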
Scopes
# Bearer scheme that also declares the available scopes (name -> description).
oauth2_scheme = OAuth2PasswordBearer(
tokenUrl="token",
scopes={"read": "Read items", "write": "Create / update", "admin": "Admin"},
)
# Login variant that embeds the user's granted scopes in the JWT payload.
# Schematic snippet: each `...` stands for an elided value (the scope decision,
# the remaining claims, the signing arguments), so it is not runnable as written.
# NOTE(review): authenticate() is called without a db argument here but with one
# in login_session — confirm which signature is intended.
@app.post("/token")
async def login(form: OAuth2PasswordRequestForm = Depends()):
user = await authenticate(form.username, form.password)
granted = ... # determine based on user
token = jwt.encode({"sub": str(user.id), "scopes": granted, ...}, ...)
return {"access_token": token, "token_type": "bearer"}
from fastapi import Security
from fastapi.security import SecurityScopes
async def get_user_with_scope(
    security_scopes: SecurityScopes,
    token: str = Depends(oauth2_scheme),
    db: AsyncSession = Depends(get_db),
):
    """Authenticate the bearer token and enforce the scopes the route declared.

    Args:
        security_scopes: Scopes required by the route's ``Security(...)``.
        token: Bearer JWT from the request.
        db: Database session, injected so the user lookup has something to
            query (the parameter has a default, so existing callers are unaffected).

    Raises:
        HTTPException: 401 for an invalid token or unknown user; 403 naming
            the first required scope the token does not carry.
    """
    try:
        payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        # "sub" is stored as a string in the token; the primary key is an int.
        user_id = int(payload["sub"])
    except (jwt.PyJWTError, KeyError, TypeError, ValueError):
        raise HTTPException(401, "Invalid token", headers={"WWW-Authenticate": "Bearer"}) from None
    granted = payload.get("scopes", [])
    for needed in security_scopes.scopes:
        if needed not in granted:
            raise HTTPException(403, f"missing scope: {needed}")
    user = await db.get(User, user_id)
    if user is None:
        raise HTTPException(401, "User not found", headers={"WWW-Authenticate": "Bearer"})
    return user
# Route guarded by the "write" scope: the Security dependency runs before the
# handler and its return value is not passed in.
# Schematic stub: the parameter list and body are elided with `...`.
@app.post("/items", dependencies=[Security(get_user_with_scope, scopes=["write"])])
async def create_item(...): ...
OAuth2 scopes integrated. Swagger UI shows them in “Authorize.”
RBAC
class Role(str, Enum):
    """Closed set of user roles; each member is also a plain ``str``."""

    USER = "user"
    EDITOR = "editor"
    ADMIN = "admin"
def require_role(*roles: Role):
    """Build a dependency that admits only users whose ``role`` is one of *roles*.

    The returned dependency yields the current user on success and raises
    ``HTTPException(403)`` otherwise.
    """

    def dep(user: User = Depends(get_current_user)):
        if user.role in roles:
            return user
        raise HTTPException(403)

    return dep
@app.delete("/posts/{id}")
async def delete_post(
    id: int,
    user: User = Depends(require_role(Role.EDITOR, Role.ADMIN)),
):
    """Delete a post; restricted to editors and admins by the role dependency."""
    ...
Role check as a dep. Composable.
Resource-level authorization
Beyond role: “user can edit this specific post.”
async def get_editable_post(id: int, user: User = Depends(get_current_user), db: AsyncSession = Depends(get_db)) -> Post:
    """Load post *id* and confirm the current user may edit it.

    Raises:
        HTTPException: 404 when the post does not exist; 403 when the user is
            neither the post's author nor an admin.
    """
    post = await db.get(Post, id)
    if not post:
        raise HTTPException(404)
    may_edit = post.author_id == user.id or user.is_admin
    if not may_edit:
        raise HTTPException(403)
    return post
@app.patch("/posts/{id}")
async def update_post(data: PostUpdate, post: Post = Depends(get_editable_post)):
    """Apply *data* to a post the caller is allowed to edit.

    ``post`` arrives already loaded and authorized by ``get_editable_post``.
    ``data`` is listed first because Python rejects a parameter without a
    default after one with a default; FastAPI binds parameters by name, so the
    order does not affect request handling.
    """
    ...
Authorization at the dep layer; handler stays clean.
Passkeys (WebAuthn)
For modern, phishing-resistant auth:
from webauthn import generate_registration_options, verify_registration_response, generate_authentication_options, verify_authentication_response
@app.post("/passkey/register/begin")
async def begin_register(user: User = Depends(get_current_user)):
    """Start passkey registration for the signed-in user.

    Returns:
        The WebAuthn registration options as a JSON response for the browser.
    """
    # Local import: not part of the file-level webauthn import line.
    from webauthn import options_to_json

    options = generate_registration_options(
        rp_id="example.com",
        rp_name="Example",
        # py-webauthn 2.x takes the user handle as bytes, not str.
        user_id=str(user.id).encode("utf-8"),
        user_name=user.email,
    )
    # NOTE(review): options.challenge presumably has to be persisted (session or
    # Redis) so the completion step can verify it — confirm against the
    # arguments verify_registration_response is given.
    # The options contain raw bytes (e.g. the challenge); options_to_json emits
    # the base64url-encoded JSON the browser API expects.
    return Response(content=options_to_json(options), media_type="application/json")
# Finish passkey registration: verify the browser's response, then store the
# new credential for the signed-in user.
# Schematic snippet: the `...` arguments are elided, so it is not runnable as written.
# NOTE(review): py-webauthn's verification result appears to expose the key as
# `credential_public_key` rather than `public_key` — confirm against the library version in use.
@app.post("/passkey/register/complete")
async def complete_register(req: RegistrationCompleteRequest, db: AsyncSession = Depends(get_db), user: User = Depends(get_current_user)):
verification = verify_registration_response(...)
db.add(Passkey(user_id=user.id, credential_id=verification.credential_id, public_key=verification.public_key, ...))
await db.commit()
return {"ok": True}
This uses the py-webauthn library. Pair it with a username + passkey login flow. See Authentication 2026.
CSRF
For session-based auth on forms:
from starlette_csrf import CSRFMiddleware
# Install CSRF protection app-wide, keyed with the application secret.
app.add_middleware(CSRFMiddleware, secret=SECRET_KEY)
Or implement double-submit cookie. SameSite=Lax cookies block most CSRF; CSRF token covers the rest.
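Pure JSON APIs that authenticate with bearer tokens are not vulnerable to CSRF: unlike cookies, the browser never attaches an Authorization header automatically, and a cross-origin page cannot read your token or set that header unless your CORS policy allows it.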
Rate limiting auth endpoints
from slowapi import Limiter
# Rate-limit key: the caller's IP address. request.client can be None (for
# example under some test clients or transports), so fall back to a fixed
# bucket instead of raising AttributeError inside the limiter.
# NOTE(review): behind a reverse proxy this is presumably the proxy's address —
# confirm how forwarded headers are handled in deployment.
limiter = Limiter(key_func=lambda req: req.client.host if req.client else "127.0.0.1")
# Login route limited to 5 requests per minute per limiter key.
# Schematic stub: the parameter list and body are elided with `...`.
# NOTE(review): slowapi's documentation requires the decorated endpoint to
# accept a `request: Request` parameter — confirm the real signature includes it.
@app.post("/token")
@limiter.limit("5/minute")
async def login(...): ...
Brute-force protection. Pair with account lockout after N failures.
Password reset
@app.post("/forgot-password")
async def forgot_password(req: ForgotPasswordRequest, db: AsyncSession = Depends(get_db)):
    """Email a one-hour password-reset link if the address belongs to a user.

    The response body is the same whether or not the email is registered.
    """
    lookup = select(User).where(User.email == req.email)
    user = await db.scalar(lookup)
    # Identical response either way — don't leak existence
    if not user:
        return {"ok": True}
    token = secrets.token_urlsafe(32)
    await redis.set(f"pwreset:{token}", str(user.id), ex=3600)
    await send_email(user.email, f"Reset link: https://app/reset?token={token}")
    return {"ok": True}
@app.post("/reset-password")
async def reset_password(req: ResetPasswordRequest, db: AsyncSession = Depends(get_db)):
    """Set a new password using a single-use reset token.

    Raises:
        HTTPException: 400 when the token is unknown or expired, or when the
            user it was issued for no longer exists.
    """
    # getdel reads and deletes in one step, so a token can be redeemed only once.
    user_id = await redis.getdel(f"pwreset:{req.token}")
    if not user_id:
        raise HTTPException(400, "Invalid/expired token")
    user = await db.get(User, int(user_id))
    # The account may have been deleted after the token was issued; answer with
    # the same 400 instead of failing on a None user.
    if user is None:
        raise HTTPException(400, "Invalid/expired token")
    user.hashed_password = hash_password(req.new_password)
    await db.commit()
    return {"ok": True}
Don’t reveal whether email exists. Single-use tokens via getdel.
Email verification
@app.post("/verify-email")
async def verify_email(req: VerifyRequest, db: AsyncSession = Depends(get_db)):
    """Mark a user's email as verified using a single-use token.

    Raises:
        HTTPException: 400 when the token is unknown or expired, or when the
            user it was issued for no longer exists.
    """
    # getdel reads and deletes in one step, so a token can be redeemed only once.
    user_id = await redis.getdel(f"verify:{req.token}")
    if not user_id:
        raise HTTPException(400)
    user = await db.get(User, int(user_id))
    # The account may have been deleted after the token was issued.
    if user is None:
        raise HTTPException(400)
    user.email_verified = True
    await db.commit()
    return {"ok": True}
Same pattern: single-use token; redis-backed.
Secure headers
from starlette.middleware.cors import CORSMiddleware
# CORS: only the listed origin may call the API from a browser, with
# credentials allowed; any method and any request header is accepted from it.
app.add_middleware(
CORSMiddleware,
allow_origins=["https://yourdomain.com"],
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.middleware("http")
async def security_headers(request, call_next):
    """Attach a fixed set of security headers to every response."""
    response = await call_next(request)
    hardening = (
        ("X-Content-Type-Options", "nosniff"),
        ("X-Frame-Options", "DENY"),
        ("Strict-Transport-Security", "max-age=63072000; includeSubDomains; preload"),
    )
    for header_name, header_value in hardening:
        response.headers[header_name] = header_value
    return response
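CORS is restricted to your own origin, and every response carries the standard security headers: no MIME sniffing, no framing, and HSTS.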
Common mistakes
1. Plaintext passwords / weak hashing
Argon2 / bcrypt only. Not SHA / MD5.
2. Long-lived JWTs without revocation
Stolen JWT works for hours/days. Short-lived access + refresh.
3. Mixing scopes and RBAC inconsistently
Pick one model per service. Document it.
4. CSRF for bearer-token APIs
Unnecessary; adds complexity for no benefit. CSRF for session cookies only.
5. Verbose auth errors
“Invalid email” vs “Invalid password” → enumerable user list. “Invalid credentials” generically.
What’s next
Chapter 7: Async, concurrency, and the threadpool.
Read this next
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .