Cheatsheet for SQLAlchemy events. Useful for cross-cutting concerns: logging, audit, validation, slow-query tracing.
Listen registration
from sqlalchemy import event
@event.listens_for(target, "event_name")
def handler(args): ...
Two ways to register:
# Decorator
@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, conn_record): ...
# Direct
event.listen(engine.sync_engine, "connect", on_connect)
Engine events
connect
Runs when a new physical DB connection is opened.
@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, conn_record):
cur = dbapi_conn.cursor()
cur.execute("SET TIME ZONE 'UTC'")
cur.execute("SET application_name = 'myapp'")
cur.close()
Use for per-connection config.
checkout / checkin
@event.listens_for(engine.sync_engine, "checkout")
def on_checkout(dbapi_conn, conn_record, conn_proxy):
# before connection returned from pool to caller
...
@event.listens_for(engine.sync_engine, "checkin")
def on_checkin(dbapi_conn, conn_record):
# after connection returned to pool
...
before / after cursor execute
@event.listens_for(engine.sync_engine, "before_cursor_execute")
def before(conn, cur, stmt, params, ctx, executemany):
ctx._t = time.time()
@event.listens_for(engine.sync_engine, "after_cursor_execute")
def after(conn, cur, stmt, params, ctx, executemany):
dur = time.time() - ctx._t
if dur > 0.5:
log.warning("slow_query", sql=stmt[:500], ms=dur*1000)
Use for tracing, slow-query log, metrics.
handle_error
@event.listens_for(engine.sync_engine, "handle_error")
def on_error(exception_context):
log.error("db_error", exc=str(exception_context.original_exception))
Session events
before_commit / after_commit
@event.listens_for(Session, "before_commit")
def before_commit(session):
# inspect session.new, session.dirty, session.deleted
for obj in session.new:
log.info("inserting", model=type(obj).__name__)
@event.listens_for(Session, "after_commit")
def after_commit(session):
# transaction committed; safe to enqueue post-commit work
for obj in getattr(session, "_pending_emit", []):
notify(obj)
before_flush / after_flush
@event.listens_for(Session, "before_flush")
def before_flush(session, flush_context, instances):
for obj in session.new:
...
do_orm_execute
@event.listens_for(Session, "do_orm_execute")
def hook(execute_state):
# inspect query before run; can modify
if execute_state.is_select:
...
Used for filtering policies (e.g., automatic soft-delete filter).
Mapper / instance events
before_insert / before_update / before_delete
from sqlalchemy.orm import Mapper
@event.listens_for(User, "before_insert")
def before_insert(mapper, connection, target):
target.created_at = datetime.utcnow()
@event.listens_for(User, "before_update")
def before_update(mapper, connection, target):
target.updated_at = datetime.utcnow()
@event.listens_for(User, "before_delete")
def before_delete(mapper, connection, target):
log.info("user_deleted", id=target.id)
Useful for default values that need Python (vs server_default which is DB-side).
after_insert / after_update / after_delete
@event.listens_for(User, "after_insert")
def after_insert(mapper, connection, target):
# target now has DB-assigned id
log.info("user_inserted", id=target.id)
Attribute events
from sqlalchemy.orm import attributes
@event.listens_for(User.email, "set")
def email_set(target, value, oldvalue, initiator):
if oldvalue and oldvalue != value:
log.info("email_changed", id=target.id, old=oldvalue, new=value)
return value.lower() if value else value
Conditional listening
@event.listens_for(Session, "after_commit")
def conditional(session):
if not getattr(session, "_audit", True):
return
...
Removing listeners
event.remove(target, "event_name", handler)
Propagation to subclasses
@event.listens_for(Base, "before_insert", propagate=True)
def all_before_insert(mapper, connection, target):
# fires for every model
...
propagate=True cascades to all subclasses.
Common use cases
Auto-timestamps
@event.listens_for(Mapper, "before_insert", propagate=True)
def auto_created_at(mapper, connection, target):
if hasattr(target, "created_at") and target.created_at is None:
target.created_at = datetime.utcnow()
Soft delete auto-filter
@event.listens_for(Session, "do_orm_execute")
def soft_delete_filter(execute_state):
if execute_state.is_select and not execute_state.execution_options.get("include_deleted"):
execute_state.statement = execute_state.statement.options(
with_loader_criteria(
SoftDeleteMixin, SoftDeleteMixin.deleted_at == None,
include_aliases=True,
)
)
Outbox emit on commit
@event.listens_for(Session, "after_commit")
def emit_outbox(session):
events = getattr(session, "_pending_outbox", [])
for e in events:
bus.publish(e.type, e.payload)
session._pending_outbox = []
Async events
Events run sync. On AsyncEngine, listen on engine.sync_engine:
@event.listens_for(engine.sync_engine, "connect")
def on_connect(dbapi_conn, conn_record):
...
For async work inside events (e.g., emit Kafka): schedule via asyncio.create_task outside the event, or buffer and emit in after_commit.
Common mistakes
- Async IO inside sync events — block the loop.
- Modifying session inside
before_commit— re-fires the flush; loops. - Not setting
propagate=Truefor mapper events on Base. - Heavy work inside
before_cursor_execute— adds to every query.
Read this next
If you want my soft-delete + audit + outbox event-suite, it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .