Redis-py cheatsheet.

Install

pip install redis
# Or with hiredis (C parser, faster). Quoted so shells such as zsh do not
# treat the square brackets as a glob pattern:
pip install "redis[hiredis]"

Sync

import redis

# Build a client from a connection URL, then write and read one key.
# Without decode_responses the reply is bytes.
r = redis.from_url("redis://localhost:6379/0")
r.set("k", "v")
r.get("k")    # b"v"

# String decoding
# decode_responses=True makes the same read come back as str.
r = redis.from_url("redis://localhost:6379/0", decode_responses=True)
r.get("k")    # "v"

Connection pool

# One explicit pool, capped at 50 connections, handed to the client.
pool = redis.ConnectionPool.from_url("redis://localhost:6379", max_connections=50)
r = redis.Redis(connection_pool=pool)

By default each client creates its own pool, so create one client (or one shared pool) at startup and reuse it across requests.

Pipeline

# Queue one GET per key, then submit the whole batch with execute().
# `keys` is assumed to be an iterable of key names defined by the caller.
pipe = r.pipeline()
for k in keys:
    pipe.get(k)
# Replies for the queued commands.
results = pipe.execute()

A pipeline sends many operations in a single round trip (RTT).

Transactions

# Queue both writes inside a transactional pipeline and submit them together.
with r.pipeline(transaction=True) as pipe:
    pipe.multi()
    pipe.set("a", 1)
    pipe.set("b", 2)
    pipe.execute()

WATCH (optimistic)

def transfer(from_acc, to_acc, amount):
    """Atomically move ``amount`` from ``from_acc`` to ``to_acc``.

    Uses optimistic locking: both keys are watched, the source balance is
    read, and the debit/credit pair is queued and executed as one
    transaction. If executing raises ``redis.WatchError``, the whole attempt
    is retried from scratch.

    Args:
        from_acc: Key holding the source balance (an integer value).
        to_acc: Key holding the destination balance.
        amount: Non-negative integer amount to move.

    Raises:
        ValueError: If ``amount`` is negative, or the source balance is
            lower than ``amount``.
    """
    # A negative amount would pass the balance check below and move funds in
    # the opposite direction without ever checking the other account.
    if amount < 0:
        raise ValueError("Amount must be non-negative")

    while True:
        try:
            with r.pipeline() as pipe:
                pipe.watch(from_acc, to_acc)
                # The read happens right away (its result is used on this
                # line); a missing key counts as a balance of 0.
                bal = int(pipe.get(from_acc) or 0)
                if bal < amount:
                    pipe.unwatch()
                    raise ValueError("Insufficient")
                # From here on commands are queued until execute().
                pipe.multi()
                pipe.decrby(from_acc, amount)
                pipe.incrby(to_acc, amount)
                pipe.execute()
                return
        except redis.WatchError:
            # Retry the whole read-check-write cycle.
            continue

Async

# NOTE: this import rebinds the name `redis` to the asyncio package.
import redis.asyncio as redis

r = redis.from_url("redis://localhost:6379")

# Same command names as the sync client, but every call is awaited.
await r.set("k", "v")
val = await r.get("k")
# Close the client once it is no longer needed.
await r.aclose()

Async pool

# `url` is assumed to be a Redis connection URL defined by the caller, and
# `redis` is presumably the redis.asyncio package here (TODO confirm).
pool = redis.ConnectionPool.from_url(url, max_connections=50)
r = redis.Redis(connection_pool=pool)

Create one pool per event loop; do not share an async pool between loops.

Pub/sub

# Subscribe to one channel, then loop over the event stream and hand each
# published payload to process().
pubsub = r.pubsub()
pubsub.subscribe("channel")
for event in pubsub.listen():
    # Skip anything that is not a published message.
    if event["type"] != "message":
        continue
    process(event["data"])

Async:

async with r.pubsub() as ps:
    await ps.subscribe("channel")
    async for msg in ps.listen():
        if msg["type"] == "message":
            await process(msg["data"])

Hashes

# Write two fields of the hash stored at "user:1" in a single call.
r.hset("user:1", mapping={"name": "Alice", "age": 30})
# Read one field, then every field.
r.hget("user:1", "name")
r.hgetall("user:1")
# Add 1 to the "logins" field.
r.hincrby("user:1", "logins", 1)

SCAN

# Walk the keys matching "user:*" with SCAN rather than the blocking KEYS command.
for key in r.scan_iter(match="user:*", count=1000):
    print(key)

Async:

# Async client: the same iterator, consumed with `async for`.
async for key in r.scan_iter(match="user:*"):
    ...

Lua

# Register the Lua source; the returned object is callable.
script = r.register_script("return redis.call('GET', KEYS[1])")
# keys=[...] supplies KEYS for the script, so this call GETs "mykey".
val = script(keys=["mykey"])

TTL

# ex= gives the expiry in seconds, px= in milliseconds (the EX / PX options of SET).
r.set("k", "v", ex=60)
r.set("k", "v", px=60000)
r.set("k", "v", nx=True, ex=60)        # SET NX EX
# Set, then read, the TTL of an existing key.
r.expire("k", 60)
r.ttl("k")

JSON helper

import json

r.set("k", json.dumps(obj))

# get() returns None when the key is missing or has expired, and
# json.loads(None) raises TypeError, so check before decoding.
raw = r.get("k")
obj = json.loads(raw) if raw is not None else None

# With decode_responses=False, get returns bytes; json.loads accepts bytes as well as str.

Error handling

# NOTE: these redis.exceptions names shadow the built-in ConnectionError and
# TimeoutError within this module. RedisError is imported but not used below.
from redis.exceptions import ConnectionError, TimeoutError, RedisError

# Handle connection failures and timeouts separately.
try:
    r.get("k")
except ConnectionError:
    ...
except TimeoutError:
    ...

Cluster

from redis.cluster import RedisCluster

# Connect through one node ("node1"); commands use the same method names as
# the plain client.
rc = RedisCluster(host="node1", port=6379)
rc.set("k", "v")

Sentinel

from redis.sentinel import Sentinel

# (host, port) pairs of the Sentinel processes to query.
sentinel = Sentinel([("s1", 26379), ("s2", 26379)])
# Client for the master of the service named "mymaster"; decode_responses is
# presumably forwarded to that client (TODO confirm against the redis-py docs).
master = sentinel.master_for("mymaster", decode_responses=True)

Cache decorator

import functools, json, hashlib

def cache(ttl=300, client=None):
    """Cache a function's JSON-serializable return value in Redis.

    Args:
        ttl: Lifetime of each cache entry, in seconds.
        client: Object providing ``get(key)`` and ``setex(key, ttl, value)``.
            Defaults to the module-level ``r``, looked up at call time.

    Returns:
        A decorator. The wrapped function returns the cached value on a hit;
        on a miss it calls the original function and stores the result.

    Note:
        Values round-trip through JSON, so a cached result can differ in type
        from a fresh one (for example, tuples come back as lists). Arguments
        are keyed by their ``repr``, so they need a stable ``repr``.
    """
    def deco(fn):
        # Qualify with module and qualname so same-named functions in
        # different modules or classes do not share cache entries.
        prefix = f"cache:{fn.__module__}.{fn.__qualname__}"

        @functools.wraps(fn)
        def wrap(*args, **kw):
            conn = r if client is None else client
            # Sort keyword arguments so f(a=1, b=2) and f(b=2, a=1) map to
            # the same key. MD5 only shortens the key; it is not a security
            # measure.
            raw_key = repr((args, sorted(kw.items())))
            key = f"{prefix}:{hashlib.md5(raw_key.encode()).hexdigest()}"
            cached = conn.get(key)
            # Compare against None explicitly: only a missing key is a miss.
            if cached is not None:
                return json.loads(cached)
            val = fn(*args, **kw)
            conn.setex(key, ttl, json.dumps(val))
            return val
        return wrap
    return deco

@cache(ttl=60)
def expensive(x):
    """Placeholder for a slow computation, decorated with a 60-second cache TTL."""
    return ...

Common mistakes

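  • Opening a new connection per call (slow): reuse a client backed by a connection pool.
  • Leaving decode_responses=False and then calling .decode() everywhere: set decode_responses=True instead.
  • Using KEYS with a pattern (blocks the server) instead of SCAN.
  • Forgetting to close async clients.
  • Unpickling untrusted data (pickle.loads can execute arbitrary code).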

Read this next

If you want my redis-py wrapper, it’s at rajpoot.dev.


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev.