Всем привет!

Меня зовут Никита, и я курирую разработку нескольких проектов в ДомКлик. Сегодня я хочу продолжить тему «веселых картинок» в мире RabbitMQ. В своей статье Алексей Казаков рассматривал такой мощный инструмент, как отложенные очереди, и разные реализации стратегии Retry. А сегодня мы поговорим, как использовать RabbitMQ для планирования периодических задач.

Зачем нам понадобилось создавать свой велосипед и почему мы отказались от Сelery и других инструментов менеджмента задач? Дело в том, что они не подошли под наши задачи и требования к отказоустойчивости, которые у нас в компании достаточно жесткие.

При переходе на Docker и Kubernetes многие разработчики сталкиваются с проблемами организации периодических задач, кроны запускаются с бубном, да и контроль процесса оставляет желать лучшего. А еще есть проблемы с пиковой нагрузкой в дневное время.

Передо мной стояла задача реализовать в проекте надежную систему обработки периодических задач, при этом легко масштабируемую и отказоустойчивую. Наш проект на Python, поэтому логично было посмотреть, насколько нам подходит Celery. Это хороший инструмент, но с ним мы часто сталкивались с проблемами надежности, масштабирования и организации бесшовных релизов. Одна пода — одна группа процессов. При масштабировании Celery приходится увеличивать ресурсы одной поды, потому что нет синхронизации между подами, а это означает остановку обработки задач, хоть и временную. А если задачи еще и длительного выполнения, то вы уже догадались, насколько сложно этим управлять. Второй очевидный недостаток: из коробки нет поддержки асинхронности, а для нас это важно, потому что задачи в основном содержат операции ввода-вывода, а Celery работает на тредах.

На тот момент (2018 год) мы не нашли подходящего готового инструмента, и начали разрабатывать свой. Взяв за основу функциональность отложенного выполнения задач и Dead Letter Exchange, мы решили создать систему обработки периодических задач. Концепт выглядел как-то так:

image

Попробую объяснить, что к чему.

  1. Задачи в виде сообщения попадают в коммутатор планировщика (Scheduler exchange).
  2. По routing_key попадают в нужную очередь «вызревания» (Hatchery queue), которая имеет параметр message_ttl, а также связь с коммутатором исполнения (Processor exchange) в качестве deal letter exchange. Очередь «вызревания» не связана с типом задач, она только играет роль «таймера», то есть вы можете создать столько очередей, сколько периодов вам требуется, и управлять через routing_key.
  3. Так как у очереди нет слушателей, сообщения после «вызревания» в очереди попадают в коммутатор исполнения (Processor exchange).
  4. Дальше свободный потребитель (Processing consumer) забирает сообщение и исполняет. После исполнения цикл при необходимости повторяется.

В чем же преимущество такой схемы?

  1. Поэтапность исполнения, то есть новая задача не поступит в обработку, если предыдущая не завершилась.
  2. Единый слушатель (consumer), то есть можно делать как универсальные воркеры, так специализированные. Масштабируется простым повышением количества нужных под.
  3. Деплой новых задач без нарушения работы текущих. Достаточно мягко обновить поды-слушатели и отправить в очередь соответствующее сообщение. То есть можно поднимать поды с новым кодом, которые займутся новыми сообщениями, а текущие процессы будут доживать в старых подах. Так мы получаем бесшовное обновление.
  4. Можно использовать асинхронный код и любую инфраструктуру, при этом независимы от стека.
  5. Можно контролировать исполнение задач на уровне нативного ack/reject, а также получаем дополнительную опциональную очередь (control queue), которая может отслеживать жизненный цикл задач.

Схема получилась на деле достаточно простой, мы быстро создали рабочий прототип. И код получился красивым. Достаточно callback-функцию пометить простым декоратором, контролирующим жизненный цикл сообщения.

def rmq_scheduler(routing_key_for_delay_queue, routing_key_for_processing_queue):
    def decorator(func):
        @wraps(func)
        async def wrapper(channel, body, envelope, properties):
            try:
                res = await func(channel, body, envelope, properties)
                await channel.publish(
                    payload=body,
                    exchange_name='',
                    routing_key=routing_key_for_delay_queue,
                )
                await channel.basic_client_ack(envelope.delivery_tag)
                return res
            except Exception as e:
                log_error(e)
                redelivered_count = get_count_of_redelivery_attempts(properties)
                if redelivered_count <= 3:
                    await resend_msg(
                        channel=channel,
                        body=body,
                        properties=properties,
                        routing_key=routing_key_for_processing_queue)
                else:
                    async with app.natalya_db_engine.acquire() as conn:
                        async with conn.begin():
                            await channel.publish(
                                payload=body,
                                exchange_name='',
                                routing_key=routing_key_for_delay_queue,
                            )
                await channel.basic_client_ack(envelope.delivery_tag)

        return wrapper

    return decorator

Сейчас мы используем эту схему для выполнения только периодических последовательных задач, но ее можно использовать и тогда, когда важно начать выполнять задачу в определенной время, без смещения времени на само исполнение. Для этого достаточно повторно запланировать задачу после попадания сообщения в supervisor.

Правда, у этого подхода есть дополнительные накладные расходы. Нужно понимать, что в случае ошибки сообщение вернется в очередь, его подхватит другой воркер и сразу начнет выполнять. Потому нужно разделять обработку ошибок по степени критичности и заранее продумывать, что делать с сообщением при той или иной ошибке.

Возможные варианты:

  1. Ошибка исправится сама (например, это системная ошибка): отправляем noack и повторяем обработку ошибок.
  2. Ошибка бизнес логики: нужно прервать цикл — отправляем ack.
  3. Ошибка из пункта 1 повторяется слишком часто: отравляем reject и сигнализируем разработчикам. Тут возможны варианты. Можно создать deal letter-очередь для отстоя сообщений, чтобы после разбора вернуть сообщение, или же применить технику ретраев (указать message_ttl).

Пример декоратора:

def auto_ack_or_nack(log_message):
   def decorator(func):
       @wraps(func)
       async def wrapper(channel, body, envelope, properties):
           try:
               res = await func(channel, body, envelope, properties)
               await channel.basic_client_ack(envelope.delivery_tag)
               return res
           except Exception as e:
               await channel.basic_client_nack(envelope.delivery_tag, requeue=False)
               log_error(log_message, exception=e)
 
       return wrapper
 
   return decorator

Эта схема у нас работает уже полгода, она достаточно надёжна и практически не требует внимания. Падение приложения не нарушает работу планировщика и всего лишь немного задерживает выполнение задач.

Плюсов не бывает без минусов. У этой схемы есть и критическая уязвимость. Если с RabbitMQ что-то случилось и сообщения пропали, тогда нужно вручную смотреть, что потерялось, и запускать цикл снова. Но это исключительно маловероятная ситуация, при которой думать об этом сервисе придется в последнюю очередь :)

P.S. Если тема планирования периодических задач вам покажется интересной, то в следующей статье, я подробнее расскажу, как у нас устроена автоматизация создания очередей, а также про Supervisor.

Ссылки: