Cheatsheet for fast inserts / upserts / RETURNING. Long-form: Postgres textbook Ch 9 .
ON CONFLICT DO UPDATE
from sqlalchemy.dialects.postgresql import insert as pg_insert
stmt = pg_insert(User).values(email="[email protected]", name="Alice")
stmt = stmt.on_conflict_do_update(
index_elements=["email"],
set_={
"name": stmt.excluded.name,
"updated_at": func.now(),
},
)
await session.execute(stmt)
stmt.excluded = the row that would have been inserted.
ON CONFLICT DO NOTHING
stmt = pg_insert(User).values(email="[email protected]", name="Alice")
stmt = stmt.on_conflict_do_nothing(index_elements=["email"])
await session.execute(stmt)
Silently skip duplicates.
ON CONFLICT by constraint name
stmt = pg_insert(Item).values(...)
stmt = stmt.on_conflict_do_update(
constraint="ix_items_unique_compound",
set_={"updated_at": func.now()},
)
For multi-column unique constraints, reference by name.
RETURNING
stmt = pg_insert(User).values(email="[email protected]").returning(User.id, User.email)
result = await session.execute(stmt)
rows = result.all()
print(rows[0].id, rows[0].email)
Postgres returns the affected rows. Combined with upsert:
stmt = (
pg_insert(User)
.values(email="[email protected]", name="Alice")
.on_conflict_do_update(index_elements=["email"], set_={"name": "Alice"})
.returning(User.id)
)
Bulk insert
data = [
{"email": "[email protected]", "name": "Alice"},
{"email": "[email protected]", "name": "Bob"},
# ...
]
stmt = insert(User)
await session.execute(stmt, data)
await session.commit()
Single INSERT statement with multi-row VALUES. Far faster than per-row session.add.
Bulk upsert
stmt = pg_insert(User)
stmt = stmt.on_conflict_do_update(
index_elements=["email"],
set_={"name": stmt.excluded.name},
)
await session.execute(stmt, data)
Works at bulk scale.
Bulk insert with returning
stmt = pg_insert(User).returning(User.id, User.email)
result = await session.execute(stmt, data)
new_ids = [r.id for r in result]
Conditional upsert
stmt = pg_insert(User).values(email="[email protected]", name="Alice", updated_at=func.now())
stmt = stmt.on_conflict_do_update(
index_elements=["email"],
set_={"name": stmt.excluded.name},
where=User.updated_at < stmt.excluded.updated_at,
)
Only update if the new row is newer (last-writer-wins by timestamp).
COPY via asyncpg (huge loads)
async def copy_users(records):
async with engine.begin() as conn:
raw = await conn.get_raw_connection()
ap = raw.driver_connection # asyncpg connection
await ap.copy_records_to_table(
"users",
records=records, # list[tuple]
columns=["email", "name"],
)
5-10× faster than INSERT for millions of rows.
COPY from CSV file
async with engine.begin() as conn:
raw = await conn.get_raw_connection()
ap = raw.driver_connection
with open("users.csv") as f:
await ap.copy_to_table(
"users",
source=f,
columns=["email", "name"],
format="csv",
header=True,
)
COPY via psycopg v3
async with engine.connect() as conn:
raw = await conn.get_raw_connection()
pc = raw.driver_connection
async with pc.cursor() as cur:
async with cur.copy("COPY users (email, name) FROM STDIN") as copy:
for email, name in data:
await copy.write_row((email, name))
Streaming generator
async def gen_records():
async with aiofiles.open("data.csv") as f:
async for line in f:
yield parse_line(line)
await ap.copy_records_to_table("users", records=gen_records(), columns=["email", "name"])
asyncpg streams; memory bounded.
Chunked bulk insert
CHUNK = 1000
for i in range(0, len(data), CHUNK):
await session.execute(insert(User), data[i:i+CHUNK])
await session.commit()
For very large lists, chunk to avoid huge SQL statements.
Staging table pattern
For complex transforms:
CREATE TEMP TABLE users_staging (LIKE users);
-- COPY into staging
COPY users_staging FROM STDIN;
INSERT INTO users (email, name)
SELECT lower(email), name FROM users_staging
ON CONFLICT (email) DO UPDATE SET name = excluded.name;
Performance comparison
For 100k rows:
| Approach | Time |
|---|---|
ORM session.add loop | ~30s |
session.execute(insert(...), list) | ~2s |
| asyncpg COPY | ~0.5s |
Choose by scale.
Common mistakes
on_conflict_do_update(index_elements=["email"])without unique constraint on email → ERROR.- Missing RETURNING when you need IDs.
- Per-row INSERTs in loop — slow.
- Bad data in COPY halts mid-stream — validate first.
- Forgetting
commit()after bulk operations.
Read this next
If you want my COPY + upsert + staging pattern, it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .