Chapter 8: async SQLAlchemy in detail. The drivers, the AsyncSession, what’s the same as sync, what’s different, and the gotchas.
Async drivers
| Database | Driver | Notes |
|---|---|---|
| Postgres | asyncpg | Default; fastest |
| Postgres | psycopg (v3) | Native async support |
| MySQL | aiomysql | |
| MySQL | asyncmy | Faster alternative |
| SQLite | aiosqlite | |
engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")
engine = create_async_engine("postgresql+psycopg://user:pass@host/db")
engine = create_async_engine("mysql+aiomysql://user:pass@host/db")
engine = create_async_engine("sqlite+aiosqlite:///app.db")
For Postgres in 2026: asyncpg is the default choice; psycopg v3 is catching up, but asyncpg is generally faster.
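The driver is the part of the URL after the +, and SQLAlchemy can parse it without connecting. A minimal sketch, assuming only that SQLAlchemy is installed; the describe helper and the ASYNC_DRIVERS set are hypothetical names for illustration, and the set simply lists the drivers from the table above:

```python
from sqlalchemy.engine import make_url

# Hypothetical allow-list for this sketch: the async drivers from the table above.
# Note: "psycopg" (v3) serves both sync and async engines; create_async_engine
# selects its async mode.
ASYNC_DRIVERS = {"asyncpg", "psycopg", "aiomysql", "asyncmy", "aiosqlite"}


def describe(url: str) -> tuple[str, str, bool]:
    """Return (backend, driver, looks_async) for a SQLAlchemy URL string."""
    parsed = make_url(url)
    backend = parsed.get_backend_name()  # text before the "+"
    driver = parsed.get_driver_name()    # text after the "+"
    return backend, driver, driver in ASYNC_DRIVERS


if __name__ == "__main__":
    print(describe("postgresql+asyncpg://user:pass@host/db"))
    print(describe("postgresql+psycopg2://user:pass@host/db"))
```

A check like this can run at startup to catch a sync driver before create_async_engine rejects it.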
Setup
from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker
engine = create_async_engine(URL, pool_size=20, max_overflow=10)
AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)
async with AsyncSessionLocal() as session:
    user = await session.get(User, 1)
Almost identical to sync. Just add await and use AsyncSession.
expire_on_commit=False
Strongly recommended for FastAPI / web apps:
async_sessionmaker(engine, expire_on_commit=False)
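Why: the default expire_on_commit=True expires all ORM instance attributes after commit, so the next attribute access reloads from the DB. In sync code that is a wasted query when you commit and return. In async code it is worse: the reload is implicit I/O outside an await, so it raises MissingGreenlet.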
Async query patterns
# Single by PK
user = await session.get(User, 1)
# By predicate
user = await session.scalar(select(User).where(User.email == "x"))
# Multiple
result = await session.execute(select(User))
users = result.scalars().all()
# Bulk update
await session.execute(update(User).where(User.active == False).values(banned=True))
await session.commit()
Same patterns as sync; await at the boundaries.
Streaming
For result sets larger than memory:
result = await session.stream(select(Event))
async for row in result.scalars():
    process(row)
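stream() returns an AsyncResult that you iterate with async for. Rows are fetched in chunks through a server-side cursor where the driver supports one, so memory stays bounded.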
For Core (no ORM):
async with engine.connect() as conn:
    result = await conn.stream(select(events_table))
    async for row in result:
        ...
Lazy loading caveats
Lazy loading triggers a query on attribute access, and that query is issued synchronously. In async code, user.posts (with the default lazy setting) attempts implicit I/O outside an await and raises MissingGreenlet.
Solutions:
- Eager-load explicitly:
stmt = select(User).options(selectinload(User.posts))
user = await session.scalar(stmt)
print(user.posts) # works
- Use awaitable_attrs (requires the AsyncAttrs mixin on your declarative base):
posts = await user.awaitable_attrs.posts
Lazy-loads explicitly. Async-friendly.
- Use lazy="raise" to fail loud:
class User(Base):
    posts: Mapped[list[Post]] = relationship(lazy="raise")
Accessing without explicit load raises. Forces you to selectinload.
In async-heavy code: enforce explicit loading. See Chapter 6.
AsyncSession with FastAPI
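from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from fastapi import Depends, FastAPI, Request
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine
@asynccontextmanager
async def lifespan(app: FastAPI):
    # One engine per process: created at startup, disposed at shutdown
    app.state.engine = create_async_engine(DATABASE_URL)
    app.state.sessionmaker = async_sessionmaker(app.state.engine, expire_on_commit=False)
    yield
    await app.state.engine.dispose()
app = FastAPI(lifespan=lifespan)
async def get_db(request: Request) -> AsyncIterator[AsyncSession]:
    # One session per request, closed when the request finishes
    async with request.app.state.sessionmaker() as session:
        yield session
@app.get("/users/{id}")
async def get_user(id: int, db: AsyncSession = Depends(get_db)):
    return await db.get(User, id)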
Engine: lifespan. Session: per-request via Depends. See FastAPI DI chapter.
Concurrent queries in one session
An AsyncSession holds a single connection and is not safe to share between concurrent tasks. Concurrent operations on the same session cannot run in parallel and may raise errors:
# BAD: two tasks driving one session at the same time
async with asyncio.TaskGroup() as tg:
    a = tg.create_task(session.scalar(select(A)))
    b = tg.create_task(session.scalar(select(B)))
For parallel queries: use multiple sessions:
async with AsyncSessionLocal() as s1, AsyncSessionLocal() as s2:
    async with asyncio.TaskGroup() as tg:
        a = tg.create_task(s1.scalar(...))
        b = tg.create_task(s2.scalar(...))
Or use Core (no session, direct connections from pool).
For most cases: serial in one session is fine; the bottleneck is the DB, not the protocol.
Long-running transactions
async with AsyncSessionLocal() as session:
    user = await session.get(User, 1)
    await asyncio.sleep(60) # holds connection from pool!
    user.email = "new"
    await session.commit()
Don’t hold sessions across long awaits: the session keeps its connection checked out the whole time, and under load the pool can starve. Acquire late, release early.
Error handling
try:
    async with AsyncSessionLocal() as session:
        async with session.begin():
            ...
except IntegrityError as e:
    # constraint violation
    ...
except OperationalError as e:
    # connection / network
    ...
The transaction is rolled back automatically on exception inside session.begin().
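For unique-constraint detection by constraint name (the name and the message format vary by database and driver; users_email_key is the Postgres default name):
from sqlalchemy.exc import IntegrityError
try:
    session.add(user)
    await session.commit()
except IntegrityError as e:
    await session.rollback()
    if "users_email_key" in str(e.orig): # constraint name
        raise EmailTaken from e
    raise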
For Postgres-specific error code checking, see the Postgres textbook.
Custom event hooks (async)
Event listeners are plain sync functions, even with an AsyncEngine. Register them on the underlying sync engine:
@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, conn_record):
    ...
Note sync_engine: events still fire on the underlying sync engine.
ORM async vs Core async
# ORM
result = await session.execute(select(User).where(User.id == 1))
user = result.scalar_one()
# Core (no session)
async with engine.connect() as conn:
    result = await conn.execute(select(users_table).where(users_table.c.id == 1))
    row = result.first()
Core gives a Row (or mapping); no session, no identity map, no relationships. Useful for read-heavy paths where ORM overhead isn’t justified.
Run-sync escape hatch
Sometimes you have a sync function that needs to run in async context:
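def sync_legacy(session):
    # uses sync session API
    ...
await session.run_sync(sync_legacy)
run_sync does not use a thread: it runs the function in a greenlet on the event loop’s thread and passes it a sync Session whose database I/O is routed back to the async driver. Other blocking calls inside the function still block the loop. Useful for migrating legacy code or libraries that use sync.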
Connection events
@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, conn_record):
    cursor = dbapi_conn.cursor()
    cursor.execute("SET application_name = 'myapp'")
    cursor.close()
Set per-connection state at the driver level.
For asyncpg-specific: connect_args={"server_settings": {"application_name": "myapp"}}.
Pool considerations
Async pool semantics are like sync. Don’t over-size.
For asyncpg: SQLAlchemy does not use asyncpg’s own pool; pooling is done by SQLAlchemy. Configure it on the engine with pool_size and max_overflow.
Common mistakes
1. Lazy-loading in async
user.posts without preload raises MissingGreenlet (or InvalidRequestError with lazy="raise"). Use selectinload or awaitable_attrs.
2. Sharing AsyncSession across tasks
An AsyncSession is not safe for concurrent use: operations cannot run in parallel on its one connection and may raise. Use a separate session per task.
3. Forgetting expire_on_commit=False
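Attributes expire on commit; touching them afterwards triggers an implicit reload, which in async code raises MissingGreenlet instead of quietly costing a query.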
4. Using sync drivers in async engine
postgresql+psycopg2:// won’t work as async. Use +asyncpg or +psycopg (v3 async).
5. Not closing engine
Lifespan shutdown should await engine.dispose().
What’s next
Chapter 9: Performance.