Данная статья будет полезна тем кто хочет расширить свой учебный (или даже рабочий) ML пет-проект добавлением новой технологии.
Весь код в статье написан для OpenSource проекта по детекции синтетического текста raisontext в рамках курса ML System Design от ODS.
Небольшой комментарий для упрощения дальнейшего чтения: проект raisontext имеет серверную часть на FastAPI. Серверная часть связана с клиентской посредством веб-сокетов.
Содержание
Вступление
В жизненном цикле успешного ML проекта наступает момент, когда необходимо масштабироваться. При этом масштабирование, в зависимости от используемых моделей и выделенного железа, может быть разным.
В случае с raisontext, было принято решение масштабироваться в разграничение сущностей модели и серверной части. А также учесть безотказность предикта модели, в случае если к ней одновременно обращается множество запросов.
Постановка проблемы
Предположим, что предикт модели занимает 5 секунд. В случае когда два юзера попробуют обратиться к модели одновременно, произойдет что-то неявное. Может зависнуть как сам сайт, так и упасть модель.
Так как нас не устраивает данное поведение, необходимо придумать некий разграничивающий туннель, который не позволит любому запросу напрямую попадать в модель, а будет вместо этого вставать в очередь.
Именно для такого поведения и существуют брокеры сообщений.
RabbitMQ
На Хабре уже достаточно учебных статей про RabbitMQ (тык и тык), поэтому подробно останавливаться не буду. Скажу только, что в случае с raisontext будут использоваться две очереди:
raisontext_model - для отправки запросов к модели
raisontext_answer - для получения ответов модели
Практическая часть, сервер
В данном проекте используется облачная версия RabbitMQ: CloudAMQP
В обычной ситуации для корректной работы с RabbitMQ необходимо создать класс Consumer либо Producer в зависимости от того, кем будет являться сущность (модель или сервер).
В нашем случае и сервер и модель оба являются одновременно и Consumer и Producer. Поэтому создадим универсальный PikaClient, который сможет как принимать, так и отправлять сообщения в очереди.
class PikaClient:
def __init__(self, process_callable, publish_queue_name, consume_queue_name):
self.publish_queue_name = publish_queue_name
self.consume_queue_name = consume_queue_name
self.connection = pika.BlockingConnection(
pika.URLParameters(os.environ.get("RABBIT_MQ_URL"))
)
self.channel = self.connection.channel()
self.publish_queue = self.channel.queue_declare(queue=publish_queue_name, durable=True)
self.consume_queue = self.channel.queue_declare(queue=consume_queue_name, durable=True)
self.publish_callback_queue = self.publish_queue.method.queue
self.consume_callback_queue = self.consume_queue.method.queue
self.response = None
self.process_callable = process_callable
async def consume(self, loop):
"""Setup message listener with the current running loop"""
connection = await connect_robust(
os.environ.get("RABBIT_MQ_URL"),
loop=loop
)
channel = await connection.channel()
queue = await channel.declare_queue(self.consume_queue_name, durable=True)
await queue.consume(self.process_incoming_message, no_ack=False)
return connection
async def process_incoming_message(self, message):
"""Processing incoming message from RabbitMQ"""
await message.ack()
body = message.body
print('Received message')
if body:
await self.process_callable(json.loads(body))
def send_message(self, message: dict):
"""Method to publish message to RabbitMQ"""
self.channel.basic_publish(
exchange='',
routing_key=self.publish_queue_name,
properties=pika.BasicProperties(
reply_to=self.publish_callback_queue,
correlation_id=str(uuid.uuid4())
),
body=json.dumps(message)
)
Кратко пробежимся по основным точкам:
RABBIT_MQ_URL - переменная среды, в которой находится уникальный AMQP URL, что-то вроде amqps://efiosdwv:***@cow.rmq2.cloudamqp.com/efiosdwv
Асинхронный метод consume получает из очереди сообщение и вызывает process_incoming_message. Благодаря асинхронности consume может выполняться у нескольких юзеров одновременно
Асинхронный метод process_incoming_message информирует что сообщение было получено и вызывает функцию, отправленную в конструктор класса (то, что мы хотим чтобы было выполнено при получении сообщения)
Синхронный метод send_message отправляет сообщение в очередь
Посмотрим теперь как обращаться с этим классом с серверной стороны. Для начала создадим функцию, которая будет выполняться при получении сообщения из очереди.
async def receive_prediction(message: dict):
text = message['answer']
id_ = message["id"]
print(f'Here we got incoming message from {id_}: {text}')
websocket = id_to_socket_dict.get(id_, None)
if websocket is not None:
await manager.send_personal_message(text, websocket)
Так как raisontext работает на веб-сокетах, то в функции получения предсказания от модели, мы проверяем по айдишнику пользователя есть ли такой веб-сокет. И если есть, то отправляем сообщения обратно на клиентскую часть. Тут стоит упомянуть, что веб-сокет пропадает из id_to_socket_dict в случае, если юзер закрыл сайт.
После того как мы создали функцию, которая выполняется при получении ответа модели, можно инициализировать наш PikaClient:
pika_client = PikaClient(
receive_prediction,
publish_queue_name="raisontext_model",
consume_queue_name="raisontext_answer"
)
Последнее что нам осталось - запустить прослушивание очереди при запуске сервера (consume) и вставить send_message в необходимый эндпоинт (produce).
Так как с send_message все довольно тривиально, давайте посмотрим как запускается прослушивание очереди:
@app.on_event("startup")
async def startup():
loop = asyncio.get_running_loop()
task = loop.create_task(pika_client.consume(loop))
await task
Данный код позволяет слушать очередь асинхронно с обращением к эндпоинтам серверной части. А это именно то, чего мы хотели.
Практическая часть, модель
Теперь когда серверная часть настроена на отправку сообщений и слушание очереди ответов, давайте настроим модель таким же образом.
В отличии от сервера модель не может слушать сообщения из очереди асинхронно. А потому здесь будет немного хардкода, без выделения сущности клиента в отдельный класс.
url = os.environ.get("RABBIT_MQ_URL")
params = pika.URLParameters(url)
connection = pika.BlockingConnection(params)
channel_request = connection.channel() # start a channel
channel_request.queue_declare(queue='raisontext_model', durable=True)
channel_answer = connection.channel() # start a channel
channel_answer.queue_declare(queue="raisontext_answer", durable=True)
def callback(ch, method, properties, body):
dict_message = json.loads(body)
id_ = dict_message['id']
text = dict_message['text']
print(f"[x] Received {id_} with {text}")
answer = evaluator.predict([text])
answer_dict = {
"id": id_,
"answer": str(round(answer[0], 3))
}
channel_answer.basic_publish(
exchange='',
routing_key='raisontext_answer',
body=json.dumps(answer_dict)
)
channel_request.basic_consume('raisontext_model', callback, auto_ack=True)
print(' [*] Waiting for messages. To exit press CTRL+C')
channel_request.start_consuming()
connection.close()
Во многом код говорит сам за себя. Но разберем некоторые ключевые моменты:
evaluator - загруженная модель, у которой есть метод предикт (может быть кастомным классом)
callback - функция, которая вызывается при поступлении сообщения в очередь raisontext_model. Так как предполагается, что модель работает долго и находится в рамках одного вычислительного ресурса, callback является синхронным и выполняется для каждого юзера по очереди. В конце тела функции полученный предикт кладется в очередь ответов модели raisontext_answer
Общая архитектура
Теперь когда мы построили систему сервер - брокер сообщений - модель, давайте взглянем на общее устройство проекта:
Итоги
Отлично! Теперь когда брокер сообщений настроен, можно думать о новых способах дальнейшего масштабирования проекта.
Благодаря использованию облачной версии RabbitMQ, модель и сервер разграничены и могут запускаться на различных удаленных машинах, что упрощает дальнейшее масштабирование системы.
В целом благодаря использованию брокера сообщений, наша система стала более отказоустойчива и надежна.
Полный код и требования к библиотекам можно найти в официальном репозитории проекта raisontext.
Комментарии (4)
Dominux
09.01.2024 08:27Чем-то похоже на недавно опубликованную мною статью. Похоже, тема реально хайповая
atshaman
Уф. В текущем описании выглядит, пардон, как "типичный ML-проект": "Давайте для решения 1001 раз решенной задачи затащим в проект hypetechnologyX" (инвесторам это нравится)
Тащить для реализации очереди задач - лишнюю инфраструктурную сложность нууу.... эээ... такое себе. Я понимаю, решать с помощью RMQ задачу роутинга данных по 10 моделям, горизонтального масштабирования FastAPI-бэкенда, добавлять персистирующий уровень для выскодинамических окружений или там еще чего в этом духе - так нет же.
Не надо так.
Crimsonland
Если проверенные и устойчивые инструменты используются по назначению, в этом нет ничего плохого.
В независимости от масштаба.
atshaman
Если на стоимость владения решением пофиг, эксплуатировать собственное писево не планируется, но есть необходимость фигак-фигак-и-инвестору не включая голову - то пуркуа бы не па?
В остальных случаях имеет смысл озадачиться вопросами "А адекватен ли выбор инструмента решаемой задаче? А будут ли у пользователя специалисты по администрированию вот этого вот всего? А нужны ли нам в кодовой базе новые инфраструктурные-и-не-только зависимости или может быть полтораста строчек pure-python кода для организации очереди внутри приложения нам хватит?". Тут я не то, что "ответа" на эти вопросы - а и самих вопросов-то не увидел.