Cheatsheet for a complete auth stack.
Model
class User(Base):
    """ORM model for an account, stored in the ``users`` table."""

    __tablename__ = "users"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Emails are unique per account and indexed.
    email: Mapped[str] = mapped_column(
        unique=True,
        index=True,
    )
    hashed_password: Mapped[str]
    # New accounts are active unless stated otherwise.
    is_active: Mapped[bool] = mapped_column(default=True)
    role: Mapped[Role] = mapped_column(
        SAEnum(Role),
        default=Role.USER,
    )
    # Timestamp is filled in by the database (server-side default).
    created_at: Mapped[datetime] = mapped_column(
        server_default=func.now(),
    )
class RefreshToken(Base):
    """ORM model for one issued refresh token, stored in ``refresh_tokens``."""

    __tablename__ = "refresh_tokens"

    id: Mapped[int] = mapped_column(primary_key=True)
    # Rows are removed by the database when the owning user is deleted.
    user_id: Mapped[int] = mapped_column(
        ForeignKey("users.id", ondelete="CASCADE"),
        index=True,
    )
    # Hash of the token, not the raw value handed to the client.
    token_hash: Mapped[str] = mapped_column(
        unique=True,
        index=True,
    )
    expires_at: Mapped[datetime]
    # Tokens start out valid; revoking flips this flag to True.
    revoked: Mapped[bool] = mapped_column(default=False)
Store only a hash of each refresh token (token_hash), never the raw value, so a leaked database does not expose usable tokens.
Pydantic schemas
class UserCreate(BaseModel):
    """Request body for registering an account."""

    email: EmailStr
    # Required field; passwords shorter than 8 characters are rejected.
    password: str = Field(default=..., min_length=8)
class UserRead(BaseModel):
    """Public view of a user; excludes the password hash."""

    # Allow building this schema from attribute-bearing objects such as ORM rows.
    model_config = {"from_attributes": True}

    id: int
    email: EmailStr
    is_active: bool
    role: Role
class TokenPair(BaseModel):
    """Response body carrying an access token and a refresh token."""

    access_token: str
    refresh_token: str
    # Defaults to "bearer" when the caller does not supply it.
    token_type: str = "bearer"
class RefreshRequest(BaseModel):
    """Request body carrying a raw refresh token (used by /refresh and /logout)."""

    refresh_token: str
Hashing
from passlib.context import CryptContext
# Shared password-hashing context; argon2 is its only configured scheme.
# hash_password() and verify_password() below both delegate to it.
pwd = CryptContext(schemes=["argon2"], deprecated="auto")
def hash_password(p: str) -> str:
    """Return the hash of plaintext password ``p`` produced by the ``pwd`` context."""
    hashed = pwd.hash(p)
    return hashed
def verify_password(p: str, hashed: str) -> bool:
    """Return whether plaintext ``p`` matches the stored hash ``hashed``."""
    matches = pwd.verify(p, hashed)
    return matches
JWT utilities
from datetime import datetime, timedelta, timezone

import jwt
# JWT signing key, unwrapped once at import time from the settings secret.
SECRET = settings.secret_key.get_secret_value()
# Signing algorithm; used for encoding and pinned again when decoding.
ALG = "HS256"
# Lifetime of an access token.
ACCESS_TTL = timedelta(minutes=15)
# Lifetime of a refresh token row.
REFRESH_TTL = timedelta(days=30)
def make_access_token(user_id: int) -> str:
    """Return a signed JWT access token for ``user_id``.

    The payload carries ``sub`` (the user id as a string), ``exp`` (now plus
    ``ACCESS_TTL``) and ``type`` set to ``"access"``. It is signed with
    ``SECRET`` using ``ALG``.
    """
    # Use an aware UTC timestamp: datetime.utcnow() is deprecated since
    # Python 3.12 and returns a naive value that is easy to mistake for
    # local time.
    expires_at = datetime.now(timezone.utc) + ACCESS_TTL
    claims = {"sub": str(user_id), "exp": expires_at, "type": "access"}
    return jwt.encode(claims, SECRET, algorithm=ALG)
def decode_access_token(token: str) -> dict:
    """Verify ``token`` and return its claims.

    Raises:
        jwt.PyJWTError: if the signature or expiry is invalid, a required
            claim (``exp``, ``sub``) is missing, or the ``type`` claim is
            not ``"access"``.
    """
    payload = jwt.decode(
        token,
        SECRET,
        algorithms=[ALG],  # always pin algorithms
        # Refuse tokens without an expiry or a subject instead of treating
        # them as never-expiring / anonymous.
        options={"require": ["exp", "sub"]},
    )
    # make_access_token() stamps type="access"; enforce it so any other kind
    # of token signed with the same key cannot be used as an access token.
    if payload.get("type") != "access":
        raise jwt.InvalidTokenError("not an access token")
    return payload
Register
@app.post("/register", response_model=UserRead, status_code=201)
async def register(data: UserCreate, db: AsyncSession = Depends(get_db)):
    """Create a new account and return it.

    Raises:
        HTTPException: 409 if the email is already registered.
    """
    # Imported locally so this snippet does not rely on imports outside it.
    from sqlalchemy.exc import IntegrityError

    # Fast path: report an obvious duplicate before doing the hashing work.
    if await db.scalar(select(User).where(User.email == data.email)):
        raise HTTPException(409, "email taken")
    user = User(email=data.email, hashed_password=hash_password(data.password))
    db.add(user)
    try:
        await db.commit()
    except IntegrityError:
        # A concurrent request inserted the same email between the check
        # above and this commit; the unique constraint caught it. Report it
        # as a conflict instead of letting it surface as a 500.
        await db.rollback()
        raise HTTPException(409, "email taken") from None
    # Reload so server-generated columns are populated on the instance.
    await db.refresh(user)
    return user
Login
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
import secrets, hashlib
# Bearer-token scheme used as a dependency by current_user(); tokenUrl names
# the /login route defined below.
oauth2 = OAuth2PasswordBearer(tokenUrl="login")
@app.post("/login", response_model=TokenPair)
async def login(form: OAuth2PasswordRequestForm = Depends(), db: AsyncSession = Depends(get_db)):
    """Exchange an email (sent as ``username``) and password for a token pair.

    Raises:
        HTTPException: 401 for unknown email or wrong password, 403 if the
            account is inactive.
    """
    user = await db.scalar(select(User).where(User.email == form.username))
    if user is None:
        # Spend about as long as a real verification would, so response
        # timing does not reveal whether the email is registered.
        pwd.dummy_verify()
        raise HTTPException(401, "Invalid credentials")
    if not verify_password(form.password, user.hashed_password):
        raise HTTPException(401, "Invalid credentials")
    if not user.is_active:
        raise HTTPException(403, "Inactive user")
    access = make_access_token(user.id)
    # Only the SHA-256 digest is persisted; the raw token goes to the client.
    refresh_raw = secrets.token_urlsafe(32)
    refresh_hash = hashlib.sha256(refresh_raw.encode()).hexdigest()
    # Naive UTC, same value datetime.utcnow() produced, without the deprecated
    # call; kept naive because expires_at is declared without a timezone.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    db.add(RefreshToken(
        user_id=user.id,
        token_hash=refresh_hash,
        expires_at=now + REFRESH_TTL,
    ))
    await db.commit()
    return {"access_token": access, "refresh_token": refresh_raw, "token_type": "bearer"}
Refresh
@app.post("/refresh", response_model=TokenPair)
async def refresh(req: RefreshRequest, db: AsyncSession = Depends(get_db)):
    """Rotate a refresh token: revoke it and return a fresh token pair.

    Raises:
        HTTPException: 401 if the token is unknown, revoked or expired, or
            its owner is missing or inactive.
    """
    h = hashlib.sha256(req.refresh_token.encode()).hexdigest()
    # Naive UTC, same value datetime.utcnow() produced, without the deprecated
    # call; kept naive because expires_at is declared without a timezone.
    now = datetime.now(timezone.utc).replace(tzinfo=None)
    rec = await db.scalar(
        select(RefreshToken)
        .where(
            RefreshToken.token_hash == h,
            RefreshToken.revoked.is_(False),
            RefreshToken.expires_at > now,
        )
        # Lock the row so two concurrent requests presenting the same token
        # cannot both pass this check and both receive a new pair.
        .with_for_update()
    )
    if not rec:
        raise HTTPException(401, "Invalid refresh")
    # Read the id before commit: the instance may be expired afterwards, and
    # touching an expired attribute on an AsyncSession triggers implicit I/O.
    user_id = rec.user_id
    # Same rule as /login and current_user: inactive accounts get no tokens.
    user = await db.get(User, user_id)
    if user is None or not user.is_active:
        raise HTTPException(401, "Invalid refresh")
    # Rotate: revoke old; issue new
    rec.revoked = True
    new_raw = secrets.token_urlsafe(32)
    new_hash = hashlib.sha256(new_raw.encode()).hexdigest()
    db.add(RefreshToken(
        user_id=user_id,
        token_hash=new_hash,
        expires_at=now + REFRESH_TTL,
    ))
    await db.commit()
    return {
        "access_token": make_access_token(user_id),
        "refresh_token": new_raw,
        "token_type": "bearer",
    }
