Redis caching patterns.
Cache-aside (most common)
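import json

# Assumes `redis` is a connected redis-py client and `db` is the data-access layer.
def get_user(id):
    key = f"user:{id}"
    cached = redis.get(key)
    if cached:
        return json.loads(cached)
    user = db.user.find(id)  # miss: read from the database
    if user:
        redis.setex(key, 300, json.dumps(user))  # cache for 5 minutes
    return user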
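The application manages the cache itself: it checks Redis first, falls back to the database on a miss, and populates the cache. Use this for most read-heavy data.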
Invalidation
def update_user(id, data):
    db.user.update(id, data)
    redis.delete(f"user:{id}")
Or update cache directly (write-through).
Write-through
def update_user(id, data):
    db.user.update(id, data)
    # Assumes `data` is the full user record; a partial update would need a re-read first
    redis.setex(f"user:{id}", 300, json.dumps(data))
The cache stays fresh, at the cost of slower writes: every write touches both the database and Redis.
Write-behind (write-back)
def update_user(id, data):
    redis.setex(f"user:{id}", 300, json.dumps(data))
    queue.enqueue("update_db", id, data)
Writes are fast, but the database is only eventually consistent, and queued writes can be lost if the queue is not durable.
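The write-behind function only enqueues the write; something still has to drain the queue. Here is a minimal sketch of that worker side, assuming an in-process `queue.Queue` in place of the real job queue and a hypothetical `save` callable standing in for `db.user.update`. It also shows one common benefit of write-behind: repeated writes to the same id can be collapsed into one database write.

```python
import queue

def drain_writes(pending: queue.Queue, save) -> int:
    """Apply queued (user_id, data) writes, keeping only the latest write per user."""
    latest = {}
    while True:
        try:
            user_id, data = pending.get_nowait()
        except queue.Empty:
            break
        latest[user_id] = data  # later writes to the same id overwrite earlier ones
    for user_id, data in latest.items():
        save(user_id, data)  # hypothetical stand-in for db.user.update
    return len(latest)

# Usage: two writes to user 1 collapse into a single database write.
pending = queue.Queue()
pending.put((1, {"name": "a"}))
pending.put((1, {"name": "b"}))
pending.put((2, {"name": "c"}))
db_rows = {}
written = drain_writes(pending, db_rows.__setitem__)
```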
Read-through
Wrapper auto-loads on miss:
class Cache:
    def get(self, key, loader, ttl=300):
        v = redis.get(key)
        if v:
            return json.loads(v)
        v = loader()  # miss: load from the source of truth, then cache it
        redis.setex(key, ttl, json.dumps(v))
        return v

cache = Cache()
user = cache.get(f"user:{id}", lambda: db.user.find(id))
Cache stampede
When a popular key expires, many concurrent requests miss at the same time and all hit the database to rebuild it.
Lock-based
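def get_with_lock(key, loader, ttl=300):
    v = redis.get(key)
    if v is not None:
        return json.loads(v)
    lock_key = f"{key}:lock"
    # NX + EX: only one caller acquires the lock, and it self-expires after 30s
    if redis.set(lock_key, "1", nx=True, ex=30):
        try:
            v = loader()
            redis.setex(key, ttl, json.dumps(v))
        finally:
            # Caveat: if loader() ran longer than 30s, this may delete another caller's lock
            redis.delete(lock_key)
        return v
    # Lock held by another caller: brief wait, then retry the cache once
    time.sleep(0.05)
    v = redis.get(key)
    return json.loads(v) if v is not None else loader()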
Probabilistic early expiration
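def get_pe(key, loader, ttl=300, beta=1.0):
    # loader() must return {"data": ..., "delta": <seconds the recompute took>}
    v_str = redis.get(key)
    if v_str:
        v = json.loads(v_str)
        rem = redis.ttl(key)  # seconds until expiry
        u = 1.0 - random.random()  # in (0, 1], so log(u) is defined and <= 0
        # Adding beta * delta * log(u) pulls the effective expiry earlier at random
        if rem > 0 and rem + beta * v["delta"] * math.log(u) <= 0:
            # Re-compute proactively
            new = loader()
            redis.setex(key, ttl, json.dumps(new))
            return new["data"]
        return v["data"]
    v = loader()
    redis.setex(key, ttl, json.dumps(v))
    return v["data"]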
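(Simplified XFetch: the closer a key is to expiry and the more expensive its recompute, the more likely a request refreshes it early.)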
Soft TTL + background refresh
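Store the data together with an expires_at timestamp (the soft TTL), and give the Redis key a longer hard TTL. When the soft TTL is about to pass or has passed, refresh in the background and return the current value immediately.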
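A minimal sketch of the soft-TTL idea, under stated assumptions: `get_soft`, `spawn_thread`, and the `FakeRedis` stand-in are hypothetical names introduced here, and the client only needs redis-py style `get(key)` and `set(key, value, ex=...)`. The stand-in exists only so the example runs without a server.

```python
import json
import threading
import time

class FakeRedis:
    """In-memory stand-in so the sketch runs without a server (get/set only)."""
    def __init__(self):
        self._data = {}

    def get(self, key):
        return self._data.get(key)

    def set(self, key, value, ex=None):
        self._data[key] = value  # the hard TTL (ex) is ignored by this stand-in

def spawn_thread(fn):
    """Run fn in a daemon thread so the caller is not blocked."""
    threading.Thread(target=fn, daemon=True).start()

def get_soft(client, key, loader, soft_ttl=60, hard_ttl=300,
             now=time.time, spawn=spawn_thread):
    """Return cached data immediately; refresh in the background once soft-expired."""
    def refresh():
        entry = {"data": loader(), "expires_at": now() + soft_ttl}
        client.set(key, json.dumps(entry), ex=hard_ttl)
        return entry["data"]

    raw = client.get(key)
    if raw is None:
        return refresh()  # cold miss: nothing to serve, so load synchronously
    entry = json.loads(raw)
    if now() >= entry["expires_at"]:
        spawn(refresh)  # soft-expired: refresh without blocking this request
    return entry["data"]
```

This sketch does not deduplicate concurrent refreshes; combining it with the lock-based approach is one way to make sure only one refresh runs at a time.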
Negative caching
Cache “not found”:
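def get_user(id):
    key = f"user:{id}"
    v = redis.get(key)
    # redis-py returns bytes unless the client was created with decode_responses=True
    if v in ("__NULL__", b"__NULL__"):
        return None
    if v:
        return json.loads(v)
    user = db.user.find(id)
    if user is None:
        # Short TTL so a newly created user becomes visible quickly
        redis.setex(key, 60, "__NULL__")
        return None
    redis.setex(key, 300, json.dumps(user))
    return user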
Prevents DB hammering for non-existent items.
Tag-based invalidation
def cache_with_tags(key, val, tags, ttl):
    redis.setex(key, ttl, val)
    for t in tags:
        redis.sadd(f"tag:{t}", key)
        # Caveat: this resets the tag set's TTL to the latest key's ttl
        redis.expire(f"tag:{t}", ttl)

def invalidate_tag(tag):
    keys = redis.smembers(f"tag:{tag}")
    if keys:
        redis.unlink(*keys)
    redis.delete(f"tag:{tag}")
TTL strategies
- Short (5-60s): rapidly changing.
- Medium (5-30min): typical.
- Long (1d+): nearly static.
Add jitter to prevent thundering expiration:
ttl = base + random.randint(0, 60)
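A fixed 0-60s jitter is large relative to a 10s TTL and negligible next to a one-day TTL. One option, sketched here with a hypothetical helper `ttl_with_jitter` and an assumed default spread of 10%, is to scale the jitter to the base TTL:

```python
import random

def ttl_with_jitter(base: int, spread: float = 0.1, rng=random) -> int:
    """Return the base TTL plus a random extra of up to `spread` times the base (in seconds)."""
    max_extra = max(1, int(base * spread))  # always allow at least 1s of jitter
    return base + rng.randint(0, max_extra)

# Usage: a 300s base TTL lands somewhere between 300s and 330s.
ttl = ttl_with_jitter(300)
```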
Cache size limits
maxmemory 2gb
maxmemory-policy allkeys-lru
Hot keys
A key that is hit very frequently can bottleneck the node that holds it. Mitigations:
- Local in-memory cache layer.
- Multiple replica reads.
- Pre-compute & serve from CDN.
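The first mitigation can be sketched as a tiny per-process cache with a very short TTL in front of Redis. `LocalCache` is a hypothetical name and the 1-second default is an assumption; the `fetch` callable stands for whatever Redis-backed lookup the application already has.

```python
import time

class LocalCache:
    """Tiny per-process cache with a short TTL, placed in front of Redis."""
    def __init__(self, ttl=1.0, now=time.monotonic):
        self._ttl = ttl
        self._now = now
        self._items = {}  # key -> (expires_at, value)

    def get(self, key, fetch):
        hit = self._items.get(key)
        if hit is not None and hit[0] > self._now():
            return hit[1]  # still fresh: no network round trip
        value = fetch()  # e.g. a Redis-backed lookup
        self._items[key] = (self._now() + self._ttl, value)
        return value
```

With a 1-second TTL, each process asks Redis for a given hot key at most about once per second, at the price of serving data up to one second stale. The sketch never evicts entries, so it only suits a small, known set of hot keys.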
Cluster keyslot
Same hash tag → same slot:
user:{1}:profile
user:{1}:sessions
Because both keys land in the same slot, multi-key operations on them (transactions, Lua scripts) work in a cluster.
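To see why the two keys share a slot, here is a sketch of the slot calculation: CRC16 of the key modulo 16384, where only the text inside the first non-empty `{...}` is hashed if one is present. `key_slot` is a name chosen for this example; it relies on Python's `binascii.crc_hqx` with an initial value of 0.

```python
import binascii

def key_slot(key: str) -> int:
    """Compute the Redis Cluster hash slot for a key, honoring {hash tags}."""
    k = key.encode()
    start = k.find(b"{")
    if start != -1:
        end = k.find(b"}", start + 1)
        if end > start + 1:  # non-empty tag between the first '{' and the next '}'
            k = k[start + 1:end]
    return binascii.crc_hqx(k, 0) % 16384

# Usage: both keys hash only the tag "1", so they map to the same slot.
same_slot = key_slot("user:{1}:profile") == key_slot("user:{1}:sessions")
```

When building such keys with a Python f-string, the literal braces must be doubled, for example `f"user:{{{user_id}}}:profile"`.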
Common mistakes
- No TTL → memory grows.
- TTL too long → stale data.
- Caching per-user data under a shared key → users see each other's data.
- Forgetting to handle cache failure (fall back to DB).
- Mutating data in ways that bypass cache invalidation → stale cache.
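The cache-failure item can be sketched as a cache-aside read that treats Redis errors as a miss. `get_with_fallback` and its `cache_errors` parameter are hypothetical; the default of `OSError` is only a placeholder, and with redis-py you would likely pass `redis.exceptions.RedisError` instead.

```python
import json
import logging

log = logging.getLogger(__name__)

def get_with_fallback(client, key, loader, ttl=300, cache_errors=(OSError,)):
    """Cache-aside read that degrades to the loader when the cache is unreachable."""
    try:
        cached = client.get(key)
        if cached is not None:
            return json.loads(cached)
    except cache_errors as exc:
        log.warning("cache read failed for %s: %s", key, exc)
    value = loader()  # fall back to the source of truth
    try:
        client.setex(key, ttl, json.dumps(value))
    except cache_errors as exc:
        log.warning("cache write failed for %s: %s", key, exc)
    return value
```

A cache outage then costs latency rather than availability, provided the database can absorb the extra load.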