Cheatsheet for the DB-per-tenant pattern. Strongest isolation; highest cost.
Engine cache
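import functools

from sqlalchemy.ext.asyncio import create_async_engine

@functools.lru_cache(maxsize=1000)
def tenant_engine(dsn: str):
    # one engine (and one small pool) per tenant DSN, reused across requests
    return create_async_engine(
        dsn,
        pool_size=2, max_overflow=4,
        pool_pre_ping=True, pool_recycle=600,
    )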
Cap the cache with maxsize. Each tenant’s engine gets a small pool, because every tenant holds its own connections and the pools add up across tenants.
Per-request session
from typing import AsyncIterator

async def get_tenant_db(tenant: Tenant = Depends(get_tenant)) -> AsyncIterator[AsyncSession]:
    engine = tenant_engine(tenant.dsn)
    SessionLocal = async_sessionmaker(engine, expire_on_commit=False)
    # the session is closed when the request finishes
    async with SessionLocal() as session:
        yield session
Tenant DSN storage
class Tenant(Base):
    __tablename__ = "tenants"
    id: Mapped[int] = mapped_column(primary_key=True)
    slug: Mapped[str] = mapped_column(unique=True)
    dsn: Mapped[str]  # postgresql+asyncpg://...
    region: Mapped[str]
    created_at: Mapped[datetime] = mapped_column(server_default=func.now())
Stored in a global “control” DB.
Resolve tenant per request
async def get_tenant(request: Request, control_db = Depends(get_control_db)) -> Tenant:
    tid = request.headers.get("x-tenant-id")
    if not tid:
        raise HTTPException(400, "missing tenant")
    try:
        tenant_id = int(tid)
    except ValueError:
        # a non-numeric header would otherwise surface as a 500
        raise HTTPException(400, "invalid tenant id")
    tenant = await control_db.get(Tenant, tenant_id)
    if not tenant:
        raise HTTPException(404, "unknown tenant")
    return tenant
Onboarding
import asyncio

from alembic import command
from alembic.config import Config

async def provision_tenant(slug: str):
    # 1. Create DB (or call provisioning API)
    # 2. Run migrations (command.upgrade blocks, so keep it off the event loop)
    dsn = f"postgresql+asyncpg://app:pass@host/{slug}"
    cfg = Config("alembic.ini")
    cfg.set_main_option("sqlalchemy.url", dsn)
    await asyncio.to_thread(command.upgrade, cfg, "head")
    # 3. Register in control DB
    async with ControlSession() as s:
        s.add(Tenant(slug=slug, dsn=dsn, region="us-east"))
        await s.commit()
Bulk migration runner
import asyncio

async def migrate_all():
    tenants = await get_all_tenants()
    sem = asyncio.Semaphore(8)

    async def migrate_one(t):
        async with sem:
            cfg = Config("alembic.ini")
            cfg.set_main_option("sqlalchemy.url", t.dsn)
            await asyncio.to_thread(command.upgrade, cfg, "head")

    await asyncio.gather(*(migrate_one(t) for t in tenants))
The semaphore caps concurrent migrations at 8; watch the total connection count across all database servers while the run is in progress.
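One way to keep partial failures visible is to collect a per-tenant outcome instead of letting the first exception escape from `gather`. The sketch below is a minimal illustration under assumptions, not the runner described above: `run_for_all` and the `migrate` callable are hypothetical names, and `migrate` stands in for whatever coroutine wraps `command.upgrade` for a single tenant.

```python
import asyncio
from typing import Awaitable, Callable, Iterable


async def run_for_all(
    slugs: Iterable[str],
    migrate: Callable[[str], Awaitable[None]],  # hypothetical per-tenant migration coroutine
    limit: int = 8,
) -> tuple[list[str], dict[str, BaseException]]:
    """Run `migrate` for every tenant with bounded concurrency.

    Returns (succeeded slugs, {failed slug: exception}) so that failed
    tenants can be retried without touching the ones that succeeded.
    """
    sem = asyncio.Semaphore(limit)
    slugs = list(slugs)

    async def guarded(slug: str) -> None:
        async with sem:
            await migrate(slug)

    # return_exceptions=True: a failing tenant yields its exception as a
    # result instead of aborting the gather call
    outcomes = await asyncio.gather(
        *(guarded(s) for s in slugs), return_exceptions=True
    )
    ok = [s for s, r in zip(slugs, outcomes) if not isinstance(r, BaseException)]
    failed = {s: r for s, r in zip(slugs, outcomes) if isinstance(r, BaseException)}
    return ok, failed
```

The `failed` mapping can be logged or fed back into a retry pass, which makes it easier to see which tenants are still on the old schema.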
Engine cleanup
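from contextlib import asynccontextmanager

# hypothetical manual cache (dsn -> engine), filled by tenant_engine instead of lru_cache
_engines = {}

@asynccontextmanager
async def lifespan(app):
    yield
    # close all cached engines on shutdown
    for engine in _engines.values():
        await engine.dispose()
    _engines.clear()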
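lru_cache exposes only hit/miss statistics through cache_info(), not the cached engines, so it cannot dispose them on shutdown or on eviction. For real cleanup, write a manual cache rather than using lru_cache.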
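A manual cache along those lines might look like the following. This is a sketch under assumptions rather than a drop-in implementation: `EngineCache`, `get`, and `close_all` are hypothetical names, and `factory` is any callable that builds an engine from a DSN (for example a small wrapper around `create_async_engine`). The only thing the cache requires of an engine is an async `dispose()` method.

```python
from collections import OrderedDict
from typing import Any, Callable


class EngineCache:
    """Small LRU cache that disposes engines on eviction and on shutdown."""

    def __init__(self, factory: Callable[[str], Any], maxsize: int = 1000):
        self._factory = factory
        self._maxsize = maxsize
        self._engines: OrderedDict[str, Any] = OrderedDict()

    async def get(self, dsn: str) -> Any:
        if dsn in self._engines:
            self._engines.move_to_end(dsn)  # mark as most recently used
            return self._engines[dsn]
        engine = self._factory(dsn)
        self._engines[dsn] = engine
        if len(self._engines) > self._maxsize:
            # drop the least recently used engine and release its pool
            _, evicted = self._engines.popitem(last=False)
            await evicted.dispose()
        return engine

    async def close_all(self) -> None:
        # dispose every remaining engine, e.g. from the lifespan handler
        while self._engines:
            _, engine = self._engines.popitem()
            await engine.dispose()
```

In the lifespan handler, `await cache.close_all()` after the `yield` would release every pool. Evicting an engine that still serves in-flight requests needs more care than this sketch shows.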
Pool sizing math
N_tenants × pool_per_tenant × workers × replicas
100 tenants × 2 pool × 4 workers × 3 replicas = 2400 logical connections.
Put PgBouncer in front, either one per tenant or shared. Connection limits arrive quickly: DB-per-tenant scales poorly past a few hundred tenants.
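The arithmetic above is easy to put in a helper so it can be rerun as tenant counts change. The function below is a small sketch; `connection_budget` is a hypothetical name, and it assumes every process eventually opens a pool for every tenant, so the result is an upper bound. It also reports the worst case with `max_overflow` included, which the formula above leaves out.

```python
def connection_budget(
    tenants: int,
    pool_size: int,
    max_overflow: int,
    workers: int,
    replicas: int,
) -> tuple[int, int]:
    """Return (steady_state, worst_case) connection counts across all tenant DBs."""
    processes = workers * replicas  # each worker process keeps its own pools
    steady = tenants * pool_size * processes
    worst = tenants * (pool_size + max_overflow) * processes
    return steady, worst


# numbers from the example above, with max_overflow=4 from the engine cache
print(connection_budget(100, 2, 4, 4, 3))  # (2400, 7200)
```

With the engine settings used earlier (`pool_size=2, max_overflow=4`), the same 100 tenants can reach 7200 connections under load, which is the number to compare against server and PgBouncer limits.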
Cross-tenant queries (rare)
If a job needs to gather from all tenants:
async def aggregate_all():
    results = []
    for tenant in await get_all_tenants():
        async with tenant_engine(tenant.dsn).connect() as conn:
            data = await conn.execute(select(func.count()).select_from(User))
            results.append((tenant.slug, data.scalar()))
    return results
Iterate; aggregate in Python.
Backups
Each tenant: own backup schedule, retention. Significant ops work for 100+ tenants. Automate.
Region routing
@functools.lru_cache(maxsize=1000)
def regional_engine(region: str, dsn: str):
    return create_async_engine(dsn, ...)
Per-region tenants on separate DBs. Compliance: data stays in region.
When DB-per-tenant fits
- Strict isolation required (compliance, contracts).
- < ~100 enterprise tenants.
- Tenants need independent failure domains.
When NOT
- B2C with thousands of users.
- Cost-sensitive: each tenant costs a DB.
- Need cross-tenant queries often.
Common mistakes
- Uncapped engine cache → connection explosion.
- Sequential migrations of 100 tenants → hours.
- Forgetting per-tenant backups.
- Same migration applied to N tenants — partial failures hard to recover.
Read this next
If you want my tenant-router + bulk-migrator code, it’s at rajpoot.dev.