Chapter 9: Postgres-specific bulk operations. Upsert via ON CONFLICT, bulk insert, and COPY for serious data loads.

ON CONFLICT (upsert)

from sqlalchemy import func
from sqlalchemy.dialects.postgresql import insert as pg_insert

stmt = pg_insert(User).values(email="alice@example.com", name="Alice")
stmt = stmt.on_conflict_do_update(
    index_elements=["email"],
    set_={"name": stmt.excluded.name, "updated_at": func.now()},
)
await session.execute(stmt)

stmt.excluded references the row that would have been inserted.
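
To see the SQL this generates, the statement can be compiled against the PostgreSQL dialect without connecting to a database. This is a minimal sketch: the Core table users is a hypothetical stand-in for the User model, and its columns are assumptions.

```python
from sqlalchemy import Column, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert as pg_insert

# Hypothetical stand-in for the User model's table.
metadata = MetaData()
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("email", String, unique=True),
    Column("name", String),
)

stmt = pg_insert(users).values(email="alice@example.com", name="Alice")
stmt = stmt.on_conflict_do_update(
    index_elements=["email"],
    # excluded.name is the name from the row proposed for insertion
    set_={"name": stmt.excluded.name},
)

# Compile to a SQL string; no connection is needed for this step.
sql = " ".join(str(stmt.compile(dialect=postgresql.dialect())).split())
print(sql)
```

The printed statement should show excluded.name on the right-hand side of the SET clause, which is how PostgreSQL refers to the incoming row.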

ON CONFLICT DO NOTHING

stmt = pg_insert(User).values(email="alice@example.com", name="Alice")
stmt = stmt.on_conflict_do_nothing(index_elements=["email"])

Skip duplicates silently.

RETURNING

stmt = pg_insert(User).values(email="alice@example.com").returning(User.id, User.email)
result = await session.execute(stmt)
rows = result.all()

Get back inserted rows. Combined with ON CONFLICT:

stmt = (
    pg_insert(User)
    .values(email="alice@example.com", name="Alice")
    .on_conflict_do_update(index_elements=["email"], set_={"name": "Alice"})
    .returning(User.id)
)

With DO UPDATE, RETURNING yields the row whether it was inserted or updated. With DO NOTHING, a skipped row is not returned.

Bulk insert

data = [
    {"email": "alice@example.com", "name": "Alice"},
    {"email": "bob@example.com", "name": "Bob"},
    # ... thousands
]

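from sqlalchemy import insert

stmt = insert(User)
await session.execute(stmt, data)
await session.commit()

Passing a list of dicts runs the INSERT in executemany mode. SQLAlchemy and the driver batch the rows, which avoids the per-object overhead of an ORM add loop. Depending on the driver and on whether RETURNING is used, the rows may go out as multi-row INSERTs or as a batched executemany rather than literally one statement.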

Bulk upsert

stmt = pg_insert(User)
stmt = stmt.on_conflict_do_update(index_elements=["email"], set_={"name": stmt.excluded.name})
await session.execute(stmt, data)

Same pattern; works at bulk scale.
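
One thing to watch at bulk scale: if several input rows share the same conflict key and they end up in a single multi-row INSERT (which can happen depending on driver and batching), PostgreSQL rejects the statement with "ON CONFLICT DO UPDATE command cannot affect row a second time". A minimal sketch of deduplicating first; dedupe_by_key is a hypothetical helper, not part of SQLAlchemy.

```python
def dedupe_by_key(rows, key):
    """Keep the last row seen for each value of `key`.

    The result is ordered by the first appearance of each key.
    """
    latest = {}
    for row in rows:
        latest[row[key]] = row  # later rows overwrite earlier ones
    return list(latest.values())


rows = [
    {"email": "alice@example.com", "name": "Alice"},
    {"email": "bob@example.com", "name": "Bob"},
    {"email": "alice@example.com", "name": "Alice B."},
]
deduped = dedupe_by_key(rows, "email")
```

Passing deduped instead of the raw list keeps the last value per email, which matches what a sequence of individual upserts would have produced.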

bulk_insert_mappings (older but still works)

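# AsyncSession does not expose bulk_insert_mappings; run it on the underlying sync Session
await session.run_sync(lambda sync_session: sync_session.bulk_insert_mappings(User, data))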
await session.commit()

Legacy API; the modern pattern is execute(insert(...), data).

COPY (asyncpg)

For really large loads: COPY is dramatically faster than INSERT.

async with engine.begin() as conn:
    raw = await conn.get_raw_connection()
    asyncpg_conn = raw.driver_connection
    
    await asyncpg_conn.copy_records_to_table(
        "users",
        records=[(row["email"], row["name"]) for row in data],  # data is a list of dicts
        columns=["email", "name"],
    )

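This drops down to asyncpg directly, bypassing SQLAlchemy's statement layer. Typically around 5–10× faster than INSERT for millions of rows, though the exact gain depends on row width, indexes, and network latency.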
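
copy_records_to_table expects tuples whose positions match the columns list, while the bulk insert examples use dicts. A small sketch of the conversion; to_records is a hypothetical helper name.

```python
def to_records(rows, columns):
    """Turn a list of dicts into tuples ordered like `columns`.

    Raises KeyError if a row is missing one of the columns.
    """
    return [tuple(row[col] for col in columns) for row in rows]


data = [
    {"email": "alice@example.com", "name": "Alice"},
    {"name": "Bob", "email": "bob@example.com"},  # key order does not matter
]
columns = ["email", "name"]
records = to_records(data, columns)
```

Using the same columns list for both to_records and the columns argument of the COPY call keeps the two from drifting apart.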

For psycopg v3:

async with engine.begin() as conn:  # begin() commits on exit; connect() alone would roll back
    raw = await conn.get_raw_connection()
    psycopg_conn = raw.driver_connection

    async with psycopg_conn.cursor() as cur:
        async with cur.copy("COPY users (email, name) FROM STDIN") as copy:
            for row in data:  # data is a list of dicts
                await copy.write_row((row["email"], row["name"]))

COPY from CSV

async with asyncpg_conn.transaction():
    with open("users.csv", "rb") as f:  # binary mode: asyncpg reads bytes from file-like sources
        await asyncpg_conn.copy_to_table("users", source=f, columns=["email", "name"], format="csv", header=True)

For loading CSVs.

Performance comparison

For 100k rows on a moderate setup (rough figures; benchmark on your own hardware):

  • ORM session.add loop: ~30s.
  • session.execute(insert, list): ~2s.
  • COPY: ~0.5s.

Use COPY for ETL / bulk loads. Use bulk insert for normal app paths.

Streaming generators

For huge loads that don’t fit in memory:

import aiofiles

async def read_records():
    # parse() is your own function: it turns one line into an (email, name) tuple
    async with aiofiles.open("data.csv") as f:
        async for line in f:
            yield parse(line)

async with asyncpg_conn.transaction():
    await asyncpg_conn.copy_records_to_table("users", records=read_records(), columns=["email", "name"])

asyncpg consumes the async generator incrementally, so memory use stays bounded regardless of file size.
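
The parse function above is left undefined. One possible version, as a sketch: it assumes each line is a two-column CSV row (email, name) with no embedded newlines inside quoted fields. records_from_lines is a hypothetical wrapper that also skips blank lines, and the fake_file generator only stands in for a real async file.

```python
import asyncio
import csv


def parse(line):
    """Parse one CSV line into an (email, name) tuple."""
    email, name = next(csv.reader([line]))
    return (email.strip(), name.strip())


async def records_from_lines(lines):
    """Yield parsed tuples from an async iterable of lines, skipping blanks."""
    async for line in lines:
        if line.strip():
            yield parse(line)


async def _demo():
    # Stand-in for an async file object that yields lines.
    async def fake_file():
        for line in ["alice@example.com,Alice\n", "\n", 'bob@example.com,"Bob, Jr."\n']:
            yield line

    return [rec async for rec in records_from_lines(fake_file())]


demo_records = asyncio.run(_demo())
```

Because records_from_lines is itself an async generator, it can be passed as the records argument in place of read_records().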

Conflict on multiple constraints

stmt = pg_insert(Item).values(...).on_conflict_do_update(
    constraint="ix_items_unique_compound",
    set_={"updated_at": func.now()},
)

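Reference the target by constraint name when several unique constraints exist. The name must belong to an actual unique (or exclusion) constraint. A unique index created with CREATE UNIQUE INDEX cannot be named in ON CONSTRAINT, so use index_elements for those.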
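
A sketch of the constraint-name form with a named UniqueConstraint, compiled without a database. The items table, its columns, and the name uq_items_sku_region are all assumptions made for illustration.

```python
from sqlalchemy import (
    Column, DateTime, Integer, MetaData, String, Table, UniqueConstraint, func,
)
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert as pg_insert

metadata = MetaData()
# Hypothetical table with a named compound unique constraint.
items = Table(
    "items",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("sku", String),
    Column("region", String),
    Column("updated_at", DateTime(timezone=True)),
    UniqueConstraint("sku", "region", name="uq_items_sku_region"),
)

stmt = pg_insert(items).values(sku="A-1", region="eu").on_conflict_do_update(
    constraint="uq_items_sku_region",  # must be a constraint name, not an index name
    set_={"updated_at": func.now()},
)

sql = " ".join(str(stmt.compile(dialect=postgresql.dialect())).split())
print(sql)
```

The compiled SQL uses ON CONFLICT ON CONSTRAINT with the given name, which is why a plain unique index does not qualify.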

Conditional update

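stmt = pg_insert(User).values(email="alice@example.com", name="Alice")
stmt = stmt.on_conflict_do_update(
    index_elements=["email"],
    set_={"name": stmt.excluded.name, "updated_at": stmt.excluded.updated_at},
    where=User.updated_at < stmt.excluded.updated_at,
)

Only update if the incoming row is newer than the stored one, so the newest timestamp wins. The incoming row needs an updated_at value, either passed in values() or supplied by a column default; if excluded.updated_at is NULL, the comparison is never true and the update is skipped.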
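
The where argument becomes a WHERE clause on the DO UPDATE branch. A sketch that compiles such a statement so the clause is visible; the users table is a hypothetical stand-in for the User model, and the timestamp is an arbitrary example value.

```python
from datetime import datetime, timezone

from sqlalchemy import Column, DateTime, Integer, MetaData, String, Table
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import insert as pg_insert

metadata = MetaData()
# Hypothetical stand-in for the User model's table.
users = Table(
    "users",
    metadata,
    Column("id", Integer, primary_key=True),
    Column("email", String, unique=True),
    Column("name", String),
    Column("updated_at", DateTime(timezone=True)),
)

stmt = pg_insert(users).values(
    email="alice@example.com",
    name="Alice",
    updated_at=datetime(2024, 1, 1, tzinfo=timezone.utc),  # example value
)
stmt = stmt.on_conflict_do_update(
    index_elements=["email"],
    set_={"name": stmt.excluded.name, "updated_at": stmt.excluded.updated_at},
    # Compare the stored timestamp with the incoming one.
    where=users.c.updated_at < stmt.excluded.updated_at,
)

sql = " ".join(str(stmt.compile(dialect=postgresql.dialect())).split())
print(sql)
```

When the WHERE condition is false for a conflicting row, PostgreSQL leaves that row untouched and RETURNING produces nothing for it.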

Chunked bulk

For very large lists, chunk to avoid huge statements:

CHUNK = 1000
for i in range(0, len(data), CHUNK):
    await session.execute(insert(User), data[i:i+CHUNK])
await session.commit()

Alternatively, COPY handles arbitrary size.
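
The slicing loop above needs a list. A sketch of a chunking helper that also accepts generators, so the full dataset never has to sit in memory; chunked is a hypothetical helper name.

```python
from itertools import islice


def chunked(iterable, size):
    """Yield lists of up to `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    it = iter(iterable)
    while True:
        batch = list(islice(it, size))
        if not batch:
            return
        yield batch


batches = list(chunked(range(5), 2))
```

In the insert loop this would read: for batch in chunked(data, CHUNK), then await session.execute(insert(User), batch).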

Common mistakes

1. ORM loop for bulk

Calling session.add in a loop pays ORM overhead on every row; at 10k rows that adds up. Use bulk insert.

2. ON CONFLICT without index

stmt.on_conflict_do_update(index_elements=["email"], ...)

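email must have a unique index or constraint. Otherwise PostgreSQL raises an error: "there is no unique or exclusion constraint matching the ON CONFLICT specification".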

3. Missing RETURNING when you need IDs

A bulk insert without RETURNING does not tell you which rows were inserted or what IDs they received. Add .returning(...) when you need them.

4. Not committing

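session.execute(insert(...), data) sends the rows to the database right away, but inside an open transaction. Nothing is durable or visible to other sessions until you call commit, and closing the session without committing rolls the work back.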

5. COPY with bad data

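A type mismatch aborts the COPY at the first bad row, and the whole COPY fails, so no rows are loaded. Validate first, or COPY into a staging table (for example with text columns) and convert from there.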
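
A minimal sketch of the validate-first option: split rows into loadable and rejected before handing anything to COPY. The rules here (email is a string containing "@", name is a non-empty string) are assumptions for illustration, and split_valid is a hypothetical helper.

```python
def split_valid(rows):
    """Separate rows that pass basic checks from rows that do not."""
    valid, rejected = [], []
    for row in rows:
        email = row.get("email")
        name = row.get("name")
        ok = (
            isinstance(email, str)
            and "@" in email
            and isinstance(name, str)
            and name.strip() != ""
        )
        (valid if ok else rejected).append(row)
    return valid, rejected


rows = [
    {"email": "alice@example.com", "name": "Alice"},
    {"email": None, "name": "No Email"},
    {"email": "bob@example.com", "name": "   "},
]
valid, rejected = split_valid(rows)
```

Only valid goes to COPY; rejected can be logged or written to a separate file for review.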

What’s next

Chapter 10: Replication, failover, pgvector.
