Большинство разработчиков рано или поздно сталкивается с необходимостью реализовать очереди выполнения, сложных вычислительных процессов. Изобилие готовых решений, позволяет выбрать именно то, что вам нужно в вашем текущем задании. Моя задача была достаточно простой и не требовала сложного алгоритма регулирования процесса: один GPU, выполняет какую либо ресурсоёмкую функцию последовательно (в этой статье примером будет Kandinsky 2.1), без обработки ошибок с повторным запуском процесса, функция запускаются пользователем в django. Python разработчика, с такой постановкой вопроса, интернет в первую очередь приводит к Celery. Кратко ознакомившись с документацией, подсознательно закрываешь задачу, все готово, за 15 минут подключу. Но в процессе реализации столкнулся с проблемой, существенно влияющей на скорость выполнения процесса. Каждый раз при выполнении функции в Celery, процесс загружает веса модели, в случае Kandinsky 2.1, веса имеют большой объем. Подобная проблема обсуждалась в stackoverflow. Попытки обойти эту преграду в «сельдерей», приводили к новым ошибкам. Один из вариантов решения ведет к этой статье Scaling AllenNLP/PyTorch in Production, вместо Celery автор предлагает использовать ZeroMQ, с этой реализацией буду работать для решения задачи. В статье вычисления выполняются в CPU, вообщем рекомендую к прочтению. ZeroMQ - будет протоколом обмена между ресурсоёмкой функцией и django, для запуска фонового процесса в очереди буду использовать Django Channels, рабочие и фоновые задачи.

Примерный алгоритм:

  1. Пользователь, из web интерфейса отправляет запрос в Django.

  2. Из основного канала Django, переслать полученный запрос пользователя в «другой» фиксированный канал Django.

  3. Полученный запрос из основного канала отправить по TCP ZeroMQ к ресурсоёмкой функции.

  4. После выполнения функции, вернуть ответ по TCP ZeroMQ в «другой» канал Django.

  5. Вернуть ответ пользователю.

Клиентская часть javascript очень простая, не нуждается в дополнительных комментариях. В моем примере, взаимодействие клиент - сервер происходит с помощью WebSocket, благодаря гибкости Django Channels, вы не ограничены в протоколе обмена данными для запуска фоновых задач.

var ws_wall = new WebSocket("ws://"+ IP_ADDR +":"+PORT+"/");

ws_wall.onmessage = function(event) {
  // что то делаем с полученным ответом
}

// отправить данные в Django
function send_wall() {
    var body = document.getElementById('id_body');
    if (body.value == "") {
        return false;
    }
    if (ws_wall.readyState != WebSocket.OPEN) {
        return false;
    }
    var data = JSON.stringify({body: body.value,
                               event: "wallpost"});
    ws_wall.send(data);
}

Серверный кусок кода основного потока Django Channels. Большинство, практикующих разработчиков взаимодействующих с Django, не увидят чего то нового, стандартные фрагменты кода из документации. Подробно рассказывать о структуре orm django модели не имеет смысла, по коду будет понятно, что всё очень просто.

import json
from channels.generic.websocket import AsyncJsonWebsocketConsumer
from myapp.models import User, Post
from asgiref.sync import sync_to_async
from django.utils import dateformat

class WallHandler(AsyncJsonWebsocketConsumer):
    async def connect(self):
        """
        инициализация подключения
        """
        self.room_group_name = "wall"
        self.sender_id = self.scope['user'].id
        self.sender_name = self.scope['user']
        if str(self.scope['user']) != 'AnonymousUser':
            self.path_data = self.scope['user'].path_data
        await self.channel_layer.group_add(
            self.room_group_name,
            self.channel_name
        )
        await self.accept()
        
    async def disconnect(self, close_code):
        """
        обработка отключения
        покинуть группу
        """
        print("error code: ", close_code)
        await self.channel_layer.group_discard(
            self.room_group_name,
            self.channel_name
        )      
        
    async def receive(self, text_data):
        """
        обработка полученных данных от килиента (WebSocket)
        получить событие и отправиь соответствующее событие
        """
        response = json.loads(text_data)
        event = response.get("event", None)
        if self.scope['user'].is_authenticated: 
            if event == "wallpost":
                post = Post()
                post.body = response["body"]
                post.path_data = self.path_data
                post.user_post = self.scope['user']
                post_async = sync_to_async(post.save) # взаимодействие с синхронным Django
                await post_async()

                """
                из основного канала Django, 
                отправить полученный запрос от пользователя 
                в «другой» фиксированный канал Django
                """
                _temp_dict = {}
                _temp_dict["body"] = response["body"]
                _temp_dict["path_data"] = self.path_data
                _temp_dict["post"] = str(post.id)
                _temp_dict["type"] = "triggerWorker"
                _temp_dict["room_group_name"] = self.room_group_name
                await self.channel_layer.send('nnapp', _temp_dict) # синтаксис await не блокирует работу основного потока
                """
                сервер продолжает работу,
                отправить текущие готовые данные клиенту, 
                без окончания выполнения ресурсоёмкой функции
                """
                _data = {"type": "wallpost",
                         "timestamp": dateformat.format(post.date_post, 'U'),
                         "text":response["body"],
                         "user_post": str(self.sender_name),
                         "user_id": str(self.sender_id),
                         "id": str(post.id),
                         "status" : "wallpost"
                        }
                await self.channel_layer.group_send(self.room_group_name, _data)
    async def wallpost(self, res):
        """  
        отправить сообщение клиенту(WebSocket)
        """
        await self.send(text_data=json.dumps(res))                

ВАЖНО!!! Дополнительные настройки в конфигурационном файле asgi.py Django проекта.

"""
ASGI config for app project.

It exposes the ASGI callable as a module-level variable named ``application``.

For more information on this file, see
https://docs.djangoproject.com/en/3.2/howto/deployment/asgi/
"""

import os
import django
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')
django.setup()

from django.core.asgi import get_asgi_application
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter, ChannelNameRouter
import app.routing
from wall import nnapp

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'app.settings')

application = ProtocolTypeRouter({
    "http": get_asgi_application(),
    "websocket": AuthMiddlewareStack(
        URLRouter(
            app.routing.websocket_urlpatterns,
        )
    ),
    # добавил «другой» фиксированный канал Django
    "channel": ChannelNameRouter({
        "nnapp": nnapp.NNHandler.as_asgi(),
    }),
})

Создам «другой» канал Django, для обработки фоновой задачи. В новый файл nnapp.py, копирую клиентскую часть из статьи Scaling AllenNLP/PyTorch in Production и изменяю для своих нужд.

from myapp.models import User, Post
from asgiref.sync import async_to_sync
from channels.consumer import SyncConsumer
from channels.layers import get_channel_layer
import uuid
import os
import json
import zlib
import pickle
import zmq

# глобальные переменные из статьи
work_publisher = None
result_subscriber = None
TOPIC = 'snaptravel'

RECEIVE_PORT = 5555
SEND_PORT = 5556 

# получить доступ к channel layer
channel_layer = get_channel_layer()

# низкоуровневый протокол подразумивает работу с байтами
def compress(obj):
    p = pickle.dumps(obj)
    return zlib.compress(p)

def decompress(pickled):
    p = zlib.decompress(pickled)
    return pickle.loads(p)
    
def start():
    global work_publisher, result_subscriber
    context = zmq.Context()
    work_publisher = context.socket(zmq.PUB)
    work_publisher.connect(f'tcp://127.0.0.1:{SEND_PORT}') 

def _parse_recv_for_json(result, topic=TOPIC):
    compressed_json = result[len(topic) + 1:]
    return decompress(compressed_json)

def send(args, model=None, topic=TOPIC):
    id = str(uuid.uuid4())
    message = {'body': args["title"], 'model': model, 'id': id}
    compressed_message = compress(message)
    work_publisher.send(f'{topic} '.encode('utf8') + compressed_message)
    return id

def get(id, topic=TOPIC):
    context = zmq.Context()
    result_subscriber = context.socket(zmq.SUB)
    result_subscriber.setsockopt(zmq.SUBSCRIBE, topic.encode('utf8'))
    result_subscriber.connect(f'tcp://127.0.0.1:{RECEIVE_PORT}')
    result = _parse_recv_for_json(result_subscriber.recv())
    while result['id'] != id:
        result = _parse_recv_for_json(result_subscriber.recv())
    result_subscriber.close()
    if result.get('error'):
        raise Exception(result['error_msg'])
    return result
  
# эта функция немного отличаеться от оригинала из статьи
def send_and_get(args, model=None):
    id = send(args, model=model)
    res = get(id)
    namefile = f'{id}.jpg'
    res['prediction'][0].save(f'media/data_image/{args["path_data"]}/{namefile}', format="JPEG")
    post = Post.objects.get(id=args["post"]) 
    post.image = namefile
    post.save()
    _data = {"type": "wallpost", "status":"Kandinsky-2.1", "path_data": args["path_data"],
             "data": f'{namefile}', "post":args["post"]}
    # возвращаем результат в основной канал Django Channels
    async_to_sync(channel_layer.group_send)(args["room_group_name"], _data)

# запуск фиксированого канала с клиентом ZeroMQ
class NNHandler(SyncConsumer):
    start()
    def triggerWorker(self, message):
        print ("data for a background task: ", message)
        send_and_get(message, model='Kandinsky-2.1')

Сервер ZeroMQ с ресурсоёмкой функцией, в моём случае Kadinsky 2.1, по большей части полностью совпадает с кодом статьи, с незначительными дополнениями. Для работы с русским языком, использую обученную модель переводчика Helsinki-NLP/opus-mt-ru-en.

import os, time
from types import SimpleNamespace
import zmq
import zlib
import pickle
import torch.multiprocessing as mp
import threading
import cv2
from kandinsky2 import get_kandinsky2
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer

import uuid

QUEUE_SIZE = mp.Value('i', 0)

def compress(obj):
    p = pickle.dumps(obj)
    return zlib.compress(p)

def decompress(pickled):
    p = zlib.decompress(pickled)
    return pickle.loads(p)

TOPIC = 'snaptravel'
prediction_functions = {}

RECEIVE_PORT = 5556
SEND_PORT = 5555

# «модель» генерация картинки
model = get_kandinsky2('cuda', task_type='text2img', model_version='2.1', use_flash_attention=False)

# «модель» переводчик
tokenizer = AutoTokenizer.from_pretrained("Helsinki-NLP/opus-mt-ru-en")
model_translater = AutoModelForSeq2SeqLM.from_pretrained("Helsinki-NLP/opus-mt-ru-en")


def _parse_recv_for_json(result, topic=TOPIC):
    compressed_json = result[len(topic) + 1:]
    return decompress(compressed_json)

def _decrease_queue():
    with QUEUE_SIZE.get_lock():
        QUEUE_SIZE.value -= 1

def _increase_queue():
    with QUEUE_SIZE.get_lock():
        QUEUE_SIZE.value += 1
    
def send_prediction(message, result_publisher, topic=TOPIC):
    _increase_queue()
    model_name = message['model']
    body = message['body']
    id = message['id']
    
    # подготовка входных данных для обученной «модели» переводчик
    tokenized_text = tokenizer([str(body).lower()], return_tensors='pt')

    # перевод
    translation = model_translater.generate(**tokenized_text)
    body = tokenizer.batch_decode(translation, skip_special_tokens=True)[0]
    print(body)

    # генерация изображения
    images = model.generate_text2img(
        str(body).lower(), 
        num_steps=70,
        batch_size=1, 
        guidance_scale=4,
        h=768, w=768,
        sampler='p_sampler', 
        prior_cf_scale=4,
        prior_steps="5"
    )
    result = {"result": images}

    if result.get('result') is None:
        time.sleep(1)
        compressed_message = compress({'error': True, 'error_msg': 'No result was given: ' + str(result), 'id': id})
        result_publisher.send(f'{topic} '.encode('utf8') + compressed_message)
        _decrease_queue()
        return
      
    prediction = result['result']
    compressed_message = compress({'prediction': prediction, 'id': id})
    result_publisher.send(f'{topic} '.encode('utf8') + compressed_message)
    _decrease_queue()
    print ("SERVER", message, f'{topic} '.encode('utf8'))

def queue_size():
    return QUEUE_SIZE.value

def load_models():
    models = SimpleNamespace()
    return models

def start():
    global prediction_functions

    models = load_models()
    prediction_functions = {
    'queue': queue_size
    }

    print(f'Connecting to {RECEIVE_PORT} in server', TOPIC.encode('utf8'))
    context = zmq.Context()
    work_subscriber = context.socket(zmq.SUB)
    work_subscriber.setsockopt(zmq.SUBSCRIBE, TOPIC.encode('utf8'))
    work_subscriber.bind(f'tcp://127.0.0.1:{RECEIVE_PORT}')

    # send work
    print(f'Connecting to {SEND_PORT} in server')
    result_publisher = context.socket(zmq.PUB)
    result_publisher.bind(f'tcp://127.0.0.1:{SEND_PORT}')

    print('Server started')
    while True:
        message = _parse_recv_for_json(work_subscriber.recv())
        threading.Thread(target=send_prediction, args=(message, result_publisher), kwargs={'topic': TOPIC}).start()

if __name__ == '__main__':
  start()

Запускать django channels для фоновых задач, отдельным процессом:
python manage.py runworker nnapp

Эти части кода из работающего проекта моей предыдущей статьи, на столько популярной и комментируемой что даже не нуждается в упоминании. Хороший пример Build a Pytorch Server with celery and RabbitMQ, но плохо масштабируемый и немного сложней в реализации чем обсуждаемый в этой статье. Были еще несколько идей, но решил пойти по пути наименьшего сопротивления... Для работы с кластером gpu, нужен более сложные алгоритм отслеживания загруженности каждого отдельного gpu, поэтому это простой пример на основе чужого труда. Работающий код https://github.com/naturalkind/social-network.

Комментарии (5)


  1. danilovmy
    20.04.2023 05:13
    +3

    части кода из работающего проекта моей предыдущей статьи, на столько популярной и комментируемой что даже не нуждается в упоминании.

    Смотрю в профиле предыдущую статью:
    Социальный эксперимент ChatGPT Комментарии 0
    И тут я даже как то растерялся. Что за статья то?


    А по делу: Зачем тут Django? Я сам за Django топлю и люблю ее, но не увидел причин использовать ее тут. Есть более легковесные async фреймворки которые этот микровервис запросов в кандинский будут выполнять с радостью и легкостью.
    Кстати, могу отметить, что я вырос в этом плане — лет 5 назад я бы тоже сказал берем джангу и вот так делаем. Сейчас я 100 раз подумаю, и, вероятно, возьму более подходящий под задачу фреймворк.


    1. rsashka
      20.04.2023 05:13
      +1

      Сейчас я 100 раз подумаю, и, вероятно, возьму более подходящий под задачу фреймворк.

      Если не секрет, то какой?


      1. rSedoy
        20.04.2023 05:13
        +2

        FastAPI, хотя есть и другие async фреймворки на python, но лучше FastAPI ;) А еще я бы не брал ZeroMQ, скорее бы взял NATS


        1. rsashka
          20.04.2023 05:13

          Спасибо, посмотрю


  1. evilsadko Автор
    20.04.2023 05:13

    полностью с вами согласен, в моем случае легче создать очереди в Django чем переписать всю логику, готового проекта. Не фанат Django, но отрицать его функционал для сложный проектов не могу. Предложенные программные каркасы в комментариях, действительно хороши, но отсутствие многих готовых типичных решений, ускоряющих процесс разработки, негативный фактор при создании собственного проекта в одиночку. Если у вас есть возможность помочь с переходом с django на fastapi, буду рад. Читайте предыдущую статью...