Cheatsheet for end-to-end multi-tenancy.
Strategy: shared DB + tenant_id + RLS
The default for B2B SaaS. Logical isolation; cheap; defended at DB.
Schema
class TenantMixin:
    """Declarative mixin that gives a model an indexed ``tenant_id`` column."""

    # Owning tenant of the row; indexed because tenant-scoped queries filter on it.
    tenant_id: Mapped[int] = mapped_column(index=True)
class User(TenantMixin, Base):
    """Tenant-owned user account stored in the ``users`` table."""

    __tablename__ = "users"
    # An email must be unique within a tenant, not across the whole table.
    __table_args__ = (
        Index("ix_users_tenant_email", "tenant_id", "email", unique=True),
    )

    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str]
Always put tenant_id first in composite indexes so tenant-scoped lookups can use them.
Migration (enable RLS)
def upgrade():
    """Turn on row-level security for ``users`` and add the tenant policy.

    The policy compares each row's ``tenant_id`` with the ``app.tenant_id``
    setting, both for rows that are read (USING) and for rows that are
    written (WITH CHECK).
    """
    enable_rls_sql = "ALTER TABLE users ENABLE ROW LEVEL SECURITY"
    tenant_policy_sql = (
        "\n"
        "CREATE POLICY tenant_iso ON users\n"
        "USING (tenant_id = current_setting('app.tenant_id')::bigint)\n"
        "WITH CHECK (tenant_id = current_setting('app.tenant_id')::bigint)\n"
    )
    op.execute(enable_rls_sql)
    op.execute(tenant_policy_sql)
Tenant context (contextvar)
import contextvars
current_tenant: contextvars.ContextVar[int | None] = contextvars.ContextVar("tenant", default=None)
Middleware to resolve tenant
@app.middleware("http")
async def tenant_middleware(request: Request, call_next):
    """Resolve the request's tenant and expose it via ``current_tenant``.

    Requests for which no tenant can be resolved are rejected with HTTP 400
    unless the path is public. The context variable is always restored once
    the downstream handler has finished, even if it raised.
    """
    # The tenant may come from a JWT, the subdomain, a header, etc.
    tenant_id = await resolve_tenant(request)
    if tenant_id is None:
        if not is_public_path(request.url.path):
            return JSONResponse({"error": "no tenant"}, status_code=400)

    reset_token = current_tenant.set(tenant_id)
    try:
        response = await call_next(request)
    finally:
        current_tenant.reset(reset_token)
    return response
SQLAlchemy event sets SET LOCAL
@event.listens_for(engine.sync_engine, "before_cursor_execute")
def set_tenant(conn, cur, stmt, params, ctx, executemany):
    """Set ``app.tenant_id`` on the connection before each statement runs.

    Does nothing when no tenant is bound to the current context.

    Raises:
        ValueError, TypeError: if the bound tenant id is not integer-like.
            The statement is then never executed, so a malformed tenant id
            fails closed instead of reaching the database.
    """
    tid = current_tenant.get()
    if tid is not None:
        # SET cannot take bind parameters, so the value has to be formatted
        # into the statement. Coercing to int first guarantees that only
        # digits (and an optional sign) are interpolated, even if the tenant
        # id originated from a client-controlled source such as a header.
        safe_tid = int(tid)
        # SET LOCAL lasts only until the end of the current transaction, so
        # the value does not linger on a pooled connection.
        cur.execute(f"SET LOCAL app.tenant_id = '{safe_tid}'")
Every query runs with the tenant_id set.
Auto-set tenant_id on insert
@event.listens_for(Session, "before_flush")
def set_tenant_on_insert(session, flush_context, instances):
    """Stamp pending tenant-owned objects with the current tenant id.

    Only objects that are new in the session, derive from ``TenantMixin``
    and have no ``tenant_id`` yet are touched. Nothing happens when no
    tenant is bound to the current context.
    """
    tenant_id = current_tenant.get()
    if tenant_id is not None:
        for pending in session.new:
            if not isinstance(pending, TenantMixin):
                continue
            if pending.tenant_id is None:
                pending.tenant_id = tenant_id
App role
-- Login role the application connects as.
-- BYPASSRLS is deliberately NOT granted, so row-level security policies apply.
CREATE ROLE app_user
    WITH LOGIN PASSWORD '...';

GRANT SELECT, INSERT, UPDATE, DELETE
    ON ALL TABLES IN SCHEMA public
    TO app_user;
App connects as this role. RLS applies.
Admin role (bypass RLS)
-- Administrative login role; BYPASSRLS exempts it from row-level security policies.
CREATE ROLE app_admin
    WITH LOGIN PASSWORD '...' BYPASSRLS;
Separate engine; used for admin paths only.
async def get_admin_db() -> AsyncSession:
    """Yield a session created by ``AdminSessionLocal``.

    The session is opened as an async context manager, so it is closed once
    the consumer of this generator is done with it.
    """
    async with AdminSessionLocal() as admin_session:
        yield admin_session
Tests
@pytest.mark.anyio
async def test_tenant_isolation(db_session):
    """A row written by tenant 1 is visible to tenant 1 but not to tenant 2."""
    email = "a@example.com"

    # Tenant 1 inserts a row. SET LOCAL only lasts until the transaction ends,
    # so it is issued in the same transaction as the INSERT.
    await db_session.execute(text("SET LOCAL app.tenant_id = '1'"))
    db_session.add(User(tenant_id=1, email=email))
    await db_session.commit()

    # Positive control: without it the test would also pass if the row had
    # never become visible to anyone. Only the column is selected so no
    # expired ORM attributes need to be loaded.
    await db_session.execute(text("SET LOCAL app.tenant_id = '1'"))
    own_emails = (await db_session.execute(select(User.email))).scalars().all()
    assert email in own_emails

    # Tenant 2 must not see any of tenant 1's rows.
    await db_session.execute(text("SET LOCAL app.tenant_id = '2'"))
    rows = (await db_session.execute(select(User))).scalars().all()
    assert rows == []
Add to CI.
Pydantic schemas
class UserCreate(BaseModel):
    """Request payload for creating a user.

    There is deliberately no ``tenant_id`` field: the tenant is never taken
    from client input, it is set server-side.
    """

    email: EmailStr
class UserRead(BaseModel):
    """User representation for tenant-scoped views.

    ``tenant_id`` is omitted here; it may appear in output for admin views
    only, not for tenant-scoped ones.
    """

    # Allow building the schema from objects by reading their attributes.
    model_config = {"from_attributes": True}

    id: int
    email: EmailStr
class UserReadAdmin(UserRead):
    """Admin variant of ``UserRead`` that additionally exposes ``tenant_id``."""

    tenant_id: int
Per-request DB dep
async def get_db(request: Request) -> AsyncSession:
    """Yield a per-request session with ``app.tenant_id`` set for its transaction.

    The tenant id is read from ``request.state.tenant_id`` when present and
    otherwise from the ``current_tenant`` context variable. If neither
    provides a tenant, the session is yielded without the setting.
    """
    async with AsyncSessionLocal() as session:
        tid = getattr(request.state, "tenant_id", None)
        if tid is None:
            # Fall back to the context variable for callers that publish the
            # tenant there instead of on request.state.
            tid = current_tenant.get()
        # Compare against None explicitly so a tenant id of 0 is not skipped.
        if tid is not None:
            # PostgreSQL's SET does not accept bind parameters; set_config()
            # is an ordinary function call, so the value can be bound safely.
            # The third argument (true) makes the setting transaction-local,
            # which is what SET LOCAL does.
            await session.execute(
                text("SELECT set_config('app.tenant_id', :t, true)"),
                {"t": str(tid)},
            )
        yield session
Tenant resolution sources
async def resolve_tenant(request: Request) -> int | None:
    """Work out which tenant a request belongs to.

    Sources are tried in order:

    1. the ``tenant_id`` claim of a Bearer JWT in the ``Authorization`` header,
    2. the first label of the ``Host`` header (unless it is ``www`` or ``api``),
    3. the ``X-Tenant-Id`` header.

    Returns:
        The tenant id, or ``None`` when no source yields one.
    """
    # 1. From JWT
    auth = request.headers.get("authorization", "")
    if auth.startswith("Bearer "):
        try:
            payload = jwt.decode(auth[7:], SECRET, algorithms=[ALG])
        except Exception:
            # An unusable token is treated as "no JWT" and the other sources
            # are tried. Catching Exception rather than using a bare except
            # lets KeyboardInterrupt, SystemExit and task cancellation
            # propagate.
            pass
        else:
            return payload.get("tenant_id")

    # 2. From subdomain
    host = request.headers.get("host", "")
    if "." in host:
        sub = host.split(".")[0]
        if sub not in ("www", "api"):
            return await tenant_id_by_slug(sub)

    # 3. From header
    # NOTE(review): this header is supplied by the client; confirm it is only
    # honoured behind a trusted proxy or for already-authenticated callers.
    raw = request.headers.get("x-tenant-id")
    if raw is None:
        return None
    try:
        # Header values are strings; convert so the declared int | None
        # contract holds and nothing but a number leaves this function.
        return int(raw)
    except ValueError:
        return None
Per-tenant rate limit
async def rate_limit(request: Request, redis: Redis = Depends(get_redis)):
    """Count requests per tenant and path, rejecting those over the limit.

    The counter key combines the current tenant and the request path. Its
    60-second expiry is set when the counter is first created.

    Raises:
        HTTPException: 429 once the counter exceeds ``PER_MINUTE_LIMIT``.
    """
    bucket = f"rl:{current_tenant.get()}:{request.url.path}"
    hits = await redis.incr(bucket)
    if hits == 1:
        # First hit for this key: start its expiry window.
        await redis.expire(bucket, 60)
    if hits > PER_MINUTE_LIMIT:
        raise HTTPException(429, "Rate limited")
Cross-tenant queries (admin)
@app.get("/admin/users", dependencies=[Depends(require_role("admin"))])
async def admin_list(db: AsyncSession = Depends(get_admin_db)):
    """Return every ``User`` visible to the admin session."""
    result = await db.execute(select(User))
    users = result.scalars().all()
    return users
Uses BYPASSRLS role.
Onboarding new tenant
@app.post("/admin/tenants", dependencies=[Depends(require_role("admin"))])
async def create_tenant(data: TenantCreate, db=Depends(get_admin_db)):
    """Create a tenant from the submitted slug and name and return it."""
    tenant = Tenant(slug=data.slug, name=data.name)
    db.add(tenant)
    await db.commit()
    # By default commit() expires the object's loaded attributes, and an
    # AsyncSession cannot lazily reload them while the response is being
    # serialised. Reload them explicitly before returning.
    await db.refresh(tenant)
    return tenant
Audit log per tenant
@event.listens_for(Session, "before_flush")
def audit(session, flush_context, instances):
    """Queue an ``AuditLog`` entry for each object pending insertion.

    Each entry records the current tenant and an action named
    ``<lowercased class name>.create``.
    """
    tid = current_tenant.get()
    # Iterate over a copy so adding audit rows cannot affect the loop.
    for obj in list(session.new):
        if isinstance(obj, AuditLog):
            # Audit entries are not audited themselves.
            continue
        session.add(AuditLog(
            tenant_id=tid,
            action=f"{type(obj).__name__.lower()}.create",
            # ... remaining AuditLog fields go here
        ))
Common mistakes
- App role with BYPASSRLS — RLS useless.
- Forgetting SET LOCAL in event — leaks across requests in pool.
- No isolation test in CI.
- Hard-coded tenant_id in queries — bypass RLS detection.
- Cross-tenant FK without tenant_id check.
Read this next
If you want my full multi-tenant stack starter (middleware + RLS + admin), it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .