Cheatsheet for SQLAlchemy events. Useful for cross-cutting concerns: logging, audit, validation, slow-query tracing.

Listener registration

from sqlalchemy import event

# Generic shape: `target` is the object or class to observe and "event_name"
# selects the hook; the handler's parameters depend on the event.
@event.listens_for(target, "event_name")
def handler(args): ...

Two ways to register:

# Decorator form: registers on_connect as part of its definition.
@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, conn_record): ...

# Direct form: same target and event name, for an already-defined function.
event.listen(engine.sync_engine, "connect", on_connect)

Engine events

connect

Runs when a new physical DB connection is opened.

@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, conn_record):
    """Apply per-connection settings when a new DBAPI connection is created.

    Args:
        dbapi_conn: The raw DBAPI connection that was just opened.
        conn_record: The pool's record for this connection (unused).
    """
    cur = dbapi_conn.cursor()
    try:
        cur.execute("SET TIME ZONE 'UTC'")
        cur.execute("SET application_name = 'myapp'")
    finally:
        # Release the cursor even if one of the SET statements fails.
        cur.close()
    # A session-level SET issued inside a transaction is undone if that
    # transaction rolls back (and the pool rolls back on return), so commit
    # here to make the settings stick for the connection's lifetime.
    dbapi_conn.commit()

Use for per-connection config.

checkout / checkin

@event.listens_for(engine.sync_engine, "checkout")
def on_checkout(dbapi_conn, conn_record, conn_proxy):
    # Runs as a connection is taken from the pool, before the caller gets it.
    ...

@event.listens_for(engine.sync_engine, "checkin")
def on_checkin(dbapi_conn, conn_record):
    # Runs when a connection is handed back to the pool.
    ...

before / after cursor execute

@event.listens_for(engine.sync_engine, "before_cursor_execute")
def before(conn, cur, stmt, params, ctx, executemany):
    """Record the start time of the statement on its execution context."""
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # (or go negative) when the wall clock is adjusted mid-query.
    ctx._t = time.perf_counter()

@event.listens_for(engine.sync_engine, "after_cursor_execute")
def after(conn, cur, stmt, params, ctx, executemany):
    """Log a warning for statements that ran longer than 0.5 seconds."""
    dur = time.perf_counter() - ctx._t
    if dur > 0.5:
        # Truncate the SQL so very large statements don't flood the log.
        log.warning("slow_query", sql=stmt[:500], ms=dur*1000)

Use for tracing, slow-query log, metrics.

handle_error

@event.listens_for(engine.sync_engine, "handle_error")
def on_error(exception_context):
    """Log the original exception carried by the error context."""
    original = exception_context.original_exception
    log.error("db_error", exc=str(original))

Session events

before_commit / after_commit

@event.listens_for(Session, "before_commit")
def before_commit(session):
    """Log the model name of every pending (new) object before commit."""
    # session.dirty and session.deleted can be inspected the same way.
    for pending in session.new:
        model_name = type(pending).__name__
        log.info("inserting", model=model_name)

@event.listens_for(Session, "after_commit")
def after_commit(session):
    """Notify about each object queued on the session's _pending_emit list."""
    # The transaction has committed, so post-commit work can be enqueued now.
    pending = getattr(session, "_pending_emit", [])
    for item in pending:
        notify(item)

before_flush / after_flush

@event.listens_for(Session, "before_flush")
def before_flush(session, flush_context, instances):
    # Visit every pending (new) object before the flush runs.
    for obj in session.new:
        ...

do_orm_execute

@event.listens_for(Session, "do_orm_execute")
def hook(execute_state):
    # Inspect the statement before it runs; it can also be modified here.
    if execute_state.is_select:
        ...

Used for filtering policies (e.g., automatic soft-delete filter).

Mapper / instance events

before_insert / before_update / before_delete

from sqlalchemy.orm import Mapper

from datetime import datetime, timezone

def _utcnow():
    """Return the current UTC time as a naive datetime.

    Same value datetime.utcnow() produced, without the API deprecated in
    Python 3.12. Drop the .replace() if the columns are
    DateTime(timezone=True).
    """
    return datetime.now(timezone.utc).replace(tzinfo=None)

@event.listens_for(User, "before_insert")
def before_insert(mapper, connection, target):
    """Stamp created_at on a User just before its INSERT is emitted."""
    target.created_at = _utcnow()

@event.listens_for(User, "before_update")
def before_update(mapper, connection, target):
    """Stamp updated_at on a User just before its UPDATE is emitted."""
    target.updated_at = _utcnow()

@event.listens_for(User, "before_delete")
def before_delete(mapper, connection, target):
    """Log the id of a User that is about to be deleted."""
    user_id = target.id
    log.info("user_deleted", id=user_id)

Useful for default values that need Python (vs server_default which is DB-side).

after_insert / after_update / after_delete

@event.listens_for(User, "after_insert")
def after_insert(mapper, connection, target):
    """Log the id of a freshly inserted User."""
    # By this point the database-assigned id is available on the target.
    new_id = target.id
    log.info("user_inserted", id=new_id)

Attribute events

from sqlalchemy.orm import attributes

@event.listens_for(User.email, "set", retval=True)
def email_set(target, value, oldvalue, initiator):
    """Normalize User.email to lower case and log real changes.

    Registered with retval=True: without it SQLAlchemy ignores the handler's
    return value and the lower-cased address would never be stored.

    Args:
        target: The User instance whose email is being assigned.
        value: The new value being assigned.
        oldvalue: The value being replaced; a NO_VALUE / NEVER_SET marker
            when the attribute has no previous value.
        initiator: The attribute event token (unused).

    Returns:
        The value to store: lower-cased when truthy, otherwise unchanged.
    """
    normalized = value.lower() if value else value
    # The "no previous value" markers are not None, so exclude them
    # explicitly or the first assignment would be logged as a change.
    has_previous = (
        oldvalue
        and oldvalue is not attributes.NO_VALUE
        and oldvalue is not attributes.NEVER_SET
    )
    if has_previous and oldvalue != normalized:
        log.info("email_changed", id=target.id, old=oldvalue, new=normalized)
    return normalized

Conditional listening

@event.listens_for(Session, "after_commit")
def conditional(session):
    # Skip sessions that opted out by setting _audit to a falsy value;
    # sessions without the attribute default to True (handler runs).
    if not getattr(session, "_audit", True):
        return
    ...

Removing listeners

# Pass the same target, event name and function that were used to register.
event.remove(target, "event_name", handler)

Propagation to subclasses

@event.listens_for(Base, "before_insert", propagate=True)
def all_before_insert(mapper, connection, target):
    # With propagate=True the listener also applies to subclasses of Base,
    # so this fires for every model.
    ...

propagate=True cascades to all subclasses.

Common use cases

Auto-timestamps

@event.listens_for(Mapper, "before_insert", propagate=True)
def auto_created_at(mapper, connection, target):
    """Fill in created_at on insert for any model that has it and left it unset."""
    # Function-scope import keeps this recipe self-contained.
    from datetime import datetime, timezone

    if hasattr(target, "created_at") and target.created_at is None:
        # Naive UTC, the same value the deprecated datetime.utcnow() gave;
        # drop the .replace() if the column is DateTime(timezone=True).
        target.created_at = datetime.now(timezone.utc).replace(tzinfo=None)

Soft delete auto-filter

@event.listens_for(Session, "do_orm_execute")
def soft_delete_filter(execute_state):
    """Hide soft-deleted rows from ORM SELECTs unless the caller opts out.

    Callers that need deleted rows pass the execution option
    include_deleted=True on the statement.
    """
    if (
        execute_state.is_select
        # Column and relationship loads inherit the options of the query
        # that loaded their parent, so adding the criteria again is redundant.
        and not execute_state.is_column_load
        and not execute_state.is_relationship_load
        and not execute_state.execution_options.get("include_deleted")
    ):
        execute_state.statement = execute_state.statement.options(
            with_loader_criteria(
                SoftDeleteMixin,
                # The lambda is evaluated against each mapped subclass; the
                # mixin itself is not mapped, so its attribute cannot be
                # used to build the criteria directly.
                lambda cls: cls.deleted_at.is_(None),
                include_aliases=True,
            )
        )

Outbox emit on commit

@event.listens_for(Session, "after_commit")
def emit_outbox(session):
    """Publish every event buffered on the session, then reset the buffer."""
    pending = getattr(session, "_pending_outbox", [])
    for outbox_event in pending:
        bus.publish(outbox_event.type, outbox_event.payload)
    # Leave an empty buffer behind for the next transaction.
    session._pending_outbox = []

Async events

Event handlers always run synchronously. With an AsyncEngine, register them on engine.sync_engine:

@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, conn_record):
    # A regular def (not async def), registered on the engine's sync_engine.
    ...

For async work triggered by an event (e.g., emitting to Kafka): don't await inside the handler. Schedule it with asyncio.create_task from outside the event, or buffer the messages and emit them in after_commit.

Common mistakes

  • Running async I/O inside sync event handlers — it blocks the event loop.
  • Modifying the session inside before_commit — it re-fires the flush and can loop.
  • Forgetting propagate=True for mapper events registered on Base.
  • Doing heavy work inside before_cursor_execute — it is added to every query.

Read this next

If you want my soft-delete + audit + outbox event suite, it’s at rajpoot.dev.


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .