Chapter 8: async SQLAlchemy in detail. The drivers, the AsyncSession, what’s the same as sync, what’s different, and the gotchas.

Async drivers

| Database | Driver | Notes |
|---|---|---|
| Postgres | asyncpg | Default; fastest |
| Postgres | psycopg (v3) | Native async support |
| MySQL | aiomysql | |
| MySQL | asyncmy | Faster alternative |
| SQLite | aiosqlite | |

engine = create_async_engine("postgresql+asyncpg://user:pass@host/db")
engine = create_async_engine("postgresql+psycopg://user:pass@host/db")
engine = create_async_engine("mysql+aiomysql://user:pass@host/db")
engine = create_async_engine("sqlite+aiosqlite:///app.db")

For Postgres in 2026: asyncpg is the default choice; psycopg v3 is catching up, but asyncpg is generally still the faster of the two.

Setup

from sqlalchemy.ext.asyncio import create_async_engine, AsyncSession, async_sessionmaker

engine = create_async_engine(URL, pool_size=20, max_overflow=10)

AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

async with AsyncSessionLocal() as session:
    user = await session.get(User, 1)

Almost identical to sync. Just add await and use AsyncSession.

expire_on_commit=False

Strongly recommended for FastAPI / web apps:

async_sessionmaker(engine, expire_on_commit=False)

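Why: the default expire_on_commit=True expires every attribute on every ORM instance in the session after commit, so the next attribute access has to reload from the DB. In sync code that is a wasted query when you commit and then return the object. In async code it is worse: the reload is implicit IO outside an await, so it raises MissingGreenlet instead of loading.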

Async query patterns

# Single by PK
user = await session.get(User, 1)

# By predicate
user = await session.scalar(select(User).where(User.email == "x"))

# Multiple
result = await session.execute(select(User))
users = result.scalars().all()

# Bulk update
await session.execute(update(User).where(User.active.is_(False)).values(banned=True))
await session.commit()

Same patterns as sync; await at the boundaries.

Streaming

For result sets larger than memory:

result = await session.stream(select(Event))
async for row in result.scalars():
    process(row)

stream() returns an AsyncResult backed by a server-side cursor. Rows are fetched in chunks as you iterate, so memory stays bounded.

For Core (no ORM):

async with engine.connect() as conn:
    result = await conn.stream(select(events_table))
    async for row in result:
        ...

Lazy loading caveats

Lazy loading triggers a query on attribute access, but attribute access is synchronous and can't be awaited. In async, user.posts (with the default lazy setting) tries to do IO outside an await and raises MissingGreenlet.

Solutions:

  1. Eager-load explicitly:
stmt = select(User).options(selectinload(User.posts))
user = await session.scalar(stmt)
print(user.posts)  # works
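  2. Use awaitable_attrs:
posts = await user.awaitable_attrs.posts

This lazy-loads through an explicit await, so it is async-friendly. It requires the AsyncAttrs mixin (from sqlalchemy.ext.asyncio) on your declarative base.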

  3. lazy="raise" to fail loud:
class User(Base):
    posts: Mapped[list[Post]] = relationship(lazy="raise")

Accessing without explicit load raises. Forces you to selectinload.

In async-heavy code: enforce explicit loading. See Chapter 6.

AsyncSession with FastAPI

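from collections.abc import AsyncIterator
from contextlib import asynccontextmanager

from fastapi import Depends, FastAPI, Request
from sqlalchemy.ext.asyncio import AsyncSession, async_sessionmaker, create_async_engine

@asynccontextmanager
async def lifespan(app: FastAPI):
    # One engine (and pool) per process, created at startup.
    app.state.engine = create_async_engine(DATABASE_URL)
    app.state.sessionmaker = async_sessionmaker(app.state.engine, expire_on_commit=False)
    yield
    await app.state.engine.dispose()

app = FastAPI(lifespan=lifespan)

# The dependency is an async generator, so it yields (not returns) the session.
async def get_db(request: Request) -> AsyncIterator[AsyncSession]:
    async with request.app.state.sessionmaker() as session:
        yield session

@app.get("/users/{id}")
async def get_user(id: int, db: AsyncSession = Depends(get_db)):
    return await db.get(User, id)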

Engine: lifespan. Session: per-request via Depends. See the FastAPI DI chapter.

Concurrent queries in one session

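An AsyncSession wraps a single connection and a single transaction, and it is not safe to share between concurrent tasks. Concurrent ops on the same session don’t parallelize, and they can fail outright:

# BAD: two tasks using one session (and one connection) at the same time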
async with asyncio.TaskGroup() as tg:
    a = tg.create_task(session.scalar(select(A)))
    b = tg.create_task(session.scalar(select(B)))

For parallel queries: use multiple sessions:

async with AsyncSessionLocal() as s1, AsyncSessionLocal() as s2:
    async with asyncio.TaskGroup() as tg:
        a = tg.create_task(s1.scalar(...))
        b = tg.create_task(s2.scalar(...))

Or use Core (no session, direct connections from pool).

For most cases: serial in one session is fine; the bottleneck is the DB, not the protocol.

Long-running transactions

async with AsyncSessionLocal() as session:
    user = await session.get(User, 1)
    await asyncio.sleep(60)  # holds connection from pool!
    user.email = "new"
    await session.commit()

Don’t hold a session across long awaits: it keeps a pooled connection checked out the whole time, and under load the pool can starve. Acquire late, release early.

Error handling

try:
    async with AsyncSessionLocal() as session:
        async with session.begin():
            ...
except IntegrityError as e:
    # constraint violation
    ...
except OperationalError as e:
    # connection / network
    ...

The transaction is rolled back automatically on exception inside session.begin().

For unique-constraint detection (cross-DB):

from sqlalchemy.exc import IntegrityError

try:
    session.add(user)
    await session.commit()
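except IntegrityError as e:
    await session.rollback()  # roll back the failed transaction before reusing the session
    if "users_email_key" in str(e.orig):  # constraint name
        raise EmailTaken from e  # EmailTaken: your own application exception
    raise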

For Postgres-specific error code checking, see the Postgres textbook.

Custom event hooks (async)

Event handlers are sync functions, even on an AsyncEngine; there is no async version of an event hook. Register them on the underlying sync engine:

@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, conn_record):
    ...

Note sync_engine — events still fire on the underlying sync engine.

ORM async vs Core async

# ORM
result = await session.execute(select(User).where(User.id == 1))
user = result.scalar_one()

# Core (no session)
async with engine.connect() as conn:
    result = await conn.execute(select(users_table).where(users_table.c.id == 1))
    row = result.first()

Core gives a Row (or mapping); no session, no identity map, no relationships. Useful for read-heavy paths where ORM overhead isn’t justified.

Run-sync escape hatch

Sometimes you have a sync function that needs to run in async context:

def sync_legacy(session):
    # uses sync session API
    ...

await session.run_sync(sync_legacy)

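run_sync does not use a thread. It runs the function in a greenlet on the event loop's thread and passes it a regular sync Session; database IO inside the function is bridged back to the async driver. Any other blocking call in the function still blocks the event loop. Useful for migrating legacy code or libraries that use the sync API.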

Connection events

@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, conn_record):
    cursor = dbapi_conn.cursor()
    cursor.execute("SET application_name = 'myapp'")
    cursor.close()

Set per-connection state at the driver level.

With asyncpg specifically, you can pass the setting at connect time instead: connect_args={"server_settings": {"application_name": "myapp"}}.

Pool considerations

Async pool semantics are like sync. Don’t over-size.

For asyncpg: connection pool is in SQLAlchemy, not in asyncpg directly. Use SQLAlchemy’s pool config.
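Over-sizing is easiest to see with arithmetic: every process that creates an engine gets its own pool, and each pool can grow to pool_size + max_overflow connections. A rough sketch; the worker count and the reserved headroom are hypothetical numbers, and 100 is the stock Postgres max_connections default.

```python
def peak_connections(workers: int, pool_size: int, max_overflow: int) -> int:
    """Upper bound on connections opened by `workers` processes, one engine each."""
    return workers * (pool_size + max_overflow)


def fits_server_limit(
    workers: int,
    pool_size: int,
    max_overflow: int,
    server_limit: int,
    reserved: int = 0,
) -> bool:
    """True if the worst case stays within the server limit minus reserved slots."""
    return peak_connections(workers, pool_size, max_overflow) <= server_limit - reserved


if __name__ == "__main__":
    # The Setup section's pool_size=20, max_overflow=10, run with 4 workers:
    print(peak_connections(4, 20, 10))  # 120
    print(fits_server_limit(4, 20, 10, server_limit=100))
    # A smaller pool, leaving 10 slots for migrations and admin tools:
    print(fits_server_limit(4, 10, 5, server_limit=100, reserved=10))
```

Four workers with the Setup section's numbers can ask for 120 connections, more than an unmodified Postgres server will accept.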

Common mistakes

1. Lazy-loading in async

user.posts without a preload raises MissingGreenlet (or InvalidRequestError with lazy="raise"). Use selectinload or awaitable_attrs.

2. Sharing AsyncSession across tasks

An AsyncSession is not safe for concurrent use: tasks sharing one don’t run in parallel and can fail with errors. Use a separate session per task.

3. Forgetting expire_on_commit=False

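After commit, every attribute is expired and the next access needs a reload. In async that usually shows up as a MissingGreenlet error rather than a silent extra query.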

4. Using sync drivers in async engine

postgresql+psycopg2:// won’t work as async. Use +asyncpg or +psycopg (v3 async).

5. Not closing engine

Lifespan shutdown should await engine.dispose().

What’s next

Chapter 9: Performance.
