Django apps occasionally need real-time features: live notifications, collaborative editing, dashboards that update without polling. Channels is the official answer. Used right, it gives you WebSockets, ASGI, and your Django models in one app. This post is the working playbook.

Setup

uv add channels channels-redis daphne

# settings.py
ASGI_APPLICATION = "myproject.asgi.application"
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {"hosts": [("redis", 6379)]},
    }
}
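
# asgi.py
import os

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
# Initialise Django before importing anything that may touch models
django_asgi_app = get_asgi_application()

from chat import routing  # noqa: E402  (routing lives next to chat/consumers.py)

application = ProtocolTypeRouter({
    "http": django_asgi_app,
    "websocket": AuthMiddlewareStack(URLRouter(routing.websocket_urlpatterns)),
})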

Consumer

# chat/consumers.py
from channels.generic.websocket import AsyncJsonWebsocketConsumer

class ChatConsumer(AsyncJsonWebsocketConsumer):
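    async def connect(self):
        self.group = None  # set only after the user is admitted; checked in disconnect()
        if self.scope["user"].is_anonymous:
            await self.close()
            return
        self.room = self.scope["url_route"]["kwargs"]["room_id"]
        self.group = f"chat_{self.room}"
        await self.channel_layer.group_add(self.group, self.channel_name)
        await self.accept()

    async def disconnect(self, code):
        # disconnect() also runs for sockets rejected in connect(), which never joined a group
        if self.group is not None:
            await self.channel_layer.group_discard(self.group, self.channel_name)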
    
    async def receive_json(self, content, **kwargs):
        await self.channel_layer.group_send(self.group, {
            "type": "chat.message",
            "user": self.scope["user"].username,
            "text": content["text"],
        })
    
    async def chat_message(self, event):
        await self.send_json({"user": event["user"], "text": event["text"]})

# chat/routing.py
from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r"^ws/chat/(?P<room_id>\d+)/$", consumers.ChatConsumer.as_asgi()),
]
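
The "type": "chat.message" key passed to group_send is what routes the event to chat_message: Channels dispatches to the consumer method named after the type, with dots replaced by underscores. The function below is a simplified stand-in for that lookup, written for illustration; handler_name is a hypothetical name, not Channels' own code.

```python
def handler_name(message):
    """Simplified model of how a channel-layer event selects a consumer method."""
    if "type" not in message:
        raise ValueError("message has no 'type' key")
    # Dots become underscores: "chat.message" -> "chat_message"
    return message["type"].replace(".", "_")


print(handler_name({"type": "chat.message"}))
print(handler_name({"type": "user.notification"}))
```

By the same rule, a consumer that should receive events of type "user.notification" needs a method called user_notification.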

Auth

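AuthMiddlewareStack populates scope["user"] from session cookies. For token auth, wrap the router in your own middleware instead:

class TokenAuthMiddleware:
    """ASGI middleware that sets scope["user"] from a token instead of a session."""

    def __init__(self, app):
        self.app = app

    async def __call__(self, scope, receive, send):
        # parse_token and get_user_for_token are placeholders you implement yourself
        token = parse_token(scope)
        scope["user"] = await get_user_for_token(token)
        return await self.app(scope, receive, send)

JWT-style auth follows the same pattern: validate the token inside the middleware and set scope["user"] before calling the inner app.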
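
The parse_token helper in the middleware is left undefined. Browsers cannot set custom headers on a WebSocket handshake, so one common option is to pass the token as a query parameter. Here is a minimal sketch under that assumption; the parameter name token is an arbitrary choice for illustration, and the ASGI scope supplies query_string as bytes.

```python
from urllib.parse import parse_qs


def parse_token(scope):
    """Return the `token` query parameter from an ASGI scope, or None if absent."""
    raw = scope.get("query_string", b"")      # bytes, e.g. b"token=abc123"
    params = parse_qs(raw.decode("latin-1"))  # {"token": ["abc123"]}
    values = params.get("token")
    return values[0] if values else None


print(parse_token({"query_string": b"token=abc123&room=7"}))
```

Tokens in URLs can end up in access logs, so short-lived tokens are a sensible pairing with this approach.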

Sending from views / tasks

Push to WebSocket clients from anywhere:

from channels.layers import get_channel_layer
from asgiref.sync import async_to_sync

def notify_user(user_id, payload):
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        f"user_{user_id}",
        {"type": "user.notification", "payload": payload},
    )

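This works unchanged from a Celery task or a sync view. In an async view, skip the wrapper and await channel_layer.group_send(...) directly. Either way, the message only arrives if a consumer added its channel to the user_<id> group in connect() and defines a user_notification handler.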

Scaling

[Load Balancer]
   |
[Daphne 1] [Daphne 2] [Daphne 3]   ← stateless WebSocket terminators
       [Redis channel layer]       ← shared state
[Worker A]  [Worker B]              ← Celery workers can push too

Run multiple Daphne / Uvicorn processes; Redis is the message bus between them. Connections distribute across processes; messages reach all subscribers via Redis.

For hyper-scale (>100k connections): consider scaling the WebSocket tier separately from the rest of the app, possibly as a dedicated service.

Production server

daphne -b 0.0.0.0 -p 8000 myproject.asgi:application
# OR (Uvicorn needs a WebSocket library installed, e.g. via the uvicorn[standard] extra)
uvicorn myproject.asgi:application --host 0.0.0.0 --port 8000 --workers 4

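Run either one behind nginx or an ALB with WebSocket support enabled. For nginx, that means proxying over HTTP/1.1 and passing the Upgrade and Connection headers through.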

Heartbeats

import asyncio

# Methods on the consumer class
async def connect(self):
    await self.accept()
    self.heartbeat_task = asyncio.create_task(self._heartbeat())

async def _heartbeat(self):
    try:
        while True:
            await asyncio.sleep(30)
            await self.send_json({"type": "ping"})
    except asyncio.CancelledError:
        pass  # cancelled from disconnect(); exit quietly

async def disconnect(self, code):
    self.heartbeat_task.cancel()

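Load balancer and proxy idle timeouts drop quiet sockets (60 seconds is a common default). A heartbeat sent more often than the shortest timeout in the path keeps them alive. Alternatively, rely on the client to ping.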

Reconnect logic (client)

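let attempt = 0;
function connect() {
  const ws = new WebSocket(url);
  ws.onopen = () => { attempt = 0; };  // connected: reset the backoff
  ws.onclose = () => {
    // 1s, 2s, 4s, ... capped at 30s, plus up to 1s of random jitter
    const delay = Math.min(30000, 1000 * 2 ** attempt++) + Math.random() * 1000;
    setTimeout(connect, delay);
  };
  ws.onmessage = (evt) => handle(JSON.parse(evt.data));
  return ws;
}

Use exponential backoff with jitter so that clients dropped together do not all reconnect at the same instant. Server-side: tolerate reconnects and restore state from the source of truth.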

Common mistakes

1. Storing state in the consumer

A consumer instance lives only for one connection. Its state doesn’t survive reconnects and isn’t shared with other consumers. Persist anything that matters to the DB or Redis.

2. Sync ORM in async consumer

async def receive_json(self, content):
    msg = Message.objects.create(...)  # BLOCKS the event loop

Wrap in database_sync_to_async:

from channels.db import database_sync_to_async

msg = await database_sync_to_async(Message.objects.create)(...)

Or use Django’s async ORM methods, available since Django 4.1: await Message.objects.acreate(...).
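
To see why the blocking call matters, the sketch below counts how often a background ticker gets to run while a slow synchronous call is in flight. It uses only the standard library: slow_query is a hypothetical stand-in for an ORM call, and asyncio.to_thread stands in for database_sync_to_async, which likewise moves the call to a worker thread (and additionally manages database connections, which this sketch does not model).

```python
import asyncio
import time


def slow_query():
    """Stand-in for a synchronous ORM call; sleeps instead of hitting a database."""
    time.sleep(0.2)
    return "saved"


async def count_ticks(run_query):
    """Count how often a 20 ms ticker runs while run_query() is in flight."""
    ticks = 0

    async def ticker():
        nonlocal ticks
        while True:
            await asyncio.sleep(0.02)
            ticks += 1

    task = asyncio.create_task(ticker())
    await asyncio.sleep(0)  # let the ticker start before the query runs
    await run_query()
    task.cancel()
    return ticks


async def blocking():
    return slow_query()  # runs on the event loop thread; nothing else can run


async def offloaded():
    return await asyncio.to_thread(slow_query)  # runs in a worker thread


blocked_ticks = asyncio.run(count_ticks(blocking))
offloaded_ticks = asyncio.run(count_ticks(offloaded))
print("ticks while blocking:", blocked_ticks)
print("ticks while offloaded:", offloaded_ticks)
```

In a real consumer the ticker corresponds to every other connection served by the same process: while one handler blocks, none of them receive messages.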

3. No reconnect

Client disconnects on flaky wifi; never reconnects; UI shows stale state. Always implement reconnect.

4. Trusting client identity

WebSocket auth happens at connect; subsequent messages aren’t re-authenticated. Bind permissions to scope["user"] and check on every message that has authority implications.
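
One way to apply this is to take identity only from the connection and run a permission check per action. The sketch below uses plain dictionaries in place of a Django user; the action names, the is_moderator flag, and the authorize helper are all hypothetical and only illustrate the shape of the check.

```python
def authorize(user, content):
    """Decide whether `user` (taken from scope["user"], never from the payload)
    may perform the action requested in `content`."""
    action = content.get("action")
    if action == "send_message":
        return True  # any authenticated member may post
    if action == "delete_message":
        return bool(user.get("is_moderator"))  # privileged: checked on every message
    return False  # unknown actions are rejected


# The payload claims moderator rights; the claim is ignored.
alice = {"username": "alice", "is_moderator": False}
forged = {"action": "delete_message", "user": {"is_moderator": True}}
print(authorize(alice, forged))
```

In receive_json, a handler following this sketch would run the check first and drop or reject the message when it fails.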

5. Single Daphne process

A single process runs one event loop on one core, so it becomes CPU-bound when a room hits a few hundred users. Run multiple processes behind a load balancer.

When to use Channels vs alternatives

| Tool | Choose when |
| --- | --- |
| Django Channels | Django shop, real-time complements existing app |
| Phoenix Channels (Elixir) | Many connections, soft real-time, BEAM benefits |
| socket.io (Node) | Heavy real-time, fan-out, mature ecosystem |
| Centrifugo | Off-the-shelf message bus; auth via JWT; horizontal |
| Pusher / Ably | Managed; pay-per-connection |

For most Django teams: Channels is fine up to ~10k–50k concurrent connections. Beyond that, consider a dedicated message bus.

Read this next

If you want my Django Channels chat starter (auth, presence, scaling), it’s at rajpoot.dev.