Django Channels cheatsheet.
Install
uv add channels channels-redis daphne
# settings.py additions. "daphne" and "channels" are the apps this setup adds;
# the `...` stands in for the project's other apps.
INSTALLED_APPS = ["daphne", ..., "channels"]
# Dotted path to the `application` object defined in config/asgi.py.
ASGI_APPLICATION = "config.asgi.application"
# Default channel layer: the channels_redis backend, pointed at a Redis
# server on localhost:6379.
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {"hosts": [("localhost", 6379)]},
},
}
asgi.py
"""ASGI entry point: plain HTTP goes to Django, WebSockets go to Channels."""
import os

from django.core.asgi import get_asgi_application
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from channels.security.websocket import AllowedHostsOriginValidator

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings")

# Initialise Django *before* importing the routing module: chat.routing pulls
# in the consumers (and anything they import, such as ORM models), which
# requires the app registry to be populated already.
django_asgi_app = get_asgi_application()

from chat.routing import websocket_urlpatterns  # noqa: E402

application = ProtocolTypeRouter({
    # Regular HTTP requests are served by the Django ASGI app created above.
    "http": django_asgi_app,
    # WebSocket handshakes are origin-checked, given an authenticated
    # scope["user"], then dispatched by URL to the consumers.
    "websocket": AllowedHostsOriginValidator(
        AuthMiddlewareStack(URLRouter(websocket_urlpatterns))
    ),
})
Consumer
# chat/consumers.py
import json
from channels.generic.websocket import AsyncWebsocketConsumer
class ChatConsumer(AsyncWebsocketConsumer):
    """WebSocket consumer that relays chat messages within one room.

    Each connection joins the channel-layer group ``chat_<room>``, where
    ``room`` is taken from the URL route kwargs. Every valid message a client
    sends is broadcast to all members of that group.
    """

    async def connect(self):
        """Join the room's group, then accept the WebSocket handshake."""
        self.room = self.scope["url_route"]["kwargs"]["room"]
        self.group_name = f"chat_{self.room}"
        await self.channel_layer.group_add(self.group_name, self.channel_name)
        await self.accept()

    async def disconnect(self, code):
        """Leave the room's group when the socket closes."""
        await self.channel_layer.group_discard(self.group_name, self.channel_name)

    async def receive(self, text_data=None, bytes_data=None):
        """Broadcast a client's chat message to the whole room.

        Args:
            text_data: Text frame expected to hold a JSON object with a
                ``"message"`` key.
            bytes_data: Binary frame; not part of this protocol and ignored.
        """
        if text_data is None:
            # Binary frames arrive as bytes_data only; nothing to relay.
            return
        try:
            message = json.loads(text_data)["message"]
        except (ValueError, KeyError, TypeError):
            # Malformed JSON, a missing "message" key, or a non-object
            # payload: tell the client instead of letting the exception
            # tear down the connection.
            await self.send(text_data=json.dumps({"error": "invalid payload"}))
            return
        # Broadcast to group
        await self.channel_layer.group_send(self.group_name, {
            "type": "chat.message",
            "message": message,
            "user": self.scope["user"].username,
        })

    async def chat_message(self, event):
        """Handle a ``chat.message`` group event by sending it to the client."""
        await self.send(text_data=json.dumps(event))
Routing
# chat/routing.py
from django.urls import re_path
from . import consumers
# WebSocket routes: ws/chat/<room>/ is served by ChatConsumer. The named group
# "room" (one or more \w characters) is the value the consumer reads from
# scope["url_route"]["kwargs"]["room"].
websocket_urlpatterns = [
re_path(r"^ws/chat/(?P<room>\w+)/$", consumers.ChatConsumer.as_asgi()),
]
Client
// Match the page's scheme: browsers block ws:// from pages served over https.
const scheme = location.protocol === "https:" ? "wss" : "ws";
const ws = new WebSocket(`${scheme}://${location.host}/ws/chat/general/`);

// Each frame from the server is a JSON object with `user` and `message`.
ws.onmessage = (e) => {
  const data = JSON.parse(e.data);
  console.log(data.user, ":", data.message);
};

// send() throws InvalidStateError while the socket is still CONNECTING,
// so wait for the open event before sending the first message.
ws.onopen = () => {
  ws.send(JSON.stringify({ message: "hi" }));
};
Auth in consumer
async def connect(self):
    """Close the socket for anonymous users; otherwise continue connecting.

    The user object is read from ``self.scope["user"]``.
    """
    user = self.scope["user"]
    if not user.is_anonymous:
        ...  # authenticated: the rest of the connect logic goes here
    else:
        await self.close()
Run server
uv run daphne config.asgi:application -b 0.0.0.0 -p 8000
# or
uv run uvicorn config.asgi:application --host 0.0.0.0 --port 8000
In dev: runserver works (single-threaded). For prod, daphne or uvicorn.
Send from outside the consumer (e.g., from a view)
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
def notify_room(room, message, user="system"):
    """Broadcast a chat message to a room from synchronous code (e.g. a view).

    Args:
        room: Room name; the target group is ``chat_<room>``.
        message: Text delivered to every socket in the room.
        user: Sender name placed in the event's ``user`` field, so the event
            carries the same keys (``type``, ``message``, ``user``) as the
            events the chat consumer broadcasts and the client reads.
    """
    layer = get_channel_layer()
    # group_send is a coroutine function; async_to_sync lets sync code call it.
    async_to_sync(layer.group_send)(
        f"chat_{room}",
        {"type": "chat.message", "message": message, "user": user},
    )
# From async code
await layer.group_send(...)
JSON consumer
from channels.generic.websocket import AsyncJsonWebsocketConsumer
class ChatConsumer(AsyncJsonWebsocketConsumer):
    """JSON variant of the consumer: frames arrive parsed, replies use send_json."""

    async def chat_message(self, event):
        """Pass the event dict on to the client through ``send_json``."""
        await self.send_json(event)

    async def receive_json(self, content):
        """Handle one incoming frame (body left as a placeholder)."""
        # content is parsed JSON
        ...
Database access
from channels.db import database_sync_to_async
class MyConsumer(AsyncWebsocketConsumer):
    """Consumer that saves every received text frame as a ``Message`` object."""

    async def receive(self, text_data):
        """Persist the incoming frame via ``save_message``."""
        await self.save_message(text_data)

    @database_sync_to_async
    def save_message(self, msg):
        """Create and return a ``Message`` with ``text=msg`` (sync ORM call)."""
        created = Message.objects.create(text=msg)
        return created
Or use the async ORM directly (Django 4.1+):
async def receive(self, text_data):
    """Store the incoming text as a ``Message`` using the ORM's ``acreate``."""
    messages = Message.objects
    await messages.acreate(text=text_data)
Groups
# Join
await self.channel_layer.group_add("notifications", self.channel_name)
# Leave
await self.channel_layer.group_discard("notifications", self.channel_name)
# Send to group (all members)
await self.channel_layer.group_send("notifications", {"type": "notify", "msg": "hi"})
Custom event types
# In group_send:
await layer.group_send(group, {"type": "user.joined", "user": "alice"})
# Consumer method: Channels maps the dots in "type" to underscores, so "user.joined" is handled by user_joined():
async def user_joined(self, event):
    """Relay the received event dict to the client as JSON."""
    payload = event
    await self.send_json(payload)
Heartbeat / keepalive
import asyncio
async def connect(self):
    """Accept the socket, then start the background heartbeat task."""
    await self.accept()
    heartbeat = self.ping_loop()
    # Keep a reference on the consumer so disconnect() can cancel the task.
    self.ping_task = asyncio.create_task(heartbeat)


async def ping_loop(self):
    """Send a JSON ping frame every 30 seconds until the task is cancelled."""
    interval = 30
    frame = '{"type":"ping"}'
    while True:
        await asyncio.sleep(interval)
        await self.send(text_data=frame)


async def disconnect(self, code):
    """Stop the heartbeat task when the socket closes."""
    task = self.ping_task
    task.cancel()
Backpressure / rate limit
import time
class C(AsyncWebsocketConsumer):
    """Consumer that handles at most one message per 0.5 seconds per socket.

    Messages arriving sooner than 0.5 s after the last handled one are
    answered with a ``rate-limited`` error frame and otherwise dropped.
    """

    async def connect(self):
        """Accept the socket and reset the rate-limit clock."""
        await self.accept()
        # No message handled yet: -inf guarantees the first one always passes,
        # whatever the monotonic clock's starting value is.
        self.last_msg = float("-inf")

    async def receive(self, text_data):
        """Handle one frame unless it arrives inside the 0.5 s window."""
        # monotonic() never jumps when the wall clock is adjusted (NTP, DST),
        # so the measured interval cannot go negative or skip ahead.
        now = time.monotonic()
        if now - self.last_msg < 0.5:
            await self.send(text_data='{"error":"rate-limited"}')
            return
        self.last_msg = now
        ...
Common mistakes
- Sync DB calls in async consumer → SynchronousOnlyOperation.
- An event type with a dot can't be a Python method name — name the handler with `_` instead.
- Forgetting `await self.accept()` — connection drops.
- Channel layer not configured → `self.channel_layer` is `None`, so `group_send` raises instead of broadcasting.
- Running `runserver` in prod → no WebSocket workers.
Read this next
If you want my Channels + Auth setup, it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .