Django Channels cheatsheet.

Install

uv add channels channels-redis daphne
# config/settings.py
INSTALLED_APPS = ["daphne", ..., "channels"]
# Dotted path to the `application` object defined in config/asgi.py.
ASGI_APPLICATION = "config.asgi.application"

# Redis-backed channel layer; "hosts" lists the (host, port) pairs to connect to.
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {"hosts": [("localhost", 6379)]},
    },
}

asgi.py

import os
from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from channels.security.websocket import AllowedHostsOriginValidator

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

# Build the Django ASGI app first: this initialises Django, so the app
# registry is populated before chat.routing (and the consumers/models it
# pulls in) is imported. Importing it earlier can raise AppRegistryNotReady.
django_asgi_app = get_asgi_application()

from chat.routing import websocket_urlpatterns  # noqa: E402

# Route by protocol: plain HTTP goes to Django, WebSockets go through the
# origin check and auth middleware before reaching the URL router.
application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(URLRouter(websocket_urlpatterns))
    ),
})

Consumer

# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer

class ChatConsumer(AsyncWebsocketConsumer):
    """Relay chat messages between every socket connected to the same room."""

    async def connect(self):
        """Join the room's group, then accept the WebSocket handshake."""
        self.room = self.scope["url_route"]["kwargs"]["room"]
        self.group_name = f"chat_{self.room}"

        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, code):
        """Leave the room's group when the socket closes."""
        await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def receive(self, text_data=None, bytes_data=None):
        """Validate an incoming frame and broadcast its message to the room.

        Binary frames are delivered as ``bytes_data``, so both keyword
        arguments must be accepted. Anything that is not a JSON object with a
        "message" key gets an error reply instead of raising in the consumer.
        """
        data = None
        if text_data is not None:
            try:
                data = json.loads(text_data)
            except json.JSONDecodeError:
                pass  # rejected by the shape check below
        if not isinstance(data, dict) or "message" not in data:
            await self.send(text_data='{"error":"invalid-message"}')
            return

        # Broadcast to group
        await self.channel_layer.group_send(self.group_name, {
            "type": "chat.message",
            "message": data["message"],
            "user": self.scope["user"].username,
        })

    async def chat_message(self, event):
        """Handle a "chat.message" group event by forwarding it to the client."""
        await self.send(text_data=json.dumps(event))

Routing

# chat/routing.py
from django.urls import re_path
from . import consumers

# Room names are restricted to ASCII letters, digits and underscores because
# Channels rejects group names containing anything else, and a bare \w would
# also match non-ASCII word characters. The length cap keeps "chat_" + room
# under Channels' 100-character group-name limit.
websocket_urlpatterns = [
    re_path(
        r"^ws/chat/(?P<room>[A-Za-z0-9_]{1,94})/$",
        consumers.ChatConsumer.as_asgi(),
    ),
]

Client

// Match the page's scheme: a page served over https may only open wss:// sockets.
const scheme = location.protocol === "https:" ? "wss" : "ws";
const ws = new WebSocket(`${scheme}://${location.host}/ws/chat/general/`);

ws.onmessage = (e) => {
  const data = JSON.parse(e.data);
  console.log(data.user, ":", data.message);
};

// send() throws InvalidStateError while the socket is still CONNECTING,
// so wait for the open event before sending the first message.
ws.onopen = () => {
  ws.send(JSON.stringify({ message: "hi" }));
};

Auth in consumer

async def connect(self):
    """Refuse the connection when the requesting user is not logged in."""
    if self.scope["user"].is_anonymous:
        await self.close()
        return
    ...

Run server

uv run daphne config.asgi:application -b 0.0.0.0 -p 8000
# or (uvicorn is not part of the install line above; add it first)
uv run uvicorn config.asgi:application --host 0.0.0.0 --port 8000

In dev: runserver works (single-threaded). For prod, daphne or uvicorn.

Send from outside the consumer (e.g., from a view)

from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

def notify_room(room, message):
    """Broadcast ``message`` to the room's group from synchronous code."""
    # group_send is a coroutine function; wrap it so it can be called here.
    group_send = async_to_sync(get_channel_layer().group_send)
    event = {"type": "chat.message", "message": message}
    group_send(f"chat_{room}", event)
# From async code, await group_send directly (no async_to_sync wrapper)
await layer.group_send(...)

JSON consumer

from channels.generic.websocket import AsyncJsonWebsocketConsumer

class ChatConsumer(AsyncJsonWebsocketConsumer):
    """Chat consumer that exchanges JSON payloads instead of raw text."""

    async def receive_json(self, content):
        """Handle one inbound frame; ``content`` is the already-parsed JSON."""
        ...

    async def chat_message(self, event):
        """Forward a "chat.message" group event to the client as JSON."""
        await self.send_json(content=event)

Database access

from channels.db import database_sync_to_async

class MyConsumer(AsyncWebsocketConsumer):
    """Persist each received text frame through the synchronous ORM."""

    async def receive(self, text_data):
        """Store the incoming frame by awaiting the wrapped ORM call."""
        await self.save_message(text_data)

    @database_sync_to_async
    def save_message(self, msg):
        """Create the Message row; wrapped so async code can await it."""
        record = Message.objects.create(text=msg)
        return record

Or use the async ORM directly (Django 4.1+):

async def receive(self, text_data):
    """Store the incoming frame with the ORM's awaitable ``acreate``."""
    await Message.objects.acreate(text=text_data)

Groups

# Join: subscribe this connection's channel to the group
await self.channel_layer.group_add("notifications", self.channel_name)

# Leave: unsubscribe this connection's channel from the group
await self.channel_layer.group_discard("notifications", self.channel_name)

# Send to group (all members); "type" selects the handler method on each consumer
await self.channel_layer.group_send("notifications", {"type": "notify", "msg": "hi"})

Custom event types

# In group_send: the event "type" is written with dots
await layer.group_send(group, {"type": "user.joined", "user": "alice"})

# Consumer method: named with underscores, since Channels maps the dots in
# "type" to underscores when looking up the handler
async def user_joined(self, event):
    await self.send_json(event)

Heartbeat / keepalive

import asyncio

async def connect(self):
    """Accept the socket, then start the background keepalive task."""
    await self.accept()
    keepalive = self.ping_loop()
    # Keep the task on the instance so disconnect() can cancel it later.
    self.ping_task = asyncio.create_task(keepalive)

async def ping_loop(self, interval=30):
    """Send a ping frame to the client every ``interval`` seconds, forever.

    The loop only ends when its task is cancelled. ``interval`` defaults to
    the original 30 seconds; lower it when something in front of the server
    closes idle connections sooner.
    """
    while True:
        await asyncio.sleep(interval)
        await self.send(text_data='{"type":"ping"}')

async def disconnect(self, code):
    """Stop the keepalive loop once the socket has closed."""
    keepalive = self.ping_task
    keepalive.cancel()

Backpressure / rate limit

import time

class C(AsyncWebsocketConsumer):
    """Consumer that accepts at most one message per ``min_interval`` seconds."""

    # Minimum gap between two accepted messages, in seconds.
    min_interval = 0.5

    async def connect(self):
        """Accept the socket and reset the rate-limit clock."""
        await self.accept()
        # -inf guarantees the first message is accepted whatever the
        # monotonic clock's (arbitrary) starting value is.
        self.last_msg = float("-inf")

    async def receive(self, text_data):
        """Reject frames that arrive too soon after the last accepted one."""
        # monotonic() is unaffected by system clock adjustments, unlike
        # time.time(), so the interval check cannot be skewed by them.
        now = time.monotonic()
        if now - self.last_msg < self.min_interval:
            await self.send(text_data='{"error":"rate-limited"}')
            return
        self.last_msg = now
        ...

Common mistakes

  • Sync DB calls in async consumer → SynchronousOnlyOperation.
  • Naming the handler after the dotted event type — "chat.message" is handled by chat_message; dots are not valid in a Python identifier, so use _.
  • Forgetting await self.accept() — connection drops.
  • Channel layer not configured → get_channel_layer() returns None, so group_send fails with AttributeError (it does not silently no-op).
  • Running runserver in prod → no WebSocket workers.

Read this next

If you want my Channels + Auth setup, it’s at rajpoot.dev .


Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .