Приветствую, друзья!

Когда я впервые начал работать с Django, меня всё устраивало, за исключением одного момента: как сделать так, чтобы приложение могло общаться с пользователем в реальном времени? Веб-сокеты, уведомления, асинхронные запросы — казалось, это точно не про чистый Django. Но затем я наткнулся на Django Channels, и многое изменилось. Channels позволили мне сделать приложение асинхронным, добавить поддержку веб-сокетов и превратить его во что-то гораздо более крутое.

В этой статье я расскажу, как работать с Django Channels.

Установка

Первым делом установим необходимые пакеты:

pip install channels

Далее обновим settings.py проекта:

# settings.py

INSTALLED_APPS = [
    # ...
    'channels',
    # ...
]

ASGI_APPLICATION = 'myproject.asgi.application'

Создадим файл asgi.py в корневой директории проекта:

# asgi.py

import os
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
from channels.auth import AuthMiddlewareStack
import chat.routing

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(
            chat.routing.websocket_urlpatterns
        )
    ),
})

Немного про архитектуру Channels

ASGI

Вы, наверное, знакомы с WSGI — стандартом, связывающим веб-сервер с Django-приложением. Он отлично подходит для синхронных задач, но при создании чатов или уведомлений в реальном времени начинает показывать свои ограничения.

ASGI — это как WSGI, но с поддержкой асинхронности. Он позволяет Django обрабатывать несколько запросов одновременно, не блокируя основной поток. Используя asyncio, await и async def, можно легко писать масштабируемый код, который легко справляется с реальным временем.

Consumers

В обычном Django представления отвечают за обработку HTTP-запросов и формирование ответов. Но с веб-сокетами и другими асинхронными протоколами возникает вопрос: «А как теперь?» Здесь на помощь приходят Consumers. Если провести аналогию, то Consumer — это представление для асинхронных соединений.

Consumers — это классы, обрабатывающие события жизненного цикла соединения: подключение, получение сообщений и отключение.

Типы Consumers:

  • WebSocketConsumer: Для стандартных веб-сокетных соединений.

  • AsyncWebsocketConsumer: Асинхронная версия с использованием async/await.

  • JsonWebsocketConsumer: Упрощает работу с JSON‑данными.

  • Custom Consumers: Создаёте свои собственные Consumers под специфические задачи.

Основные методы Consumers:

  • connect(): Вызывается при установке соединения. Здесь можно аутентифицировать пользователя.

  • receive(): Обрабатывает входящие сообщения от клиента.

  • disconnect(): Вызывается при разрыве соединения. Время попрощаться и очистить ресурсы.

Поначалу, конечно, непривычно работать с асинхронным кодом, но это того стоит.

Channel Layers

Теперь поговорим о Channel Layers — "нервной системе" самого приложения. Они позволяют различным частям приложения общаться друг с другом, независимо от серверов или процессов.

Channel Layer — это абстракция для передачи сообщений между Consumers. Состоит из двух основных компонентов:

  • Каналы: Уникальные адреса для отправки сообщений. Каждый Consumer имеет свой канал.

  • Группы: Коллекции каналов под общим именем. Позволяют отправлять сообщения сразу нескольким Consumers.

Бэкенды для Channel Layers:

  • In-Memory: Подходит для разработки и тестирования, но не для продакшена.

  • Redis: Наиболее популярный и высокопроизводительный вариант. Быстро, надёжно и масштабируемо.

  • RabbitMQ: Более сложный в настройке, но предоставляет дополнительные возможности и повышенную надёжность.

Для большинства проектов Redis будет идеальным выбором.

Как всё это взаимодействует

Представьте следующий сценарий:

  1. Клиент открывает веб-сокетное соединение с вашим приложением.

  2. ASGI‑сервер принимает соединение и передаёт его вашему Django‑приложению через интерфейс ASGI.

  3. Ваш Consumer получает событие connect, аутентифицирует пользователя и устанавливает соединение.

  4. После подключения Consumer добавляет свой канал в одну или несколько групп через Channel Layer.

  5. Клиент отправляет сообщение, которое обрабатывается методом receive и отправляется в группу.

  6. Channel Layer распространяет сообщение всем Consumers в группе.

  7. Каждый Consumer отправляет сообщение своему клиенту, и все пользователи видят обновления в реальном времени.

Создаём приложение чата с веб-сокетами

Создадим простой чат, чтобы продемонстрировать возможности Channels.

python manage.py startapp chat

Добавляем его в INSTALLED_APPS:

# settings.py

INSTALLED_APPS = [
    # ...
    'chat',
    # ...
]

Создаём файл routing.py в приложении chat:

# chat/routing.py

from django.urls import re_path
from . import consumers

websocket_urlpatterns = [
    re_path(r'ws/chat/(?P<room_name>\w+)/$', consumers.ChatConsumer.as_asgi()),
]

Пишем consumer:

# chat/consumers.py

from channels.generic.websocket import AsyncWebsocketConsumer
import json

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'

        # Присоединяемся к группе
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        # Покидаем группу
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    # Получаем сообщение от WebSocket
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # Отправляем сообщение в группу
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message
            }
        )

    # Получаем сообщение от группы
    async def chat_message(self, event):
        message = event['message']

        # Отправляем сообщение обратно клиенту
        await self.send(text_data=json.dumps({
            'message': message
        }))

Код может показаться длинным, но на самом деле всё довольно просто. Главное — понять, как работают группы и сообщения.

Создаём routing.py в корневой директории проекта:

# myproject/routing.py

from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
import chat.routing

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(
            chat.routing.websocket_urlpatterns
        )
    ),
})

Создадим шаблоны и представления:

Представления:

# chat/views.py

from django.shortcuts import render

def index(request):
    return render(request, 'chat/index.html')

def room(request, room_name):
    return render(request, 'chat/room.html', {
        'room_name': room_name
    })

Шаблон chat/index.html:

<!DOCTYPE html>
<html>
<head>
    <title>Чат</title>
</head>
<body>
    <h1>Добро пожаловать в чат!</h1>
    <p>Введите имя комнаты и присоединяйтесь:</p>
    <form method="get" action="{% url 'room' room_name=room_name %}">
        <input placeholder="Название комнаты" name="room_name" type="text" required>
        <button type="submit">Войти</button>
    </form>
</body>
</html>

Шаблон chat/room.html:

<!-- chat/templates/chat/room.html -->

<!DOCTYPE html>
<html>
<head>
    <title>Комната {{ room_name }}</title>
</head>
<body>
    <h2>Комната: {{ room_name }}</h2>
    <div id="chat-log"></div>
    <input placeholder="Введите сообщение..." id="chat-message-input" type="text" size="100">
    <button id="chat-message-submit">Отправить</button>

    <script>
        const roomName = "{{ room_name }}";
        const chatSocket = new WebSocket(
            'ws://' + window.location.host +
            '/ws/chat/' + roomName + '/'
        );

        chatSocket.onmessage = function(e) {
            const data = JSON.parse(e.data);
            const message = data['message'];
            document.querySelector('#chat-log').innerHTML += (message + '<br>'); // знаете, почему именно <br>?
        };

        chatSocket.onclose = function(e) {
            console.error('Chat socket closed unexpectedly');
        };

        document.querySelector('#chat-message-submit').onclick = function(e) {
            const messageInputDom = document.querySelector('#chat-message-input');
            const message = messageInputDom.value;
            chatSocket.send(JSON.stringify({
                'message': message
            }));
            messageInputDom.value = '';
        };
    </script>
</body>
</html>

Для простоты в шаблонах использован минимальный HTML и JavaScript.

Настройка URL-адресов:

# myproject/urls.py

from django.urls import path
from chat import views

urlpatterns = [
    path('', views.index, name='index'),
    path('chat/<str:room_name>/', views.room, name='room'),
]

Для взаимодействия между различными Consumers настроим каналный слой с Redis.

Установка Redis и зависимостей:

pip install channels_redis

Настройка settings.py:

# settings.py

CHANNEL_LAYERS = {
    'default': {
        'BACKEND': 'channels_redis.core.RedisChannelLayer',
        'CONFIG': {
            'hosts': [os.environ.get('REDIS_URL', ('127.0.0.1', 6379))],
        },
    },
}

Запуск Redis:

Для Ubuntu:

sudo apt-get install redis-server
sudo service redis-server start

Про Consumers

Типы Consumers

  • Synchronous Consumers: Наследуются от channels.generic.websocket.WebsocketConsumer. Используют синхронный код.

  • Asynchronous Consumers: Наследуются от channels.generic.websocket.AsyncWebsocketConsumer. Используют async/await.

Пример синхронного Consumer:

# chat/consumers.py

from channels.generic.websocket import AsyncWebsocketConsumer, WebsocketConsumer
import json

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.room_name = self.scope['url_route']['kwargs']['room_name']
        self.room_group_name = f'chat_{self.room_name}'

        # Присоединяемся к группе
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )

        await self.accept()

    async def disconnect(self, close_code):
        # Покидаем группу
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )

    # Получаем сообщение от WebSocket
    async def receive(self, text_data):
        text_data_json = json.loads(text_data)
        message = text_data_json['message']

        # Отправляем сообщение в группу
        await self.channel_layer.group_send(
            self.room_group_name,
            {
                'type': 'chat_message',
                'message': message
            }
        )

    # Получаем сообщение от группы
    async def chat_message(self, event):
        message = event['message']

        # Отправляем сообщение обратно клиенту
        await self.send(text_data=json.dumps({
            'message': message
        }))

# Пример синхронного Consumer
class SyncChatConsumer(WebsocketConsumer):
    def connect(self):
        self.accept()
        self.send(text_data=json.dumps({
            'message': 'Привет от синхронного Consumer!'
        }))

    def receive(self, text_data):
        pass

    def disconnect(self, close_code):
        pass

Когда использовать синхронные Consumers?

Если код полностью синхронный и не использует асинхронные операции, можно использовать синхронные Consumers. Однако рекомендуется отдавать предпочтение асинхронным Consumers для лучшей производительности и масштабируемости.

Аутентификация и доступ

AuthMiddlewareStack позволяет получать доступ к пользователю через self.scope.

Пример доступа к пользователю:

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        user = self.scope["user"]
        if user.is_authenticated:
            await self.accept()
        else:
            await self.close()

Можно проверять права пользователя и предоставлять или ограничивать доступ к определённым комнатам или действиям:

if not user.has_perm('chat.view_room'):
    await self.close()

Middleware в Channels

Channels поддерживает middleware для ASGI-приложений. Можно создавать свои собственные middleware для обработки входящих соединений.

Пример создания middleware:

class CustomAuthMiddleware:
    def __init__(self, inner):
        self.inner = inner

    async def __call__(self, scope, receive, send):
        # Здесь можно изменить scope или выполнить другие действия
        return await self.inner(scope, receive, send)

Подключение middleware:

from channels.routing import ProtocolTypeRouter, URLRouter
from channels.middleware import BaseMiddleware
import chat.routing

application = ProtocolTypeRouter({
    "websocket": CustomAuthMiddleware(
        URLRouter(chat.routing.websocket_urlpatterns)
    ),
})

Развертывание Channels-приложения

Для запуска Channels-приложения рекомендуется использовать Daphne — ASGI-сервер.

Установка Daphne:

pip install daphne

Запуск приложения:

daphne -b 0.0.0.0 -p 8000 myproject.asgi:application

Daphne отлично подходит для небольших проектов, но для продакшена лучше использовать комбинацию с Gunicorn.

Комбинирование Gunicorn с Uvicorn Worker:

pip install uvicorn gunicorn

Запуск Gunicorn:

gunicorn myproject.asgi:application -k uvicorn.workers.UvicornWorker

Интеграция с существующими Django-приложениями

Channels прекрасно сочетается с существующими Django-приложениями. Можно потихоньку добавлять асинхронные возможности, не переписывая весь код.

Допустим, есть модель Order, и хочется уведомлять пользователей в реальном времени о статусе заказа.

Модель и сигнал:

# orders/models.py

from django.db.models.signals import post_save
from django.dispatch import receiver
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer
from .models import Order

@receiver(post_save, sender=Order)
def order_status_changed(sender, instance, **kwargs):
    channel_layer = get_channel_layer()
    async_to_sync(channel_layer.group_send)(
        f"user_{instance.user.id}",
        {
            'type': 'order_status',
            'status': instance.status,
            'order_id': instance.id,
        }
    )

Consumer для получения уведомлений:

# notifications/consumers.py

from channels.generic.websocket import AsyncWebsocketConsumer
import json

class NotificationConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        self.user = self.scope['user']
        if self.user.is_authenticated:
            self.group_name = f"user_{self.user.id}"
            await self.channel_layer.group_add(
                self.group_name,
                self.channel_name
            )
            await self.accept()
        else:
            await self.close()

    async def disconnect(self, close_code):
        await self.channel_layer.group_discard(
            self.group_name,
            self.channel_name
        )

    async def order_status(self, event):
        await self.send(text_data=json.dumps({
            'order_id': event['order_id'],
            'status': event['status'],
        }))

Работа с сессиями

В Channels вы также можете получать доступ к сессиям пользователя.

Подключение SessionMiddlewareStack:

from channels.sessions import SessionMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
import chat.routing

application = ProtocolTypeRouter({
    "websocket": SessionMiddlewareStack(
        AuthMiddlewareStack(
            URLRouter(chat.routing.websocket_urlpatterns)
        )
    ),
})

Доступ к сессии:

class ChatConsumer(AsyncWebsocketConsumer):
    async def connect(self):
        session_key = self.scope['session'].session_key
        # Используйте сессию по своему усмотрению

Часто сесси юзают для для хранения временных данных или состояния пользователя.

Производительность

Channels поддерживает запуск нескольких воркеров для обработки нагрузки:

daphne myproject.asgi:application &
python manage.py runworker &
python manage.py runworker &

Помимо этого при развертывании в облаке можно настроить автоскейлинг воркеров в зависимости от нагрузки.

Также юзайте кэширование для хранения часто используемых данных и уменьшения нагрузки на базу данных. Тот же Redis отлично подходит для кэширования.


Немного советов

Держите Consumers простыми: Разделяйте логику на отдельные функции или классы для лучшей читаемости и поддержки.

Для обработки большой нагрузки используйте очереди сообщений, например RabbitMQ, если Redis не справляется.

Всегда проверяйте входные данные и права доступа пользователей.

Ознакомьтесь с официальной документацией Channels для получения более подробной информации.

В заключение напомню про открытый урок «Patroni и его применение с Postgres» — на нем можно получить практические навыки мониторинга и управления высокодоступными кластерами PostgreSQL с помощью Patroni. Урок пройдет 24 октября. Если интересно, записывайтесь по ссылке.

Комментарии (0)