Chapter 9: Postgres-specific bulk operations. Upsert via ON CONFLICT, bulk insert, and COPY for serious data loads.
ON CONFLICT (upsert)
from sqlalchemy import func
from sqlalchemy.dialects.postgresql import insert as pg_insert

stmt = pg_insert(User).values(email="alice@example.com", name="Alice")
stmt = stmt.on_conflict_do_update(
    index_elements=["email"],  # must match a unique index or constraint
    set_={"name": stmt.excluded.name, "updated_at": func.now()},
)
await session.execute(stmt)
stmt.excluded references the row that would have been inserted.
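To see what excluded refers to without a running Postgres server, the sketch below uses Python's built-in sqlite3 module as a stand-in. SQLite's upsert syntax follows PostgreSQL's, but this only illustrates the semantics; it is not the SQLAlchemy call itself. The table layout and sample values are assumptions.

```python
import sqlite3

# In-memory SQLite database as a stand-in for Postgres
# (assumption: SQLite >= 3.24, which introduced ON CONFLICT ... DO UPDATE).
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE users (email TEXT PRIMARY KEY, name TEXT)")

UPSERT = """
INSERT INTO users (email, name) VALUES (?, ?)
ON CONFLICT (email) DO UPDATE SET name = excluded.name
"""

# The first call inserts; the second hits the unique key and takes the UPDATE
# branch, where excluded.name is the value the INSERT tried to write.
conn.execute(UPSERT, ("alice@example.com", "Alice"))
conn.execute(UPSERT, ("alice@example.com", "Alice Smith"))

rows = conn.execute("SELECT email, name FROM users").fetchall()
print(rows)
```

The table ends up with a single row for the email, carrying the name from the second statement.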
ON CONFLICT DO NOTHING
stmt = pg_insert(User).values(email="alice@example.com", name="Alice")
stmt = stmt.on_conflict_do_nothing(index_elements=["email"])
Rows that conflict on email are skipped without raising an error.
RETURNING
stmt = pg_insert(User).values(email="alice@example.com").returning(User.id, User.email)
result = await session.execute(stmt)
rows = result.all()
RETURNING hands back the inserted rows. Combined with ON CONFLICT:
stmt = (
    pg_insert(User)
    .values(email="alice@example.com", name="Alice")
    .on_conflict_do_update(index_elements=["email"], set_={"name": "Alice"})
    .returning(User.id)
)
With DO UPDATE, the row comes back whether it was inserted or updated. With DO NOTHING, a skipped row returns nothing.
Bulk insert
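from sqlalchemy import insert

data = [
    {"email": "alice@example.com", "name": "Alice"},
    {"email": "bob@example.com", "name": "Bob"},
    # ... thousands
]
stmt = insert(User)
await session.execute(stmt, data)
await session.commit()
Passing a list of dictionaries runs the INSERT in executemany mode: the rows are sent in batches rather than one round trip per row. Depending on the driver and on whether RETURNING is used, SQLAlchemy either rewrites them into multi-row INSERT statements or hands the list to the driver's executemany.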
Bulk upsert
stmt = pg_insert(User)
stmt = stmt.on_conflict_do_update(index_elements=["email"], set_={"name": stmt.excluded.name})
await session.execute(stmt, data)
Same pattern; works at bulk scale.
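One caveat at bulk scale: if the same conflict key appears twice within a single multi-row INSERT ... ON CONFLICT DO UPDATE, Postgres rejects the statement ("ON CONFLICT DO UPDATE command cannot affect row a second time"). A minimal sketch of de-duplicating a batch first, keeping the last occurrence per key; dedupe_by_key is a hypothetical helper, not part of SQLAlchemy.

```python
from typing import Any, Hashable, Iterable


def dedupe_by_key(rows: Iterable[dict[str, Any]], key: str) -> list[dict[str, Any]]:
    """Keep only the last row seen for each value of `key`."""
    latest: dict[Hashable, dict[str, Any]] = {}
    for row in rows:
        # Pop-then-set moves a repeated key to the end, so the output
        # order reflects where each key was last seen.
        latest.pop(row[key], None)
        latest[row[key]] = row
    return list(latest.values())


data = [
    {"email": "alice@example.com", "name": "Alice"},
    {"email": "bob@example.com", "name": "Bob"},
    {"email": "alice@example.com", "name": "Alice Smith"},
]
unique_rows = dedupe_by_key(data, "email")
print(unique_rows)
```

The de-duplicated list can then be passed to session.execute(stmt, unique_rows) in place of the raw batch.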
bulk_insert_mappings (older but still works)
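# AsyncSession does not expose the legacy bulk_* methods; go through run_sync.
await session.run_sync(lambda sync_session: sync_session.bulk_insert_mappings(User, data))
await session.commit()
Legacy API; the modern pattern is execute(insert(...), data).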
COPY (asyncpg)
For really large loads, COPY is dramatically faster than INSERT.
async with engine.begin() as conn:
    raw = await conn.get_raw_connection()
    asyncpg_conn = raw.driver_connection
    await asyncpg_conn.copy_records_to_table(
        "users",
        # data holds dictionaries, so build each tuple by key
        records=[(row["email"], row["name"]) for row in data],
        columns=["email", "name"],
    )
This drops to asyncpg directly, bypassing SQLAlchemy. Expect roughly 5–10× faster than INSERT for millions of rows.
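copy_records_to_table expects each record as a tuple whose positions match columns, whereas the bulk-insert examples use dictionaries. A small sketch of that conversion follows; rows_to_records is a hypothetical helper, not part of asyncpg.

```python
from typing import Any, Iterable, Iterator, Sequence


def rows_to_records(
    rows: Iterable[dict[str, Any]], columns: Sequence[str]
) -> Iterator[tuple[Any, ...]]:
    """Yield one tuple per row, with values ordered to match `columns`."""
    for row in rows:
        # row[col] raises KeyError on a missing column instead of
        # silently shifting values into the wrong position.
        yield tuple(row[col] for col in columns)


columns = ["email", "name"]
data = [
    {"name": "Alice", "email": "alice@example.com"},  # key order does not matter
    {"email": "bob@example.com", "name": "Bob"},
]
records = list(rows_to_records(data, columns))
print(records)
```

The same columns list would be passed as columns= so that the tuple order and the column order cannot drift apart.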
For psycopg v3:
async with engine.begin() as conn:  # begin() so the COPY is committed on exit
    raw = await conn.get_raw_connection()
    psycopg_conn = raw.driver_connection
    async with psycopg_conn.cursor() as cur:
        async with cur.copy("COPY users (email, name) FROM STDIN") as copy:
            for row in data:
                await copy.write_row((row["email"], row["name"]))
COPY from CSV
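async with asyncpg_conn.transaction():
    # asyncpg reads bytes from file-like sources, so open in binary mode
    with open("users.csv", "rb") as f:
        await asyncpg_conn.copy_to_table(
            "users", source=f, columns=["email", "name"], format="csv", header=True
        )
This loads a CSV file directly; header=True skips the first line.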
Performance comparison
Rough figures for 100k rows on a moderate setup; actual numbers depend on hardware, indexes, and row width:
- ORM session.add loop: ~30s.
- session.execute(insert, list): ~2s.
- COPY: ~0.5s.
Use COPY for ETL / bulk loads. Use bulk insert for normal app paths.
Streaming generators
For huge loads that don’t fit in memory:
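import aiofiles

async def read_records():
    async with aiofiles.open("data.csv") as f:
        async for line in f:
            yield parse(line)  # parse: your own function returning an (email, name) tuple

async with asyncpg_conn.transaction():
    await asyncpg_conn.copy_records_to_table(
        "users", records=read_records(), columns=["email", "name"]
    )
asyncpg consumes the async generator one record at a time, so memory stays bounded. Async iterables are accepted as records from asyncpg 0.24 onward.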
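The parse call in the generator is left undefined. One possible shape, sketched with only the standard library, is an async generator that reads CSV text lazily and yields one tuple per row. The name iter_csv_records and the in-memory sample are assumptions for illustration; a real load would read from a file or a network stream.

```python
import asyncio
import csv
import io
from typing import AsyncIterator, Iterable


async def iter_csv_records(lines: Iterable[str]) -> AsyncIterator[tuple[str, str]]:
    """Lazily turn CSV lines (header first) into (email, name) tuples."""
    reader = csv.DictReader(lines)
    for row in reader:
        yield (row["email"], row["name"])
        # Hand control back to the event loop between rows.
        await asyncio.sleep(0)


async def collect() -> list[tuple[str, str]]:
    sample = io.StringIO("email,name\nalice@example.com,Alice\nbob@example.com,Bob\n")
    # A consumer that accepts async iterables would pull records the same way.
    return [record async for record in iter_csv_records(sample)]


records = asyncio.run(collect())
print(records)
```

Using csv.DictReader rather than splitting on commas keeps quoted fields intact, which matters as soon as a name contains a comma.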
Conflict on multiple constraints
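stmt = pg_insert(Item).values(...).on_conflict_do_update(
    constraint="ix_items_unique_compound",
    set_={"updated_at": func.now()},
)
When a table has several unique constraints, name the one this statement should arbitrate on. The name must belong to a unique or exclusion constraint; a unique index that is not backed by a constraint has to be targeted with index_elements instead.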
Conditional update
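stmt = pg_insert(User).values(email="alice@example.com", name="Alice")
stmt = stmt.on_conflict_do_update(
    index_elements=["email"],
    set_={"name": stmt.excluded.name, "updated_at": stmt.excluded.updated_at},
    where=User.updated_at < stmt.excluded.updated_at,
)
Only update if the incoming row is newer: “last writer wins” by timestamp. The comparison relies on the proposed row carrying an updated_at value, either passed in values() or filled by a column default; if it is NULL, the condition is never true and the update is skipped.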
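The effect of the WHERE clause is easiest to see by replaying writes out of order. The sketch below again uses the standard-library sqlite3 module as a stand-in for Postgres (the upsert syntax is closely related, but this is not the SQLAlchemy statement itself). The schema and timestamps are assumptions.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (email TEXT PRIMARY KEY, name TEXT, updated_at TEXT)"
)

# ISO-8601 strings compare correctly as plain text, so TEXT is enough here.
UPSERT_IF_NEWER = """
INSERT INTO users (email, name, updated_at) VALUES (?, ?, ?)
ON CONFLICT (email) DO UPDATE
SET name = excluded.name, updated_at = excluded.updated_at
WHERE excluded.updated_at > users.updated_at
"""

conn.execute(UPSERT_IF_NEWER, ("alice@example.com", "Alice", "2024-02-01T00:00:00"))

# Older timestamp: the conflict is detected, the WHERE fails, the row is left alone.
conn.execute(UPSERT_IF_NEWER, ("alice@example.com", "Stale", "2024-01-01T00:00:00"))
after_stale = conn.execute("SELECT name FROM users").fetchone()[0]

# Newer timestamp: the WHERE passes and the row is overwritten.
conn.execute(UPSERT_IF_NEWER, ("alice@example.com", "Fresh", "2024-03-01T00:00:00"))
row = conn.execute("SELECT name, updated_at FROM users").fetchone()
print(after_stale, row)
```

Note that the stale write is dropped silently: no error is raised, which is the point of the pattern but also means rejected writes need separate logging if they matter.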
Chunked bulk
For very large lists, chunk to avoid huge statements:
CHUNK = 1000
for i in range(0, len(data), CHUNK):
    await session.execute(insert(User), data[i : i + CHUNK])
await session.commit()  # one commit after all chunks; commit inside the loop to keep transactions small
Alternatively, COPY handles arbitrary size.
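Slicing by index needs the whole list in memory. A sketch of a generator-based alternative that works with any iterable, including a file reader or another generator, is shown below. chunked is a hypothetical helper; Python 3.12 ships a similar itertools.batched.

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        # islice pulls the next `size` items without materialising the rest.
        chunk = list(islice(iterator, size))
        if not chunk:
            return
        yield chunk


sizes = [len(chunk) for chunk in chunked(range(2500), 1000)]
print(sizes)
```

Each yielded list can be passed as the parameter list of one execute call, so at most one chunk is held in memory at a time.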
Common mistakes
1. ORM loop for bulk
Adding 10k objects one at a time pays the ORM's per-object overhead 10k times. Use execute(insert(...), data) instead.
2. ON CONFLICT without index
stmt.on_conflict_do_update(index_elements=["email"], ...)
email must have a unique index or constraint. Otherwise Postgres raises "there is no unique or exclusion constraint matching the ON CONFLICT specification".
3. Missing RETURNING when you need IDs
A bulk insert without RETURNING tells you nothing about the generated IDs. Add .returning(...) when you need them.
4. Not committing
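session.execute(insert(...), data) runs inside the session's transaction. Until you commit, the rows are invisible to other connections and are rolled back when the session closes.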
5. COPY with bad data
A type mismatch or malformed row aborts the whole COPY, and none of its rows are loaded. Validate first, or COPY into a staging table and clean up there.
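For the "validate first" route, a minimal sketch of a pre-pass that separates loadable rows from rejects before anything is sent to COPY. The function name split_valid and the validation rules are assumptions; real checks would mirror the target table's column types and constraints.

```python
from typing import Any, Iterable

Record = tuple[str, str]


def split_valid(
    rows: Iterable[dict[str, Any]],
) -> tuple[list[Record], list[dict[str, Any]]]:
    """Separate rows that are safe to COPY from rows that need attention."""
    good: list[Record] = []
    bad: list[dict[str, Any]] = []
    for row in rows:
        email, name = row.get("email"), row.get("name")
        # Assumed rules: both fields are non-empty strings and the email contains "@".
        if isinstance(email, str) and "@" in email and isinstance(name, str) and name:
            good.append((email, name))
        else:
            bad.append(row)
    return good, bad


good, bad = split_valid(
    [
        {"email": "alice@example.com", "name": "Alice"},
        {"email": "not-an-email", "name": "Bob"},
        {"email": "carol@example.com", "name": None},
    ]
)
print(good, bad)
```

The good list is already in tuple form for a COPY of (email, name), while bad can be logged or written to a reject file for later review.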
What’s next
Chapter 10: Replication, failover, pgvector.