Pub/sub vs Streams.

Pub/sub (fire-and-forget)

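import json

# `redis` below is a connected redis-py client instance
# Publisher: publish() returns the number of subscribers that received the message
redis.publish("channel", json.dumps({"event": "x"}))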

# Subscriber
pubsub = redis.pubsub()
pubsub.subscribe("channel")
for msg in pubsub.listen():
    if msg["type"] == "message":
        data = json.loads(msg["data"])

Pattern:

pubsub.psubscribe("news.*")  # matches arrive with type "pmessage", not "message"

No persistence: a message published while a subscriber is disconnected is never delivered to it. Use pub/sub for ephemeral notifications.
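Because listen() yields subscription confirmations as well as data, and pattern subscriptions use a different message type, it can help to isolate the decoding step. A minimal sketch, assuming JSON payloads as in the publisher above; decode_event is a hypothetical helper name, not part of redis-py:

```python
import json
from typing import Any, Optional


def decode_event(msg: dict) -> Optional[Any]:
    """Return the decoded JSON payload, or None for non-data messages."""
    # listen() also yields subscribe/unsubscribe confirmations; skip those.
    # Channels matched through psubscribe() deliver type "pmessage".
    if msg.get("type") not in ("message", "pmessage"):
        return None
    try:
        return json.loads(msg["data"])  # json.loads accepts bytes or str
    except (ValueError, TypeError):
        return None  # malformed payload: drop it rather than crash the loop
```

In the subscriber loop this would replace the type check: call decode_event(msg) and skip the message when it returns None.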

Streams (durable, replayable)

# Producer
redis.xadd("events", {"type": "signup", "user_id": 42})
redis.xadd("events", {"type": "click", "url": "/"}, maxlen=100000)

# Consumer (simple)
last = "$"  # only entries added from now on; use "0" to read from the start
while True:
    res = redis.xread({"events": last}, block=5000, count=10)
    for stream, msgs in res:
        for msg_id, data in msgs:
            process(data)
            last = msg_id

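Entries stay in the stream across consumer restarts (and across server restarts if RDB or AOF persistence is enabled). The loop above keeps last only in memory, so persist it if the consumer must resume where it left off. Multiple consumers can each read the full stream independently.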
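One way to make the simple consumer resume after a restart is to store the last processed ID in a Redis key. A minimal sketch, assuming a redis-py style client is passed in and that xread returns the default list-of-pairs shape; read_batch and the checkpoint key name are hypothetical:

```python
def read_batch(client, stream, checkpoint_key, handler, count=10, block_ms=5000):
    """Read one batch after the saved checkpoint; return how many entries were handled."""
    saved = client.get(checkpoint_key)
    # First run: "0" replays the stream from the beginning ("$" would skip history).
    last_id = saved.decode() if isinstance(saved, bytes) else (saved or "0")
    handled = 0
    for _stream, msgs in client.xread({stream: last_id}, count=count, block=block_ms) or []:
        for msg_id, data in msgs:
            handler(data)
            # Save after each entry so a crash repeats at most one entry.
            client.set(checkpoint_key, msg_id)
            handled += 1
    return handled
```

Calling read_batch(client, "events", "events:last_id", process) in a loop gives at-least-once processing: an entry handled just before a crash may be handled again, so the handler should tolerate duplicates.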

Consumer groups

# Raises ResponseError (BUSYGROUP) if the group already exists
redis.xgroup_create("events", "workers", id="0", mkstream=True)

# Worker
while True:
    res = redis.xreadgroup("workers", "worker-1", {"events": ">"}, count=10, block=5000)
    for stream, msgs in res:
        for msg_id, data in msgs:
            try:
                process(data)
                redis.xack("events", "workers", msg_id)
            except Exception:
                # Not acked: the entry stays in the group's pending list for a later retry
                pass

Load-balanced: within a group, each entry is delivered to only one of the group's consumers.
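Silently swallowing exceptions makes failures hard to spot. A minimal sketch of the per-batch step that logs failures and reports which IDs were acknowledged, assuming a redis-py style client and the result shape that xreadgroup returns by default; handle_batch and the logger name are hypothetical:

```python
import logging

log = logging.getLogger("stream-worker")


def handle_batch(client, res, process, stream="events", group="workers"):
    """Process an xreadgroup() result; return (acked_ids, failed_ids)."""
    acked, failed = [], []
    for _stream, msgs in res or []:
        for msg_id, data in msgs:
            try:
                process(data)
            except Exception:
                # Leave the entry un-acked so it remains in the pending list.
                log.exception("processing failed for %s", msg_id)
                failed.append(msg_id)
            else:
                client.xack(stream, group, msg_id)
                acked.append(msg_id)
    return acked, failed
```

The failed IDs are not retried here; they wait in the pending list until some consumer inspects or claims them.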

Pending entries (failures)

pending = redis.xpending_range("events", "workers", "-", "+", 100)
# Re-claim entries whose owner has been idle too long (min_idle_time is in milliseconds)
redis.xclaim("events", "workers", "new_consumer", min_idle_time=60000, message_ids=[...])
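The two calls above can be combined into a periodic recovery step. A minimal sketch, assuming redis-py's xpending_range returns dicts with message_id and time_since_delivered keys; reclaim_stale is a hypothetical helper name:

```python
def reclaim_stale(client, consumer, stream="events", group="workers",
                  min_idle_ms=60000, batch=100):
    """Claim pending entries idle for at least min_idle_ms; return the claimed messages."""
    pending = client.xpending_range(stream, group, "-", "+", batch)
    stale_ids = [
        p["message_id"] for p in pending
        if p["time_since_delivered"] >= min_idle_ms
    ]
    if not stale_ids:
        return []
    # The server rechecks min_idle_time, so an entry that another worker
    # claimed in the meantime is skipped rather than claimed twice.
    return client.xclaim(stream, group, consumer,
                         min_idle_time=min_idle_ms, message_ids=stale_ids)
```

The returned messages have the same (id, fields) shape as a read, so they can be processed and acknowledged by the normal worker code.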

Trim stream

XADD events MAXLEN 100000 * key v
XADD events MAXLEN ~ 100000 * key v       # approximate (faster)
XTRIM events MAXLEN 100000
XTRIM events MINID 1700000000000-0        # drop entries older than this ID (IDs start with a millisecond timestamp)
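For time-based retention, the MINID threshold can be computed from the clock, since auto-generated IDs begin with a Unix timestamp in milliseconds. A minimal sketch, assuming a redis-py version whose xtrim accepts a minid argument and a server that supports MINID (Redis 6.2+); both function names are hypothetical:

```python
import time


def minid_for_retention(retention_s, now_s=None):
    """Return the stream ID that marks the start of the retention window."""
    now_s = time.time() if now_s is None else now_s
    # Auto-generated stream IDs have the form "<unix milliseconds>-<sequence>".
    return f"{int((now_s - retention_s) * 1000)}-0"


def trim_older_than(client, stream, retention_s):
    """Drop entries older than retention_s seconds; return the number removed."""
    return client.xtrim(stream, minid=minid_for_retention(retention_s))
```

For example, trim_older_than(client, "events", 7 * 24 * 3600) would keep roughly the last week of entries.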

Dead letter

def process_with_retry(msg_id, data, max_attempts=3):
    attempts = int(data.get("attempts", 0))  # assumes decode_responses=True (str keys)
    if attempts >= max_attempts:
        redis.xadd("events:dead", data)
        redis.xack("events", "workers", msg_id)
        return
    try:
        do_work(data)
        redis.xack(...)
    except Exception:
        # Re-queue a copy with an incremented counter, then ack the original
        data["attempts"] = str(attempts + 1)
        redis.xadd("events", data)
        redis.xack("events", "workers", msg_id)
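A different approach, not the one shown above, is to rely on the delivery counter that Redis keeps for each pending entry instead of an attempts field. A minimal sketch, assuming xpending_range reports times_delivered and that failed entries are redelivered through XCLAIM (which increments that counter); dead_letter_poisoned is a hypothetical helper name:

```python
def dead_letter_poisoned(client, stream="events", group="workers",
                         dead_stream="events:dead", max_deliveries=3, batch=100):
    """Move entries delivered too many times to dead_stream; return their IDs."""
    moved = []
    for p in client.xpending_range(stream, group, "-", "+", batch):
        if p["times_delivered"] < max_deliveries:
            continue
        msg_id = p["message_id"]
        # Fetch the entry body; it may already have been trimmed away.
        found = client.xrange(stream, min=msg_id, max=msg_id, count=1)
        if found:
            _id, data = found[0]
            client.xadd(dead_stream, data)
        # Ack either way so the entry leaves the pending list.
        client.xack(stream, group, msg_id)
        moved.append(msg_id)
    return moved
```

This keeps the original entry in place and avoids re-adding copies to the stream, at the cost of depending on a recovery step to drive redelivery.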

When to use which

Pub/sub:

  • Chat / notifications.
  • Cache invalidation broadcasts.
  • Don’t care about lost messages.

Streams:

  • Event sourcing.
  • Reliable job queue.
  • Replay needed.
  • Multiple independent consumers.

Latency

Pub/sub: very low, since messages are pushed straight to connected subscribers. Streams: low, plus the cost of appending each entry to the stream.

Memory

Streams persist until trimmed. Pub/sub: ephemeral.

Common mistakes

  • Pub/sub for important events (lost on disconnect).
  • Stream without MAXLEN → infinite growth.
  • No XACK → pending pile up.
  • Single consumer in group → no load balance.
  • Pattern subscriptions with sharded pub/sub in a cluster: SSUBSCRIBE/SPUBLISH (Redis 7.0+) route by channel and have no pattern variant.

Read this next

If you want my stream worker template, it’s at rajpoot.dev.

