Django apps occasionally need real-time: live notifications, collaborative editing, dashboards that update without polling. Channels is the official answer. Used right, you get WebSockets, ASGI, and your Django models in one app. This post is the working playbook.
Setup
uv add channels channels-redis daphne
# settings.py
ASGI_APPLICATION = "myproject.asgi.application"
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {"hosts": [("redis", 6379)]},
}
}
# asgi.py
from channels.routing import ProtocolTypeRouter, URLRouter
from channels.auth import AuthMiddlewareStack
from django.core.asgi import get_asgi_application
from . import routing
application = ProtocolTypeRouter({
"http": get_asgi_application(),
"websocket": AuthMiddlewareStack(URLRouter(routing.websocket_urlpatterns)),
})
Consumer
# chat/consumers.py
from channels.generic.websocket import AsyncJsonWebsocketConsumer
class ChatConsumer(AsyncJsonWebsocketConsumer):
async def connect(self):
if self.scope["user"].is_anonymous:
await self.close()
return
self.room = self.scope["url_route"]["kwargs"]["room_id"]
self.group = f"chat_{self.room}"
await self.channel_layer.group_add(self.group, self.channel_name)
await self.accept()
async def disconnect(self, code):
await self.channel_layer.group_discard(self.group, self.channel_name)
async def receive_json(self, content, **kwargs):
await self.channel_layer.group_send(self.group, {
"type": "chat.message",
"user": self.scope["user"].username,
"text": content["text"],
})
async def chat_message(self, event):
await self.send_json({"user": event["user"], "text": event["text"]})
# routing.py
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r"^ws/chat/(?P<room_id>\d+)/$", consumers.ChatConsumer.as_asgi()),
]
Auth
AuthMiddlewareStack populates scope["user"] from session cookies. For tokens:
class TokenAuthMiddleware:
def __init__(self, app): self.app = app
async def __call__(self, scope, receive, send):
token = parse_token(scope)
scope["user"] = await get_user_for_token(token)
return await self.app(scope, receive, send)
For JWT-style auth , implement custom middleware.
Sending from views / tasks
Push to WebSocket clients from anywhere:
from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync
def notify_user(user_id, payload):
channel_layer = get_channel_layer()
async_to_sync(channel_layer.group_send)(
f"user_{user_id}",
{"type": "user.notification", "payload": payload},
)
From a Celery task
or sync view: same pattern. Async view: await channel_layer.group_send(...).
Scaling
[Load Balancer]
|
[Daphne 1] [Daphne 2] [Daphne 3] ← stateless WebSocket terminators
↓
[Redis channel layer] ← shared state
↑
[Worker A] [Worker B] ← Celery workers can push too
Run multiple Daphne / Uvicorn processes; Redis is the message bus between them. Connections distribute across processes; messages reach all subscribers via Redis.
For hyper-scale (>100k connections): consider scaling WebSockets and possibly a dedicated service.
Production server
daphne -b 0.0.0.0 -p 8000 myproject.asgi:application
# OR
uvicorn myproject.asgi:application --host 0.0.0.0 --port 8000 --workers 4
Behind nginx or an ALB with WebSocket support enabled.
Heartbeats
async def connect(self):
await self.accept()
self.heartbeat_task = asyncio.create_task(self._heartbeat())
async def _heartbeat(self):
try:
while True:
await asyncio.sleep(30)
await self.send_json({"type": "ping"})
except asyncio.CancelledError: pass
async def disconnect(self, code):
self.heartbeat_task.cancel()
LB / proxy idle timeouts disconnect quiet sockets. Heartbeat keeps them alive. Or rely on the client to ping.
Reconnect logic (client)
function connect() {
const ws = new WebSocket(url);
ws.onclose = () => setTimeout(connect, 1000 + Math.random() * 2000);
ws.onmessage = (evt) => handle(JSON.parse(evt.data));
return ws;
}
Exponential backoff with jitter. Server-side: tolerate reconnects; restore state from server-of-truth.
Common mistakes
1. Storing state in the consumer
A consumer instance lives only for one connection. State doesn’t survive reconnects; doesn’t share across consumers. Persist to DB or Redis.
2. Sync ORM in async consumer
async def receive_json(self, content):
msg = Message.objects.create(...) # BLOCKS the event loop
Wrap in database_sync_to_async:
from channels.db import database_sync_to_async
msg = await database_sync_to_async(Message.objects.create)(...)
Or use Django 5 async ORM: await Message.objects.acreate(...).
3. No reconnect
Client disconnects on flaky wifi; never reconnects; UI shows stale state. Always implement reconnect.
4. Trusting client identity
WebSocket auth happens at connect; subsequent messages aren’t re-authenticated. Bind permissions to scope["user"] and check on every message that has authority implications.
5. Single Daphne process
CPU-bound when the room hits a few hundred users. Run multiple processes behind a LB.
When to use Channels vs alternatives
| Choose when | |
|---|---|
| Django Channels | Django shop, real-time complements existing app |
| Phoenix Channels (Elixir) | Many connections, soft real-time, BEAM benefits |
| socket.io (Node) | Heavy real-time, fan-out, mature ecosystem |
| Centrifugo | Off-the-shelf message bus; auth via JWT; horizontal |
| Pusher / Ably | Managed; pay-per-connection |
For most Django teams: Channels is fine to ~10k–50k concurrent. Beyond that, consider a dedicated message bus.
Read this next
- Django 5: Async Views, ORM, Channels
- Django + Celery 2026
- Scaling WebSockets in 2026
- Authentication in 2026
If you want my Django Channels chat starter (auth, presence, scaling), it’s at rajpoot.dev .
Building something AI-, backend-, or data-heavy and want a second pair of eyes? I do consulting and freelance work — see my projects and ways to reach me at rajpoot.dev .