Cheatsheet for async SQLAlchemy. Long-form: Textbook Ch 8.
Setup
from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@host/db",
    pool_size=20, max_overflow=10,
    pool_pre_ping=True, pool_recycle=3600,  # test connections on checkout; recycle after 1 hour
)
# expire_on_commit=False keeps loaded attributes usable after commit
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
Driver compatibility
| Database | Async drivers |
|---|---|
| Postgres | asyncpg (recommended), psycopg (v3) |
| MySQL | aiomysql, asyncmy |
| SQLite | aiosqlite |
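
A quick guard against passing a sync driver URL to an async engine can be sketched from the table above. This is a minimal sketch: `ASYNC_DRIVERS` and `check_async_url` are hypothetical names, not part of SQLAlchemy, and the driver sets simply mirror the table.

```python
from sqlalchemy.engine import make_url

# Async drivers from the table above, keyed by backend name.
ASYNC_DRIVERS = {
    "postgresql": {"asyncpg", "psycopg"},
    "mysql": {"aiomysql", "asyncmy"},
    "sqlite": {"aiosqlite"},
}


def check_async_url(url: str) -> str:
    """Return the driver name, or raise ValueError if it is not a listed async driver."""
    parsed = make_url(url)
    # drivername looks like "backend+driver"; the driver part is empty if omitted
    backend, _, driver = parsed.drivername.partition("+")
    if driver not in ASYNC_DRIVERS.get(backend, set()):
        raise ValueError(f"{parsed.drivername!r} is not a listed async driver")
    return driver


if __name__ == "__main__":
    print(check_async_url("postgresql+asyncpg://user:pass@host/db"))
```

Only the URL is parsed here, so the check runs without any database driver installed.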
Basic async patterns
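from sqlalchemy import select

async with AsyncSessionLocal() as session:
    user = await session.get(User, 1)  # primary-key lookup; None if missing
    result = await session.execute(select(User))
    users = result.scalars().all()  # unwrap rows into User objects
    session.add(user)
    await session.commit()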
Streaming (memory-bounded reads)
result = await session.stream(select(Event))
async for evt in result.scalars():
    process(evt)

# Or Core
async with engine.connect() as conn:
    result = await conn.stream(select(events_table))
    async for row in result:
        ...
Lazy-loading gotchas
# BAD: implicit lazy-load IO in async context
user = await session.get(User, 1)
print(user.posts)  # raises MissingGreenlet: the lazy-load needs IO outside an await

# FIX 1: eager-load
stmt = select(User).options(selectinload(User.posts))
user = await session.scalar(stmt)
print(user.posts)  # works

# FIX 2: awaitable_attrs (the model must include the AsyncAttrs mixin)
posts = await user.awaitable_attrs.posts

# FIX 3: declare the relationship with lazy="raise" so unintended lazy-loads error
Run sync code in async context
from sqlalchemy import text

def sync_helper(session):
    # Receives the underlying sync Session, so the sync API works here
    return session.execute(text("..."))

result = await async_session.run_sync(sync_helper)
Per-request session (FastAPI)
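from collections.abc import AsyncIterator

from fastapi import Depends, Request

async def get_db(request: Request) -> AsyncIterator[AsyncSession]:
    # app.state.sm is assumed to hold the async_sessionmaker, set at startup
    async with request.app.state.sm() as session:
        yield session

@app.get("/users/{id}")
async def get_user(id: int, db: AsyncSession = Depends(get_db)):
    return await db.get(User, id)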
Concurrent ops within one session
An AsyncSession holds one connection and is not safe for concurrent use: tasks that share a session either queue on that connection or raise errors. For parallelism, give each task its own session:
# BAD (tasks share one session and one connection)
async with asyncio.TaskGroup() as tg:
    a = tg.create_task(session.scalar(select(A)))
    b = tg.create_task(session.scalar(select(B)))

# GOOD (one session per task)
async with AsyncSessionLocal() as s1, AsyncSessionLocal() as s2:
    async with asyncio.TaskGroup() as tg:
        a = tg.create_task(s1.scalar(...))
        b = tg.create_task(s2.scalar(...))
Bulk insert (async)
await session.execute(insert(User), [{"email": ...}, ...])
await session.commit()
Error handling
from sqlalchemy.exc import IntegrityError, OperationalError

try:
    session.add(user)
    await session.commit()
except IntegrityError as e:
    await session.rollback()
    # e.orig is the driver-level exception; its message names the violated constraint
    if "users_email_key" in str(e.orig):
        raise EmailTaken from e
    raise
asyncpg-specific tuning
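# Pass to the engine: create_async_engine(url, connect_args=connect_args)
connect_args = {
    "server_settings": {
        "application_name": "myapp",
        "search_path": "myapp,public",
        "jit": "off",  # off when using PgBouncer
    },
    "command_timeout": 60,  # seconds
    "statement_cache_size": 0,  # asyncpg's own cache; required with PgBouncer transaction-pool
    "prepared_statement_cache_size": 0,  # SQLAlchemy asyncpg dialect's cache
}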
Sync events on AsyncEngine
from sqlalchemy import event

@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, conn_record):
    cur = dbapi_conn.cursor()
    cur.execute("SET TIME ZONE 'UTC'")
    cur.close()

Note engine.sync_engine: listeners are registered on the underlying sync engine, and the handlers are plain (non-async) functions.
Common mistakes
- Sync drivers (psycopg2) in an async engine: fails.
- Missing expire_on_commit=False: attributes expire on every commit, so the next access needs a refresh query (a MissingGreenlet error if it happens implicitly).
- Sharing AsyncSession across tasks: serialization or errors.
- Lazy-load in async without awaitable_attrs: raises MissingGreenlet instead of loading.
- PgBouncer + prepared statement cache: random errors.
Read this next
If you want my async session + streaming patterns reference, it’s at rajpoot.dev.