Cheatsheet for async SQLAlchemy. Long-form: Textbook Ch 8 .

Setup

from sqlalchemy.ext.asyncio import create_async_engine, async_sessionmaker, AsyncSession

engine = create_async_engine(
    "postgresql+asyncpg://user:pass@host/db",
    pool_size=20, max_overflow=10,
    pool_pre_ping=True, pool_recycle=3600,
)

AsyncSessionLocal = async_sessionmaker(engine, expire_on_commit=False)

Driver compatibility

Driver
Postgresasyncpg (recommended), psycopg (v3)
MySQLaiomysql, asyncmy
SQLiteaiosqlite

Basic async patterns

async with AsyncSessionLocal() as session:
    user = await session.get(User, 1)
    
    result = await session.execute(select(User))
    users = result.scalars().all()
    
    session.add(user)
    await session.commit()

Streaming (memory-bounded reads)

result = await session.stream(select(Event))
async for evt in result.scalars():
    process(evt)

# Or Core
async with engine.connect() as conn:
    result = await conn.stream(select(events_table))
    async for row in result:
        ...

Lazy-loading gotchas

# BAD: implicit sync IO in async context
user = await session.get(User, 1)
print(user.posts)        # error or sync block

# FIX 1: eager-load
stmt = select(User).options(selectinload(User.posts))
user = await session.scalar(stmt)
print(user.posts)        # works

# FIX 2: awaitable_attrs
posts = await user.awaitable_attrs.posts

# FIX 3: lazy="raise" so unintended lazy-loads error

Run sync code in async context

def sync_helper(session):
    # uses sync session API
    return session.execute(text("..."))

result = await async_session.run_sync(sync_helper)

Per-request session (FastAPI)

async def get_db(request: Request) -> AsyncSession:
    async with request.app.state.sm() as session:
        yield session

@app.get("/users/{id}")
async def get_user(id: int, db: AsyncSession = Depends(get_db)):
    return await db.get(User, id)

Concurrent ops within one session

One session = one connection = serialized. For parallelism use multiple sessions:

# BAD (serializes on one conn)
async with asyncio.TaskGroup() as tg:
    a = tg.create_task(session.scalar(select(A)))
    b = tg.create_task(session.scalar(select(B)))

# GOOD
async with AsyncSessionLocal() as s1, AsyncSessionLocal() as s2:
    async with asyncio.TaskGroup() as tg:
        a = tg.create_task(s1.scalar(...))
        b = tg.create_task(s2.scalar(...))

Bulk insert (async)

await session.execute(insert(User), [{"email": ...}, ...])
await session.commit()

Error handling

from sqlalchemy.exc import IntegrityError, OperationalError

try:
    session.add(user)
    await session.commit()
except IntegrityError as e:
    await session.rollback()
    if "users_email_key" in str(e.orig):
        raise EmailTaken
    raise

asyncpg-specific tuning

connect_args = {
    "server_settings": {
        "application_name": "myapp",
        "search_path": "myapp,public",
        "jit": "off",                       # off when using PgBouncer
    },
    "command_timeout": 60,
    "statement_cache_size": 0,              # required with PgBouncer transaction-pool
    "prepared_statement_cache_size": 0,
}

Sync events on AsyncEngine

@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, conn_record):
    cur = dbapi_conn.cursor()
    cur.execute("SET TIME ZONE 'UTC'")
    cur.close()

Note engine.sync_engine — events still fire on the underlying sync engine.

Common mistakes

  • Sync drivers (psycopg2) in async engine — fails.
  • Missing expire_on_commit=False — extra query per commit.
  • Sharing AsyncSession across tasks — serialization or errors.
  • Lazy-load in async without awaitable_attrs — silent failure.
  • PgBouncer + prepared statement cache — random errors.

Read this next

If you want my async session + streaming patterns reference, it’s at rajpoot.dev .


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .