Cheatsheet for fast inserts / upserts / RETURNING. Long-form: Postgres textbook Ch 9.

ON CONFLICT DO UPDATE

from sqlalchemy import func
from sqlalchemy.dialects.postgresql import insert as pg_insert

stmt = pg_insert(User).values(email="alice@example.com", name="Alice")
stmt = stmt.on_conflict_do_update(
    index_elements=["email"],
    set_={
        "name": stmt.excluded.name,
        "updated_at": func.now(),
    },
)
await session.execute(stmt)

stmt.excluded refers to the row that was proposed for insertion (Postgres's EXCLUDED pseudo-table).

ON CONFLICT DO NOTHING

stmt = pg_insert(User).values(email="alice@example.com", name="Alice")
stmt = stmt.on_conflict_do_nothing(index_elements=["email"])
await session.execute(stmt)

Silently skip duplicates.

ON CONFLICT by constraint name

stmt = pg_insert(Item).values(...)
stmt = stmt.on_conflict_do_update(
    constraint="ix_items_unique_compound",
    set_={"updated_at": func.now()},
)

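For multi-column unique constraints, reference by name. The name must belong to a unique or exclusion constraint; a plain unique index cannot be targeted this way and is matched with index_elements instead (which also accepts several columns).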

RETURNING

stmt = pg_insert(User).values(email="alice@example.com").returning(User.id, User.email)
result = await session.execute(stmt)
rows = result.all()
print(rows[0].id, rows[0].email)

Postgres returns the rows that were actually inserted or updated; rows skipped by DO NOTHING are not returned. Combined with upsert:

stmt = (
    pg_insert(User)
    .values(email="alice@example.com", name="Alice")
    .on_conflict_do_update(index_elements=["email"], set_={"name": "Alice"})
    .returning(User.id)
)

Bulk insert

data = [
    {"email": "alice@example.com", "name": "Alice"},
    {"email": "bob@example.com", "name": "Bob"},
    # ...
]

from sqlalchemy import insert

stmt = insert(User)
await session.execute(stmt, data)
await session.commit()

One statement run in executemany mode: the rows go to the driver as a batch, without per-object ORM overhead. Far faster than a per-row session.add loop.

Bulk upsert

stmt = pg_insert(User)
stmt = stmt.on_conflict_do_update(
    index_elements=["email"],
    set_={"name": stmt.excluded.name},
)
await session.execute(stmt, data)

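The same statement works with a list of rows: excluded resolves per row, so each conflicting row is updated with its own incoming name.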
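
One caveat when a batch ends up in a single multi-row INSERT: Postgres rejects it with "ON CONFLICT DO UPDATE command cannot affect row a second time" if the same conflict key appears twice. A minimal sketch of deduplicating first, keeping the last row per key (dedupe_by_key is a hypothetical helper name, not part of SQLAlchemy):

```python
from typing import Any, Hashable, Iterable


def dedupe_by_key(rows: Iterable[dict[str, Any]], key: str) -> list[dict[str, Any]]:
    """Keep only the last row seen for each value of `key`."""
    latest: dict[Hashable, dict[str, Any]] = {}
    for row in rows:
        latest[row[key]] = row  # a later row replaces an earlier one with the same key
    return list(latest.values())


data = [
    {"email": "alice@example.com", "name": "Alice"},
    {"email": "bob@example.com", "name": "Bob"},
    {"email": "alice@example.com", "name": "Alice B."},
]
unique_rows = dedupe_by_key(data, "email")
```

unique_rows can then be passed to session.execute(stmt, unique_rows) in place of data.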

Bulk insert with returning

stmt = pg_insert(User).returning(User.id, User.email)
result = await session.execute(stmt, data)
new_ids = [r.id for r in result]

Conditional upsert

stmt = pg_insert(User).values(email="alice@example.com", name="Alice", updated_at=func.now())
stmt = stmt.on_conflict_do_update(
    index_elements=["email"],
    set_={
        "name": stmt.excluded.name,
        "updated_at": stmt.excluded.updated_at,   # advance the timestamp too
    },
    where=User.updated_at < stmt.excluded.updated_at,
)

Only update if the new row is newer (last-writer-wins by timestamp).
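
The semantics of the WHERE clause can be tried without a Postgres server. The sketch below is a stand-in, not the SQLAlchemy call: it uses the stdlib sqlite3 module, assuming a SQLite build (3.24 or newer) that accepts the same ON CONFLICT ... DO UPDATE ... WHERE shape, and integer timestamps chosen purely for illustration.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE users (email TEXT PRIMARY KEY, name TEXT, updated_at INTEGER)"
)

# Same clause shape as the Postgres upsert: update only when the incoming row is newer.
UPSERT = """
INSERT INTO users (email, name, updated_at) VALUES (?, ?, ?)
ON CONFLICT (email) DO UPDATE
SET name = excluded.name, updated_at = excluded.updated_at
WHERE users.updated_at < excluded.updated_at
"""


def upsert(email: str, name: str, updated_at: int) -> None:
    conn.execute(UPSERT, (email, name, updated_at))


upsert("alice@example.com", "Alice", 100)        # no conflict: plain insert
upsert("alice@example.com", "Alice v2", 200)     # newer timestamp: update applied
upsert("alice@example.com", "Stale Alice", 150)  # older timestamp: update skipped

row = conn.execute("SELECT name, updated_at FROM users").fetchone()
```

The stale write leaves the stored row untouched, which is the last-writer-wins behavior described above.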

COPY via asyncpg (huge loads)

async def copy_users(records):
    async with engine.begin() as conn:
        raw = await conn.get_raw_connection()
        ap = raw.driver_connection                  # asyncpg connection
        await ap.copy_records_to_table(
            "users",
            records=records,                        # list[tuple]
            columns=["email", "name"],
        )

Roughly 5-10× faster than INSERT for millions of rows, since COPY avoids per-statement parsing and round trips.
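
copy_records_to_table expects tuples ordered like columns, while the bulk insert examples earlier use dicts. A small bridge between the two might look like this (to_records is a hypothetical helper name, not part of asyncpg):

```python
from typing import Any, Iterable, Sequence


def to_records(rows: Iterable[dict[str, Any]], columns: Sequence[str]) -> list[tuple]:
    """Convert dict rows to tuples ordered like `columns`; missing keys become None."""
    return [tuple(row.get(col) for col in columns) for row in rows]


columns = ["email", "name"]
records = to_records(
    [
        {"name": "Alice", "email": "alice@example.com"},  # key order does not matter
        {"email": "bob@example.com"},                     # no name: None, sent as NULL
    ],
    columns,
)
```

Passing the same columns list to both to_records and copy_records_to_table keeps the tuple order and the column order in sync.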

COPY from CSV file

async with engine.begin() as conn:
    raw = await conn.get_raw_connection()
    ap = raw.driver_connection
    with open("users.csv", "rb") as f:              # binary mode: asyncpg reads bytes from file objects
        await ap.copy_to_table(
            "users",
            source=f,
            columns=["email", "name"],
            format="csv",
            header=True,
        )

COPY via psycopg v3

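async with engine.begin() as conn:                  # begin() commits on exit; with connect() the COPY is rolled back on close
    raw = await conn.get_raw_connection()
    pc = raw.driver_connection                      # psycopg AsyncConnection
    async with pc.cursor() as cur:
        async with cur.copy("COPY users (email, name) FROM STDIN") as copy:
            for row in data:                        # data: list of dicts, as in the bulk insert example
                await copy.write_row((row["email"], row["name"]))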

Streaming generator

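import aiofiles

async def gen_records():
    # parse_line: your own function turning one CSV line into an (email, name) tuple
    async with aiofiles.open("data.csv") as f:
        async for line in f:
            yield parse_line(line)

# ap is the asyncpg connection obtained as in the COPY example above
await ap.copy_records_to_table("users", records=gen_records(), columns=["email", "name"])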

asyncpg consumes the async generator as it goes, so memory stays bounded regardless of file size.
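
The generator calls parse_line without defining it. One possible sketch, assuming each line is a two-column CSV record (email, name) with no newlines embedded inside quoted fields:

```python
import csv


def parse_line(line: str) -> tuple[str, str]:
    """Parse one CSV line into an (email, name) tuple."""
    fields = next(csv.reader([line]), [])  # csv handles quoting and the trailing newline
    if len(fields) != 2:
        raise ValueError(f"expected 2 fields, got {len(fields)}: {line!r}")
    email, name = (field.strip() for field in fields)
    return email, name
```

Raising on malformed lines stops the load before a bad row reaches Postgres; a header line, if the file has one, would need to be skipped by the caller.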

Chunked bulk insert

CHUNK = 1000
for i in range(0, len(data), CHUNK):
    await session.execute(insert(User), data[i:i+CHUNK])
await session.commit()

For very large lists, chunk to avoid huge SQL statements.
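
The slicing loop needs a list with a known length. A generic variant that also works with generators or other iterators could be sketched as follows (chunked is a hypothetical helper name):

```python
from itertools import islice
from typing import Iterable, Iterator, TypeVar

T = TypeVar("T")


def chunked(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield successive lists of at most `size` items from any iterable."""
    if size < 1:
        raise ValueError("size must be >= 1")
    it = iter(items)
    # islice pulls the next `size` items; an empty list means the source is exhausted
    while chunk := list(islice(it, size)):
        yield chunk


batches = list(chunked(range(7), 3))
```

In the loop above this would read: for chunk in chunked(data, CHUNK): await session.execute(insert(User), chunk).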

Staging table pattern

For complex transforms:

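CREATE TEMP TABLE users_staging (email text, name text);
-- COPY raw rows into staging (only the columns being loaded)
COPY users_staging (email, name) FROM STDIN;

-- Transform, keep one row per normalized email, then upsert
INSERT INTO users (email, name)
SELECT DISTINCT ON (lower(email)) lower(email), name
FROM users_staging
ORDER BY lower(email)
ON CONFLICT (email) DO UPDATE SET name = excluded.name;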

Performance comparison

For 100k rows:

Approach                              Time
ORM session.add loop                  ~30s
session.execute(insert(...), list)    ~2s
asyncpg COPY                          ~0.5s

Choose by scale.

Common mistakes

  • on_conflict_do_update(index_elements=["email"]) without unique constraint on email → ERROR.
  • Missing RETURNING when you need IDs.
  • Per-row INSERTs in loop — slow.
  • Bad data in COPY aborts the whole load at the first bad row (nothing is kept); validate first.
  • Forgetting commit() after bulk operations.
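
For the COPY validation point, a minimal pre-check might split rows into loadable tuples and rejects before anything is sent to Postgres. The rules here (a loose email pattern, a non-empty name) and the name split_valid are illustrative assumptions, not requirements of COPY:

```python
import re
from typing import Any, Iterable

# Deliberately loose: something@something.tld with no whitespace
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def split_valid(
    rows: Iterable[dict[str, Any]],
) -> tuple[list[tuple[str, str]], list[dict[str, Any]]]:
    """Return (valid (email, name) tuples, rejected original rows)."""
    valid: list[tuple[str, str]] = []
    rejected: list[dict[str, Any]] = []
    for row in rows:
        email = (row.get("email") or "").strip()
        name = (row.get("name") or "").strip()
        if EMAIL_RE.match(email) and name:
            valid.append((email, name))
        else:
            rejected.append(row)  # keep the original row for logging or a retry file
    return valid, rejected


rows = [
    {"email": "alice@example.com", "name": "Alice"},
    {"email": "not-an-email", "name": "Bob"},
    {"email": "carol@example.com", "name": None},
]
valid, rejected = split_valid(rows)
```

The valid list is already in the tuple shape that copy_records_to_table takes for columns=["email", "name"].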

Read this next

If you want my COPY + upsert + staging pattern, it’s at rajpoot.dev.