current_user dep
async def current_user(token: str = Depends(oauth2), db: AsyncSession = Depends(get_db)) -> User:
    """Resolve the bearer access token to an active ``User``.

    Raises:
        HTTPException: 401 if the token cannot be decoded, its ``sub`` claim
            is missing or not an integer, or the user is missing or inactive.
    """
    # Bearer-protected resources should advertise the scheme on 401.
    bearer_challenge = {"WWW-Authenticate": "Bearer"}
    try:
        payload = decode_access_token(token)
        # A validly signed token can still carry a missing or malformed
        # "sub"; treat that as an invalid token rather than a server error.
        user_id = int(payload["sub"])
    except (jwt.PyJWTError, KeyError, TypeError, ValueError):
        raise HTTPException(401, "Invalid token", headers=bearer_challenge) from None
    user = await db.get(User, user_id)
    if not user or not user.is_active:
        raise HTTPException(401, "Invalid or inactive user", headers=bearer_challenge)
    return user
Protected endpoint
@app.get("/me", response_model=UserRead)
async def me(u: User = Depends(current_user)):
    """Return the authenticated user's own record."""
    # current_user has already authenticated the caller; just echo it back.
    return u
Role-based access
def require_role(*roles: Role):
    """Build a dependency that admits only users whose role is in ``roles``.

    The returned dependency yields the authenticated user, or raises
    ``HTTPException(403)`` when the user's role is not listed.
    """

    def dep(u: User = Depends(current_user)):
        # Happy path first: an allowed role passes the user straight through.
        if u.role in roles:
            return u
        raise HTTPException(403)

    return dep
# Example of a role-guarded route: the require_role(Role.ADMIN) dependency
# rejects any caller whose role is not ADMIN before the handler runs.
@app.delete("/users/{id}")
async def delete_(id: int, u: User = Depends(require_role(Role.ADMIN))):
    # Handler body is a placeholder in this snippet.
    ...
Logout (revoke refresh)
@app.post("/logout")
async def logout(req: RefreshRequest, db: AsyncSession = Depends(get_db)):
    """Revoke the presented refresh token."""
    # Tokens are stored hashed, so look the row up by the digest.
    digest = hashlib.sha256(req.refresh_token.encode()).hexdigest()
    revoke_stmt = (
        update(RefreshToken)
        .where(RefreshToken.token_hash == digest)
        .values(revoked=True)
    )
    await db.execute(revoke_stmt)
    await db.commit()
    # Same response whether or not a matching row existed.
    return {"ok": True}
Password change
@app.post("/change-password")
async def change_pw(data: ChangePassword, u: User = Depends(current_user), db = Depends(get_db)):
    """Change the current user's password and revoke all their refresh tokens."""
    # The caller must prove knowledge of the existing password first.
    old_password_ok = verify_password(data.old_password, u.hashed_password)
    if not old_password_ok:
        raise HTTPException(401)
    u.hashed_password = hash_password(data.new_password)
    # Invalidate every refresh token issued to this user.
    revoke_all = (
        update(RefreshToken)
        .where(RefreshToken.user_id == u.id)
        .values(revoked=True)
    )
    await db.execute(revoke_all)
    await db.commit()
    return {"ok": True}
Common mistakes
- Storing raw refresh tokens.
- JWT decoded without `algorithms=[ALG]` pinned — algorithm confusion attack.
- User enumeration: distinct errors for wrong-email vs wrong-password.
- No refresh rotation — stolen refresh works until expiry.
- No rate limiting on /login.
Read this next
If you want my full auth stack starter (passkeys, OAuth, JWT, sessions), it’s at rajpoot.dev.
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev.