Cheatsheet for end-to-end multi-tenancy.

Strategy: shared DB + tenant_id + RLS

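The default for B2B SaaS: all tenants share one database, every row carries a tenant_id, and PostgreSQL row-level security (RLS) enforces the boundary. Isolation is logical rather than physical, it is cheap to run, and it is enforced in the database itself rather than only in application code.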

Schema

class TenantMixin:
    """Mixin that gives a mapped class an indexed ``tenant_id`` column."""

    # Tenant that owns the row; declared with index=True so it gets its own index.
    tenant_id: Mapped[int] = mapped_column(index=True)

class User(TenantMixin, Base):
    """User row stored in the ``users`` table; ``tenant_id`` comes from TenantMixin."""

    __tablename__ = "users"
    id: Mapped[int] = mapped_column(primary_key=True)
    email: Mapped[str]
    
    # Composite unique index with tenant_id leading: an email must be unique
    # within one tenant, while different tenants may reuse the same email.
    __table_args__ = (
        Index("ix_users_tenant_email", "tenant_id", "email", unique=True),
    )

Always put tenant_id first in composite indexes, as in ix_users_tenant_email above, so tenant-scoped lookups can use the index.

Migration (enable RLS)

def upgrade():
    """Enable row-level security on ``users`` and add the tenant-isolation policy.

    The policy compares each row's ``tenant_id`` with the ``app.tenant_id``
    setting, both for rows being read (USING) and for rows being written
    (WITH CHECK).
    """
    op.execute("ALTER TABLE users ENABLE ROW LEVEL SECURITY")
    # current_setting(..., true) yields NULL instead of raising when the
    # setting was never defined, and NULLIF maps an empty value to NULL so the
    # bigint cast cannot fail. A NULL tenant matches no row, so a connection
    # without a tenant fails closed instead of erroring out.
    op.execute("""
        CREATE POLICY tenant_iso ON users
        USING (tenant_id = NULLIF(current_setting('app.tenant_id', true), '')::bigint)
        WITH CHECK (tenant_id = NULLIF(current_setting('app.tenant_id', true), '')::bigint)
    """)

Tenant context (contextvar)

import contextvars

# Tenant id of the request being handled (set and reset by the tenant
# middleware); None, the declared default, when no tenant is in context.
current_tenant: contextvars.ContextVar[int | None] = contextvars.ContextVar("tenant", default=None)

Middleware to resolve tenant

@app.middleware("http")
async def tenant_middleware(request: Request, call_next):
    """Resolve the tenant for a request and expose it through ``current_tenant``.

    Requests with no resolvable tenant are rejected with HTTP 400 unless the
    path is public. The context variable is always restored once the
    downstream handler has finished, whether it returned or raised.
    """
    # The tenant may come from a JWT, the subdomain, a header, etc.
    tenant = await resolve_tenant(request)

    if tenant is None:
        if not is_public_path(request.url.path):
            return JSONResponse({"error": "no tenant"}, status_code=400)

    reset_token = current_tenant.set(tenant)
    try:
        response = await call_next(request)
    finally:
        current_tenant.reset(reset_token)
    return response

SQLAlchemy event sets SET LOCAL

@event.listens_for(engine.sync_engine, "before_cursor_execute")
def set_tenant(conn, cur, stmt, params, ctx, executemany):
    """Issue ``SET LOCAL app.tenant_id`` on the cursor before each statement.

    Does nothing when ``current_tenant`` holds None.

    Raises:
        TypeError, ValueError: if the tenant id cannot be converted to an int.
    """
    tid = current_tenant.get()
    if tid is not None:
        # The value is spliced into SQL text, so force it through int() first:
        # a tenant id that arrived as a string (for example from a request
        # header) can then never inject SQL. It either becomes a plain integer
        # or the conversion raises and nothing is executed.
        cur.execute(f"SET LOCAL app.tenant_id = '{int(tid)}'")

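Every statement executed while a tenant is in context is preceded by SET LOCAL, so it runs with app.tenant_id set for the current transaction.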

Auto-set tenant_id on insert

@event.listens_for(Session, "before_flush")
def set_tenant_on_insert(session, flush_context, instances):
    """Fill in ``tenant_id`` on pending tenant-scoped objects before a flush.

    Only objects in ``session.new`` that are ``TenantMixin`` instances and
    whose ``tenant_id`` is still None are touched. Nothing happens when no
    tenant is in context.
    """
    active_tenant = current_tenant.get()
    if active_tenant is None:
        return

    tenant_scoped = (
        pending for pending in session.new if isinstance(pending, TenantMixin)
    )
    for pending in tenant_scoped:
        if pending.tenant_id is None:
            pending.tenant_id = active_tenant

App role

-- Login role for the application.
CREATE ROLE app_user WITH LOGIN PASSWORD '...';
-- Deliberately created WITHOUT the BYPASSRLS attribute, so row-level
-- security policies apply to everything this role does.
GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA public TO app_user;

App connects as this role. RLS applies.

Admin role (bypass RLS)

-- Admin login role: the BYPASSRLS attribute exempts it from row-level
-- security policies, so it can read and write across tenants.
CREATE ROLE app_admin WITH LOGIN PASSWORD '...' BYPASSRLS;

Separate engine; used for admin paths only.

async def get_admin_db() -> AsyncSession:
    """Yield a session created by ``AdminSessionLocal``.

    The session's async context manager is exited once the consumer of this
    generator is done with it.
    """
    admin_session_cm = AdminSessionLocal()
    async with admin_session_cm as admin_session:
        yield admin_session

Tests

@pytest.mark.anyio
async def test_tenant_isolation(db_session):
    """Check that a row written by tenant 1 is visible to tenant 1 only.

    The tenant-1 read is a positive control: without it the test would also
    pass if row-level security hid the row from every tenant.
    """
    # Tenant 1 writes a row.
    await db_session.execute(text("SET LOCAL app.tenant_id = '1'"))
    db_session.add(User(tenant_id=1, email="a@example.com"))
    await db_session.commit()

    # The commit ended the transaction and SET LOCAL only lasts for one
    # transaction, so the tenant has to be set again before reading.
    await db_session.execute(text("SET LOCAL app.tenant_id = '1'"))
    own_rows = (await db_session.execute(select(User))).scalars().all()
    assert any(user.email == "a@example.com" for user in own_rows)

    # Tenant 2 should not see tenant 1's row.
    await db_session.execute(text("SET LOCAL app.tenant_id = '2'"))
    rows = (await db_session.execute(select(User))).scalars().all()
    assert rows == []

Add to CI.

Pydantic schemas

# Request schema for creating a user.
# tenant_id is never accepted as input: it is filled in server-side from the
# tenant context that the middleware establishes.
class UserCreate(BaseModel):
    email: EmailStr

# Response schema for tenant-scoped views: tenant_id is deliberately left out.
# It may appear in output for admin views only, not for tenant-scoped views.
class UserRead(BaseModel):
    id: int
    email: EmailStr
    # NOTE(review): presumably enabled so the schema can be built directly
    # from ORM User objects; confirm against the callers.
    model_config = {"from_attributes": True}

# Admin variant of UserRead: the same fields plus the owning tenant_id.
class UserReadAdmin(UserRead):
    tenant_id: int

Per-request DB dep

async def get_db(request: Request) -> AsyncSession:
    """Yield a per-request session with ``app.tenant_id`` set when a tenant is known.

    The tenant id is read from ``request.state.tenant_id``. When that
    attribute is absent or None the session is yielded without a tenant
    setting.
    """
    async with AsyncSessionLocal() as session:
        tid = getattr(request.state, "tenant_id", None)
        # Compare against None so that a tenant id of 0 is not skipped.
        if tid is not None:
            # PostgreSQL's SET command does not accept bind parameters, so the
            # value is passed to set_config() instead. Its third argument
            # (is_local = true) gives the same transaction scope as SET LOCAL.
            await session.execute(
                text("SELECT set_config('app.tenant_id', :t, true)"),
                {"t": str(tid)},
            )
        yield session

Tenant resolution sources

async def resolve_tenant(request: Request) -> int | None:
    """Work out which tenant a request belongs to.

    Sources are tried in order: the ``tenant_id`` claim of a Bearer JWT, the
    first label of the ``Host`` header (unless it is ``www`` or ``api``), and
    finally the ``x-tenant-id`` header.

    Returns:
        The tenant id, or None when no source yields one.
    """
    # 1. From JWT
    auth = request.headers.get("authorization", "")
    if auth.startswith("Bearer "):
        try:
            payload = jwt.decode(auth[7:], SECRET, algorithms=[ALG])
            return payload.get("tenant_id")
        except Exception:
            # An undecodable token falls through to the other sources, as
            # before, but a bare ``except`` would also have swallowed
            # SystemExit and KeyboardInterrupt.
            # NOTE(review): narrow this to the JWT library's own error class.
            pass

    # 2. From subdomain
    host = request.headers.get("host", "")
    if "." in host:
        sub = host.split(".")[0]
        if sub not in ("www", "api"):
            return await tenant_id_by_slug(sub)

    # 3. From header
    raw = request.headers.get("x-tenant-id")
    if raw is None:
        return None
    try:
        # The header is client-controlled text: only hand on a real integer,
        # which is what the return annotation promises, never the raw string.
        return int(raw)
    except ValueError:
        return None

Per-tenant rate limit

async def rate_limit(request: Request, redis: Redis = Depends(get_redis)):
    """Count a request against its tenant-and-path counter and enforce the limit.

    The counter gets a 60-second expiry when it is first created.

    Raises:
        HTTPException: 429 once the counter exceeds ``PER_MINUTE_LIMIT``.
    """
    bucket = f"rl:{current_tenant.get()}:{request.url.path}"

    hits = await redis.incr(bucket)
    if hits == 1:
        # First hit for this counter: start its expiry.
        await redis.expire(bucket, 60)

    if hits <= PER_MINUTE_LIMIT:
        return
    raise HTTPException(429, "Rate limited")

Cross-tenant queries (admin)

@app.get("/admin/users", dependencies=[Depends(require_role("admin"))])
async def admin_list(db: AsyncSession = Depends(get_admin_db)):
    """Return every ``User`` row selected through the admin session dependency."""
    result = await db.execute(select(User))
    users = result.scalars()
    return users.all()

Uses BYPASSRLS role.

Onboarding new tenant

@app.post("/admin/tenants", dependencies=[Depends(require_role("admin"))])
async def create_tenant(data: TenantCreate, db = Depends(get_admin_db)):
    """Create a tenant from the submitted slug and name and return it."""
    tenant = Tenant(slug=data.slug, name=data.name)
    db.add(tenant)
    await db.commit()
    # With the default expire_on_commit setting the commit expires every
    # attribute on the object. Reload it explicitly so that serializing the
    # response does not attempt an implicit lazy load on the async session.
    await db.refresh(tenant)
    return tenant

Audit log per tenant

@event.listens_for(Session, "before_flush")
def audit(session, flush_context, instances):
    """Add an ``AuditLog`` entry to the session for every object in ``session.new``.

    Each entry carries the current tenant id (None when no tenant is in
    context) and an action of the form ``"<lowercased class name>.create"``.
    """
    tid = current_tenant.get()
    for obj in session.new:
        session.add(AuditLog(
            tenant_id=tid,
            action=f"{type(obj).__name__.lower()}.create",
            # NOTE(review): the ``...`` below stands in for the remaining
            # AuditLog fields; it is a placeholder, not valid Python as written.
            ...
        ))

Common mistakes

  • App role with BYPASSRLS — RLS useless.
  • Using plain SET instead of SET LOCAL in the event — the setting outlives the transaction and leaks to the next request that reuses the pooled connection.
  • No isolation test in CI.
  • Hard-coded tenant_id in queries — the query returns the right rows even when RLS is not applied, so a missing or broken policy goes undetected.
  • Cross-tenant FK without tenant_id check.

Read this next

If you want my full multi-tenant stack starter (middleware + RLS + admin), it’s at rajpoot.dev.


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev.