RabbitMQ
RabbitMQ

Данная статья будет полезна тем кто хочет расширить свой учебный (или даже рабочий) ML пет-проект добавлением новой технологии.

Весь код в статье написан для OpenSource проекта по детекции синтетического текста raisontext в рамках курса ML System Design от ODS.

Небольшой комментарий для упрощения дальнейшего чтения: проект raisontext имеет серверную часть на FastAPI. Серверная часть связана с клиентской посредством веб-сокетов.


Содержание

  1. Вступление

  2. Постановка проблемы

  3. RabbitMQ

  4. Практическая часть, сервер

  5. Практическая часть, модель

  6. Общая архитектура

  7. Итоги


Вступление

В жизненном цикле успешного ML проекта наступает момент, когда необходимо масштабироваться. При этом масштабирование, в зависимости от используемых моделей и выделенного железа, может быть разным.

В случае с raisontext, было принято решение масштабироваться в разграничение сущностей модели и серверной части. А также учесть безотказность предикта модели, в случае если к ней одновременно обращается множество запросов.

Постановка проблемы

Предположим, что предикт модели занимает 5 секунд. В случае когда два юзера попробуют обратиться к модели одновременно, произойдет что-то неявное. Может зависнуть как сам сайт, так и упасть модель.

Так как нас не устраивает данное поведение, необходимо придумать некий разграничивающий туннель, который не позволит любому запросу напрямую попадать в модель, а будет вместо этого вставать в очередь.

Именно для такого поведения и существуют брокеры сообщений.

RabbitMQ

На Хабре уже достаточно учебных статей про RabbitMQ (тык и тык), поэтому подробно останавливаться не буду. Скажу только, что в случае с raisontext будут использоваться две очереди:

  • raisontext_model - для отправки запросов к модели

  • raisontext_answer - для получения ответов модели

Практическая часть, сервер

В данном проекте используется облачная версия RabbitMQ: CloudAMQP

В обычной ситуации для корректной работы с RabbitMQ необходимо создать класс Consumer либо Producer в зависимости от того, кем будет являться сущность (модель или сервер).

В нашем случае и сервер и модель оба являются одновременно и Consumer и Producer. Поэтому создадим универсальный PikaClient, который сможет как принимать, так и отправлять сообщения в очереди.

class PikaClient:
    def __init__(self, process_callable, publish_queue_name, consume_queue_name):
        self.publish_queue_name = publish_queue_name
        self.consume_queue_name = consume_queue_name

        self.connection = pika.BlockingConnection(
            pika.URLParameters(os.environ.get("RABBIT_MQ_URL"))
        )
        self.channel = self.connection.channel()
        self.publish_queue = self.channel.queue_declare(queue=publish_queue_name, durable=True)
        self.consume_queue = self.channel.queue_declare(queue=consume_queue_name, durable=True)

        self.publish_callback_queue = self.publish_queue.method.queue
        self.consume_callback_queue = self.consume_queue.method.queue

        self.response = None
        self.process_callable = process_callable

    async def consume(self, loop):
        """Setup message listener with the current running loop"""
        connection = await connect_robust(
            os.environ.get("RABBIT_MQ_URL"),
            loop=loop
        )

        channel = await connection.channel()
        queue = await channel.declare_queue(self.consume_queue_name, durable=True)
        await queue.consume(self.process_incoming_message, no_ack=False)
        return connection

    async def process_incoming_message(self, message):
        """Processing incoming message from RabbitMQ"""
        await message.ack()
        body = message.body
        print('Received message')
        if body:
            await self.process_callable(json.loads(body))

    def send_message(self, message: dict):
        """Method to publish message to RabbitMQ"""
        self.channel.basic_publish(
            exchange='',
            routing_key=self.publish_queue_name,
            properties=pika.BasicProperties(
                reply_to=self.publish_callback_queue,
                correlation_id=str(uuid.uuid4())
            ),
            body=json.dumps(message)
        )

Кратко пробежимся по основным точкам:

  • RABBIT_MQ_URL - переменная среды, в которой находится уникальный AMQP URL, что-то вроде amqps://efiosdwv:***@cow.rmq2.cloudamqp.com/efiosdwv

  • Асинхронный метод consume получает из очереди сообщение и вызывает process_incoming_message. Благодаря асинхронности consume может выполняться у нескольких юзеров одновременно

  • Асинхронный метод process_incoming_message информирует что сообщение было получено и вызывает функцию, отправленную в конструктор класса (то, что мы хотим чтобы было выполнено при получении сообщения)

  • Синхронный метод send_message отправляет сообщение в очередь

Посмотрим теперь как обращаться с этим классом с серверной стороны. Для начала создадим функцию, которая будет выполняться при получении сообщения из очереди.

async def receive_prediction(message: dict):
    text = message['answer']
    id_ = message["id"]

    print(f'Here we got incoming message from {id_}: {text}')
    websocket = id_to_socket_dict.get(id_, None)
    if websocket is not None:
        await manager.send_personal_message(text, websocket)

Так как raisontext работает на веб-сокетах, то в функции получения предсказания от модели, мы проверяем по айдишнику пользователя есть ли такой веб-сокет. И если есть, то отправляем сообщения обратно на клиентскую часть. Тут стоит упомянуть, что веб-сокет пропадает из id_to_socket_dict в случае, если юзер закрыл сайт.

После того как мы создали функцию, которая выполняется при получении ответа модели, можно инициализировать наш PikaClient:

pika_client = PikaClient(
    receive_prediction,
    publish_queue_name="raisontext_model",
    consume_queue_name="raisontext_answer"
)

Последнее что нам осталось - запустить прослушивание очереди при запуске сервера (consume) и вставить send_message в необходимый эндпоинт (produce).

Так как с send_message все довольно тривиально, давайте посмотрим как запускается прослушивание очереди:

@app.on_event("startup")
async def startup():
    loop = asyncio.get_running_loop()
    task = loop.create_task(pika_client.consume(loop))
    await task

Данный код позволяет слушать очередь асинхронно с обращением к эндпоинтам серверной части. А это именно то, чего мы хотели.

Практическая часть, модель

Теперь когда серверная часть настроена на отправку сообщений и слушание очереди ответов, давайте настроим модель таким же образом.

В отличии от сервера модель не может слушать сообщения из очереди асинхронно. А потому здесь будет немного хардкода, без выделения сущности клиента в отдельный класс.

url = os.environ.get("RABBIT_MQ_URL")
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel_request = connection.channel()  # start a channel
channel_request.queue_declare(queue='raisontext_model', durable=True)

channel_answer = connection.channel()  # start a channel
channel_answer.queue_declare(queue="raisontext_answer", durable=True)

def callback(ch, method, properties, body):
    dict_message = json.loads(body)
    id_ = dict_message['id']
    text = dict_message['text']
    print(f"[x] Received {id_} with {text}")

    answer = evaluator.predict([text])
    answer_dict = {
        "id": id_,
        "answer": str(round(answer[0], 3))
    }

    channel_answer.basic_publish(
        exchange='',
        routing_key='raisontext_answer',
        body=json.dumps(answer_dict)
    )

channel_request.basic_consume('raisontext_model', callback, auto_ack=True)

print(' [*] Waiting for messages. To exit press CTRL+C')
channel_request.start_consuming()
connection.close()

Во многом код говорит сам за себя. Но разберем некоторые ключевые моменты:

  • evaluator - загруженная модель, у которой есть метод предикт (может быть кастомным классом)

  • callback - функция, которая вызывается при поступлении сообщения в очередь raisontext_model. Так как предполагается, что модель работает долго и находится в рамках одного вычислительного ресурса, callback является синхронным и выполняется для каждого юзера по очереди. В конце тела функции полученный предикт кладется в очередь ответов модели raisontext_answer

Общая архитектура

Теперь когда мы построили систему сервер - брокер сообщений - модель, давайте взглянем на общее устройство проекта:

Поведенческая диаграмма raisontext
Поведенческая диаграмма raisontext

Итоги

Отлично! Теперь когда брокер сообщений настроен, можно думать о новых способах дальнейшего масштабирования проекта.

Благодаря использованию облачной версии RabbitMQ, модель и сервер разграничены и могут запускаться на различных удаленных машинах, что упрощает дальнейшее масштабирование системы.

В целом благодаря использованию брокера сообщений, наша система стала более отказоустойчива и надежна.

Полный код и требования к библиотекам можно найти в официальном репозитории проекта raisontext.

Комментарии (4)


  1. atshaman
    09.01.2024 08:27

    Уф. В текущем описании выглядит, пардон, как "типичный ML-проект": "Давайте для решения 1001 раз решенной задачи затащим в проект hypetechnologyX" (инвесторам это нравится)

    Тащить для реализации очереди задач - лишнюю инфраструктурную сложность нууу.... эээ... такое себе. Я понимаю, решать с помощью RMQ задачу роутинга данных по 10 моделям, горизонтального масштабирования FastAPI-бэкенда, добавлять персистирующий уровень для выскодинамических окружений или там еще чего в этом духе - так нет же.

    Не надо так.


    1. Crimsonland
      09.01.2024 08:27

      Если проверенные и устойчивые инструменты используются по назначению, в этом нет ничего плохого.
      В независимости от масштаба.


      1. atshaman
        09.01.2024 08:27

        Если на стоимость владения решением пофиг, эксплуатировать собственное писево не планируется, но есть необходимость фигак-фигак-и-инвестору не включая голову - то пуркуа бы не па?

        В остальных случаях имеет смысл озадачиться вопросами "А адекватен ли выбор инструмента решаемой задаче? А будут ли у пользователя специалисты по администрированию вот этого вот всего? А нужны ли нам в кодовой базе новые инфраструктурные-и-не-только зависимости или может быть полтораста строчек pure-python кода для организации очереди внутри приложения нам хватит?". Тут я не то, что "ответа" на эти вопросы - а и самих вопросов-то не увидел.


  1. Dominux
    09.01.2024 08:27

    Чем-то похоже на недавно опубликованную мною статью. Похоже, тема реально хайповая