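Redis caching patterns.

The Python snippets below assume that redis is a connected client (the calls match redis-py), db is your data-access layer, and json, math, random, and time are imported.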

Cache-aside (most common)

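def get_user(id):
    key = f"user:{id}"
    cached = redis.get(key)
    if cached is not None:
        # Cache hit: skip the database entirely
        return json.loads(cached)
    
    # Cache miss: load from the database, then populate the cache for 5 minutes
    user = db.user.find(id)
    if user:
        redis.setex(key, 300, json.dumps(user))
    return user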

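The application manages the cache itself: check Redis first, fall back to the database on a miss, then populate the cache. Use it for most read-heavy data.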

Invalidation

def update_user(id, data):
    db.user.update(id, data)
    redis.delete(f"user:{id}")

Alternatively, overwrite the cached value on every write instead of deleting it (write-through).

Write-through

def update_user(id, data):
    db.user.update(id, data)
    # Assumes data is the full user record; for partial updates, cache the re-read row instead
    redis.setex(f"user:{id}", 300, json.dumps(data))

The cache stays fresh, at the cost of slower writes: every write touches both the database and Redis.

Write-behind (write-back)

def update_user(id, data):
    redis.setex(f"user:{id}", 300, json.dumps(data))
    queue.enqueue("update_db", id, data)

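Writes are fast, but the database only becomes consistent with the cache once the queued job runs, and a write is lost if the queue drops it first.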

Read-through

Wrapper auto-loads on miss:

class Cache:
    def get(self, key, loader, ttl=300):
        v = redis.get(key)
        if v: return json.loads(v)
        # Miss: the wrapper, not the caller, loads and stores the value
        v = loader()
        redis.setex(key, ttl, json.dumps(v))
        return v

cache = Cache()
user = cache.get(f"user:{id}", lambda: db.user.find(id))

Cache stampede

When a hot key expires, many concurrent requests miss at the same moment and all recompute the value, hitting the database at once.

Lock-based

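def get_with_lock(key, loader, ttl=300):
    v = redis.get(key)
    if v: return json.loads(v)
    
    lock_key = f"{key}:lock"
    # SET NX EX: only one caller acquires the lock, and it self-expires after 30s
    if redis.set(lock_key, "1", nx=True, ex=30):
        try:
            v = loader()
            redis.setex(key, ttl, json.dumps(v))
        finally:
            # If loader() can outlive the 30s expiry, this may delete another caller's lock
            redis.delete(lock_key)
        return v
    
    # Lock held; brief wait + retry, then fall back to loading directly
    time.sleep(0.05)
    return json.loads(redis.get(key) or "null") or loader()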

Probabilistic early expiration

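def get_pe(key, loader, ttl=300, beta=1.0):
    def recompute():
        # Time the loader so the stored delta reflects the recompute cost
        start = time.time()
        data = loader()
        delta = time.time() - start
        redis.setex(key, ttl, json.dumps({"data": data, "delta": delta}))
        return data

    v_str = redis.get(key)
    if v_str:
        v = json.loads(v_str)
        rem = redis.ttl(key)
        # u is in (0, 1], so -log(u) >= 0: a random head start scaled by delta and beta
        u = 1.0 - random.random()
        if rem > 0 and -beta * v["delta"] * math.log(u) >= rem:
            # Re-compute proactively, before the key actually expires
            return recompute()
        return v["data"]
    
    return recompute()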

(Simplified XFetch.)

Soft TTL + background refresh

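Store the data together with an expires_at timestamp, and give the Redis key itself a longer TTL. When a read finds the entry close to (or past) expires_at, return the current value immediately and refresh it in the background.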
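
A minimal sketch of the idea, not a definitive implementation. The names get_soft, _refresh, soft_ttl, and hard_ttl are hypothetical, and client is assumed to be any object with redis-py style get and setex methods. For simplicity it treats "expires soon" as "the soft deadline has passed" and does not deduplicate concurrent refreshes (a lock could be layered on top).

```python
import json
import threading
import time


def _refresh(client, key, loader, soft_ttl, hard_ttl):
    """Load fresh data and store it with a new soft deadline."""
    data = loader()
    entry = {"data": data, "expires_at": time.time() + soft_ttl}
    # hard_ttl is the real Redis expiry; the soft deadline lives in the payload.
    client.setex(key, hard_ttl, json.dumps(entry))
    return data


def get_soft(client, key, loader, soft_ttl=60, hard_ttl=300):
    """Return cached data; refresh in the background once the soft deadline passes."""
    raw = client.get(key)
    if raw is None:
        # Hard miss: nothing to serve, so load synchronously.
        return _refresh(client, key, loader, soft_ttl, hard_ttl)

    entry = json.loads(raw)
    if time.time() >= entry["expires_at"]:
        # Stale but still usable: refresh off the request path.
        threading.Thread(
            target=_refresh,
            args=(client, key, loader, soft_ttl, hard_ttl),
            daemon=True,
        ).start()
    return entry["data"]
```

Callers never wait on the loader unless the key is missing entirely, which is why hard_ttl should be comfortably longer than soft_ttl.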

Negative caching

Cache “not found”:

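def get_user(id):
    key = f"user:{id}"
    v = redis.get(key)
    # Assumes the client decodes responses to str; otherwise compare against b"__NULL__"
    if v == "__NULL__": return None
    if v: return json.loads(v)
    
    user = db.user.find(id)
    if user is None:
        # Cache the miss with a short TTL so a newly created user shows up quickly
        redis.setex(key, 60, "__NULL__")
        return None
    redis.setex(key, 300, json.dumps(user))
    return user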

This stops repeated lookups for non-existent items from hammering the database.

Tag-based invalidation

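def cache_with_tags(key, val, tags, ttl):
    redis.setex(key, ttl, val)
    for t in tags:
        # Track which keys carry each tag so they can be invalidated together
        redis.sadd(f"tag:{t}", key)
        # Note: this resets the tag set's TTL to the latest key's ttl, which can
        # shorten it; use the longest TTL among tagged keys if they differ
        redis.expire(f"tag:{t}", ttl)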

def invalidate_tag(tag):
    keys = redis.smembers(f"tag:{tag}")
    if keys: redis.unlink(*keys)
    redis.delete(f"tag:{tag}")

TTL strategies

  • Short (5-60s): rapidly changing.
  • Medium (5-30min): typical.
  • Long (1d+): nearly static.

Add jitter to prevent thundering expiration:

ttl = base + random.randint(0, 60)

Cache size limits

Cap memory in redis.conf and evict the least recently used keys once the cap is reached:

maxmemory 2gb
maxmemory-policy allkeys-lru

Hot keys

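A key that is hit very frequently can bottleneck the single Redis node (or cluster shard) that holds it. Mitigations:

  • Add a local in-memory cache layer in front of Redis.
  • Spread reads across multiple replicas.
  • Pre-compute the result and serve it from a CDN.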
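
The first mitigation can be sketched as a tiny per-process cache with a very short TTL. LocalLayer and local_ttl are hypothetical names, and client is assumed to expose a redis-py style get. This sketch is neither thread-safe nor size-bounded, so treat it as an illustration only.

```python
import json
import time


class LocalLayer:
    """Per-process TTL cache in front of Redis (illustrative helper)."""

    def __init__(self, client, local_ttl=1.0):
        self.client = client
        self.local_ttl = local_ttl
        self._store = {}  # key -> (expires_at, value)

    def get(self, key):
        now = time.monotonic()
        hit = self._store.get(key)
        if hit and hit[0] > now:
            # Served from process memory: no network hop to Redis.
            return hit[1]
        raw = self.client.get(key)
        value = json.loads(raw) if raw is not None else None
        self._store[key] = (now + self.local_ttl, value)
        return value
```

Even a one-second local TTL caps the traffic for a hot key at roughly one Redis read per process per second, in exchange for up to one extra second of staleness.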

Cluster keyslot

Same hash tag → same slot:

user:{1}:profile
user:{1}:sessions

Redis Cluster hashes only the part between the braces, so both keys land in the same slot. That allows multi-key operations on them (transactions, Lua scripts).
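
To see why the two keys share a slot, here is a sketch of the slot calculation. key_slot is a hypothetical helper; it assumes that binascii.crc_hqx with an initial value of 0 matches the CRC16 variant Redis Cluster uses, so check its output against CLUSTER KEYSLOT on a real cluster before relying on it.

```python
import binascii


def key_slot(key: str) -> int:
    """Approximate Redis Cluster slot for a key, honoring hash tags."""
    data = key.encode()
    start = data.find(b"{")
    if start != -1:
        end = data.find(b"}", start + 1)
        # Only a non-empty tag counts; "{}" means the whole key is hashed.
        if end > start + 1:
            data = data[start + 1:end]
    # CRC16 (polynomial 0x1021, initial value 0) modulo the 16384 slots.
    return binascii.crc_hqx(data, 0) % 16384


same = key_slot("user:{1}:profile") == key_slot("user:{1}:sessions")
print(same)
```

Only the tag "1" is hashed for both keys, so they always map to the same slot regardless of the rest of the key name.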

Common mistakes

  • No TTL → memory grows.
  • TTL too long → stale data.
  • Caching per-user data under a shared key → users see each other's data.
  • Forgetting to handle cache failure (fall back to DB).
  • Mutating data through code paths that skip invalidation → stale cache.
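
The cache-failure point can be sketched like this. get_user_safe is a hypothetical name, client and db are passed in as assumptions, and the broad except Exception is for illustration; with redis-py you would likely narrow it to redis.exceptions.RedisError.

```python
import json
import logging


def get_user_safe(client, db, user_id, ttl=300):
    """Cache-aside read that degrades to the database if Redis is unavailable."""
    key = f"user:{user_id}"
    try:
        cached = client.get(key)
        if cached is not None:
            return json.loads(cached)
    except Exception:
        # A cache outage should slow requests down, not fail them.
        logging.warning("cache read failed for %s; falling back to DB", key)

    user = db.user.find(user_id)
    if user is not None:
        try:
            client.setex(key, ttl, json.dumps(user))
        except Exception:
            logging.warning("cache write failed for %s", key)
    return user
```

Setting short socket timeouts on the client matters just as much here, since a hung connection is worse than a refused one.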

Read this next

If you want my caching helper library, it’s at rajpoot.dev.

