Redis-py cheatsheet.
Install
pip install redis
# Or with hiredis (C parser, faster):
pip install redis[hiredis]
Sync
import redis
r = redis.from_url("redis://localhost:6379/0")
r.set("k", "v")
r.get("k") # b"v"
# String decoding
r = redis.from_url("redis://localhost:6379/0", decode_responses=True)
r.get("k") # "v"
Connection pool
pool = redis.ConnectionPool.from_url("redis://localhost:6379", max_connections=50)
r = redis.Redis(connection_pool=pool)
Each client creates its own pool by default, so create the client (or the pool) once at startup and reuse it across requests.
Pipeline
# Commands are queued client-side; nothing is sent until execute().
with r.pipeline() as pipe:
    for k in keys:
        pipe.get(k)
    results = pipe.execute()  # one reply per queued command, in order
Sends many commands in a single round trip (RTT).
Transactions
with r.pipeline(transaction=True) as pipe:
    pipe.multi()  # start buffering explicitly (strictly required only after WATCH)
    pipe.set("a", 1)
    pipe.set("b", 2)
    pipe.execute()  # sends the queued commands wrapped in MULTI ... EXEC
WATCH (optimistic)
def transfer(from_acc, to_acc, amount):
    """Atomically move ``amount`` from ``from_acc`` to ``to_acc``.

    Uses WATCH/MULTI/EXEC optimistic locking: if either key is modified by
    another client between the balance read and EXEC, the attempt is
    discarded and retried from the top.

    Raises:
        ValueError: if ``amount`` is negative, or if the balance stored at
            ``from_acc`` is lower than ``amount``.
    """
    if amount < 0:
        # A negative amount would move funds in reverse and skip the
        # balance check on the account that is actually being debited.
        raise ValueError("amount must be non-negative")
    while True:
        try:
            with r.pipeline() as pipe:
                # After WATCH the pipeline executes commands immediately,
                # so GET returns the real value instead of being queued.
                pipe.watch(from_acc, to_acc)
                bal = int(pipe.get(from_acc) or 0)  # missing key counts as 0
                if bal < amount:
                    pipe.unwatch()
                    raise ValueError("Insufficient")
                pipe.multi()  # switch back to buffered mode for the atomic part
                pipe.decrby(from_acc, amount)
                pipe.incrby(to_acc, amount)
                pipe.execute()  # raises WatchError if a watched key changed
                return
        except redis.WatchError:
            continue  # lost the race: re-read the balance and try again
Async
import redis.asyncio as redis
r = redis.from_url("redis://localhost:6379")
await r.set("k", "v")
val = await r.get("k")
await r.aclose()
Async pool
pool = redis.ConnectionPool.from_url(url, max_connections=50)
r = redis.Redis(connection_pool=pool)
Create one pool per event loop — async connections are tied to the loop that created them.
Pub/sub
pubsub = r.pubsub()
pubsub.subscribe("channel")
for msg in pubsub.listen():  # blocks, yielding each incoming event
    if msg["type"] == "message":  # skip subscribe/unsubscribe confirmations
        process(msg["data"])
Async:
async def listen():
    """Subscribe to "channel" and hand every published payload to process()."""
    # async with / async for are only valid inside a coroutine.
    async with r.pubsub() as ps:
        await ps.subscribe("channel")
        async for msg in ps.listen():
            if msg["type"] == "message":  # skip subscribe confirmations
                await process(msg["data"])
Hashes
r.hset("user:1", mapping={"name": "Alice", "age": 30})
r.hget("user:1", "name")
r.hgetall("user:1")
r.hincrby("user:1", "logins", 1)
SCAN
# count is a per-call hint to the server, not a limit on the total results.
for key in r.scan_iter(match="user:*", count=1000):
    print(key)
Async:
async def scan_users():
    """Iterate over every key matching "user:*" without blocking the server."""
    # async for is only valid inside a coroutine.
    async for key in r.scan_iter(match="user:*"):
        ...
Lua
script = r.register_script("return redis.call('GET', KEYS[1])")
val = script(keys=["mykey"])
TTL
r.set("k", "v", ex=60)
r.set("k", "v", px=60000)
r.set("k", "v", nx=True, ex=60) # SET NX EX
r.expire("k", 60)
r.ttl("k")
JSON helper
import json
r.set("k", json.dumps(obj))
raw = r.get("k")  # None when the key is missing or has expired
obj = json.loads(raw) if raw is not None else None
# With decode_responses=False, get returns bytes; json.loads accepts bytes too.
Error handling
# Alias on import: redis-py's ConnectionError / TimeoutError are distinct
# classes that would otherwise shadow the Python builtins of the same name.
from redis.exceptions import (
    ConnectionError as RedisConnectionError,
    RedisError,  # common base class, for a catch-all handler
    TimeoutError as RedisTimeoutError,
)

try:
    r.get("k")
except RedisConnectionError:
    ...
except RedisTimeoutError:
    ...
Cluster
from redis.cluster import RedisCluster
rc = RedisCluster(host="node1", port=6379)
rc.set("k", "v")
Sentinel
from redis.sentinel import Sentinel
sentinel = Sentinel([("s1", 26379), ("s2", 26379)])
master = sentinel.master_for("mymaster", decode_responses=True)
Cache decorator
import functools, json, hashlib
def cache(ttl=300, client=None):
    """Cache a function's JSON-serialisable return value in Redis.

    Args:
        ttl: Lifetime of each cached entry, in seconds.
        client: Redis client used for storage. Defaults to the
            module-level ``r``, looked up at call time.

    The key combines the function's module and qualified name with a hash
    of its arguments. Keyword arguments are sorted first, so the order in
    which they are written at the call site does not change the key.
    """
    def deco(fn):
        # Module + qualname keeps same-named functions in different modules
        # or classes from sharing cache entries.
        prefix = f"cache:{fn.__module__}.{fn.__qualname__}"

        @functools.wraps(fn)
        def wrap(*args, **kw):
            store = r if client is None else client
            # md5 is only a compact fingerprint here, not a security measure.
            fingerprint = repr((args, sorted(kw.items())))
            digest = hashlib.md5(fingerprint.encode()).hexdigest()
            key = f"{prefix}:{digest}"
            cached = store.get(key)
            if cached is not None:  # GET returns None on a miss
                return json.loads(cached)
            val = fn(*args, **kw)
            store.setex(key, ttl, json.dumps(val))
            return val
        return wrap
    return deco
@cache(ttl=60)
def expensive(x): return ...
Common mistakes
- Per-call connection (slow): use pool.
- decode_responses=False, then calling .decode() everywhere.
- KEYS pattern (blocking) instead of SCAN.
- Forgetting to close async clients.
- Pickling untrusted data.
Read this next
If you want my redis-py wrapper, it’s at rajpoot.dev.
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev.