Содержание статьи:

  1. Обработчики и Брокеры (Workers & Brokers)

  2. Очереди (Queues)

  3. Задачи (Tasks)

  4. Подтверждение задач (Task Acknowledgment)

  5. ETA

  6. Повторное исполнение задач (Retry Tasks)

  7. CPU, I/O ограничения и Процессы vs Потоки

  8. Заключение

  9. Заключение от переводчика


Начнем с небольшой философии на тему разработки. Вопрос простой: должны ли мы сначала до последней страницы изучить документацию, а затем, когда мы, предположительно, полностью поймем библиотеку изнутри, начать использовать ее в своем коде? Или мы должны сначала использовать ее, поиграться с ней, прежде чем возвращаться и читать документацию.

Celery на самом деле полна подводных камней. Отчасти потому, что внутри происходит работа с параллельными процессами, потоками ... и большую часть времени такие детали скрываются. Зачастую разработчику не нужно думать о них, и, следовательно, контакта со всем этим почти нет. И отчасти поэтому, для разработчика Celery иногда ведет себя самым неожиданным образом. Поэтому в нашем случае чтение документации все-таки необходимо.

Обработчики и Брокеры (Workers & Brokers)

Для начала, нужно объяснить некоторые основные понятия, которые используются в Celery.

Celery — это "Очередь задач" (Task queue). Да, для меня тоже было неизвестно, что это реальный термин. Мне казалось, что это описание того, что это такое в принципе — очередь задач, которые в конечном итоге будут выполнены. Итак, Celery это программа, которая отслеживает задачи (tasks), которые необходимо выполнить, и в которой есть набор обработчиков (workers), которые будут выполнять эти задачи. Основной смысл в том, что она (программа) может выполнять несколько задач параллельно и что она не блокирует поставщиков (producers) этих самых задач.

Celery на самом деле не хранит все эти задачи в памяти. Для хранения задач есть отдельный сервис, называемый брокером сообщений (message broker), который по сути своей является очередью. Обычно это либо Redis, либо RabbitMQ. Т.е. Celery следит за тем, что происходит в очереди, но хранится она внутри Redis/RabbitMQ.

При запуске Celery создается 1 обработчик.

celery -A tasks worker

Этот обработчик является главным процессом (supervisor process), который будет порождать дочерние процессы или потоки, которые в свою очередь будут выполнять задачи. По умолчанию главный обработчик будет создавать дочерние процессы, а не потоки, и он создаст столько одновременных дочерних процессов, сколько ядер у процессора. Главный процесс будет следить за тем, что происходит с задачами и процессами/потоками, но он не будет запускать сами задачи. Эта группа дочерних процессов или потоков, которая ожидает выполнения задач, называется пулом выполнения (execution pool) или пулом потоков (thread pool).

Очереди (Queues)

Да, тут намеренно используется множественное число для очередей, потому что существует несколько видов очередей ????????‍⚗️.

Прежде всего, существует главная очередь (main queue), которая принимает задачи от поставщиков (producers) по мере их поступления и передает обработчикам по мере их запроса. По умолчанию есть только одна такая очередь. Все обработчики принимают задачи из одной очереди. Но вы также можете указать несколько таких очередей и назначить конкретные обработчики на определенные очереди. Очередь по умолчанию называется celery.

Чтобы просмотреть первые 100 задач в очереди в Redis, выполните:

redis-cli lrange celery 0 100

Эти очереди сильно напоминают FIFO (First In First Out), но это не совсем так. Задачи, которые сначала помещаются в очередь, первыми удаляются из очереди, НО они не обязательно выполняются первыми.

Когда обработчики извлекают новые задачи из очереди, они обычно (по умолчанию) берут не столько задач, сколько у них есть процессов, они берут больше. По умолчанию формула для количества взятых задач такая:

# n_processes — количествово потоков/процессов
n_tasks = 4 * n_processes

Делается это для экономии времени. Взаимодействие с брокером занимает какое-то время, и если задачи, которые необходимо выполнить, выполняются быстро, то обработчики будут запрашивать дополнительные задачи снова, и снова, и снова. Чтобы избежать этого, они запрашивают в X раз больше задач, чем у них есть процессов, за это отвечает параметр worker_prefetch_multiplier (прим: этот момент хорошо описан в документации).

Но ведь есть задачи, которые никогда не попадают в главную очередь и все равно выполняются. Как это возможно, спросите вы меня? Задавая себе и Google один и тот же вопрос, хочу сообщить вам, что Google очень мало что смог сказать по этому поводу. Нашлись только обрывки информации. Но, посидев с Celery и Redis несколько часов (или дней??), обнаружилось следующее.

Задачи с ETA никогда не помещаются в главную очередь. Они помещаются во что-то среднее между очередью и списком “неподтвержденных задач”, и называется эта очередь unacked (прим: сокращение от слова "unacknowledged"). Согласитесь, что очень легко пропустить что-то с названием unacked, когда вы пытаетесь понять, как и куда некоторые задачи только что исчезли. Итак, примечание для следующего раза, когда мне или вам нужно будет что-то назвать: все названия, с которыми встречается пользователь, должны быть прописаны полностью.

Итак, что представляют собой ETA задачи? Это запланированные задачи. ETA расшифровывается как "estimated time of arrival" (“предполагаемое время прибытия”). Все задачи, для которых указано ETA или обратный отсчет (countdown), например:

my_task.apply_async((1, 2), countdown=3)
my_task.apply_async((1, 2), eta=tomorrow_datetime)

хранятся в unacked очереди. Сюда же попадают и задачи с ретраями, потому что при повторном выполнении задачи она повторяется через определенное количество секунд, а это означает, что у нее есть ETA.

Посмотреть, какие задачи находятся в очереди ETA в Redis, можно вот так:

redis-cli HGETAL unacked

В ответ получите список из чередующихся ключей и значений:

1) "46165d9f-cf45-4a75-ace1-44443337e000"
2) "[{\"body\": \"W1swXSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgImVycmJhY2tzIj\", \"content-encoding\": \"utf-8\", \"content-type\": \"application/json\", \"priority\": 0, \"body_encoding\": ...
3) "d91e8c77-25c0-497f-9969-0ccce000c6667"
4) "[{\"body\": \"W1s0XSwge30sIHsiY2FsbGJhY2tzIjogbnVsbCwgI\", \"content-encoding\": \"utf-8\", ...
...

Задачи (Tasks)

Задачи иногда также называют сообщениями. По сути брокер сообщений - это нечто, что передает сообщения из одной системы в другую. В нашем случае сообщение представляет собой описание задачи: название (уникальный идентификатор), входные параметры, время ожидания, количество повторных попыток и тд.

В celery задача является классом. Таким образом, каждый раз, когда вы используете декоратор для функции (например, @shared_task), чтобы сделать ее celery задачей, под капотом создается класс. Это означает, что у каждой задачи есть self, к которому добавляется множество атрибутов, например: name, request, status, priority, retries и многое другое. Если мы хотим получить доступ к этим атрибутам, то нужно указать параметр bind=True.

@shared_task(bind=True,...)
def _send_one_email(self, email_type, user_id):
    ...
    num_of_retries = self.request.retries
    ...

Подтверждение задач (Task Acknowledgment)

Ранее мы говорили, что, когда у обработчиков нет задач, они идут и получают еще несколько задач от брокера. Но не все так просто. Когда обработчик “берет” задачу, задача перемещается из главной очереди в unacked очередь. Задача полностью удаляется из брокера только после того, как обработчик подтвердит это. Это означает, что когда обработчик забирает себе очередную пачку задач, на самом деле в этот момент задачи только резервируются. Они помещаются в unacked очередь и другие обработчики их не возьмут. Если процесс обработчика умирает, то эти задачи становятся доступными для других обработчиков.

Итак, когда обработчик всё же подтверждает выполнение задачи? По умолчанию Celery предполагает, что опасно запускать задачу более одного раза, следовательно, подтверждение задачи происходит непосредственно перед ее выполнением. Вы можете изменить это, установив значение acks_late=True. В этом случае задача имеет небольшую вероятность быть запущенной более одного раза, если обработчик, выполняющий ее, умирает в середине выполнения. И под “умирает” буквально подразумевается умереть. Python Exception в коде задачи не приведет к смерти обработчика. Такая задача по-прежнему будет подтверждена, но ее состояние будет FAILURE. Должно произойти что-то из ряда вон выходящее, чтобы обработчик никогда не достиг момента self.confirmate(). И на самом деле это редкость. По этой причине, можно сказать, что значение параметра acks_late имеет мало значения.

ETA

Как уже упоминалось, ETA задачи ... тяжело отыскать. Они никогда не попадают в главную очередь. Они сразу назначаются обработчику и помещаются в unacked очередь. Я подозреваю, что это не было сделано намеренно, чтобы задачи ETA немедленно назначались конкретному обработчику. Скорее это было просто следствием существующего кода. Задача ETA не может попасть в общую очередь, которая работает почти как FIFO. Единственное другое место для нее находится среди неподтвержденных задач, и в этом случае она должно быть зарезервирована одним обработчиком.

Интересно, что время ETA не является точным временем выполнения этой задачи. Вместо этого это самый ранний момент выполнения этой задачи. Как только наступит время ETA, задача должна дождаться освобождения обработчика.

Повторное исполнение задач (Retry Tasks)

Celery по умолчанию сама не делает ретраи для задач. Главным образом потому, что предполагается, что задачи не являются идемпотентными (прим: одинаковыми независимо от момента выполнения), и поэтому небезопасно запускать их более одного раза. Однако, в Celery есть функционал для ретраев задач, но он должен быть явно и отдельно настроен для каждой задачи.

Одним из способов запуска ретрая является вызов self.retry() в задаче. Что происходит после того, когда вы вызываете эту команду? Вычисляется время ETA, собираются некоторые новые метаданные, а затем задача отправляется брокеру, где она попадает в unacked очередь и назначается тому же обработчику, который уже выполнял эту задачу. Именно так ретрай-задачи становятся задачами ETA и, следовательно, никогда не отображаются в главной очереди брокера. Это очень изящная, но неожиданная схема. И опять же, Google очень мало что может сказать по этому поводу.

Подробнее о ретраях читайте в гайде по ретраям в Celery.

CPU, I/O ограничения и Процессы vs Потоки

Как мы уже говорили, по умолчанию Celery выполняет задачи в отдельных процессах, а не в потоках. Но вы можете заставить использовать потоки, стартуя обработчики либо с помощью --poll eventlet, либо с помощью --pool gevent. И eventlet, и gevent на самом деле создают гринлеты, а не потоки. Гринлеты (или зеленые потоки) похожи на потоки, но не являются ими, потому что по определению потоки управляются операционной системой. Гринлеты не полагаются на ОС для обеспечения поддержки потоков, вместо этого они эмулируют многопоточность. Они управляются в пространстве программы, а не в пространстве операционной системы. В любой данный момент нет переключения между потоками. Вместо этого гринлеты добровольно или явно передают управление друг другу в определенных точках вашего кода.

Если ваши задачи сильно загружают процессор, если они выполняют много вычислений (CPU-bound), то вам следует продолжать использовать процессы. Если, с другой стороны, ваши задачи в основном выполняют HTTP-запросы (I/O bound), то лучше использовать потоки. Причина этого заключается в том, что, пока ваша задача ожидает, пока HTTP-запрос вернет результат, она ничего не делает, она не использует процессор и, следовательно, не будет "возражать", если другой поток будет использовать его.

В Celery гораздо больше неочевидных моментов

Документация в Celery далека от идеала. Описание многих функций разбито на части и разбросано по всем страницам. Трудно найти детали реализации. Я не знаю, как Celery будет вести себя за пределами сценариев, которые создает разработчик. Конечно, после нескольких лет интенсивной работы можно было бы хорошо понять, как это работает, но Celery живет на периферии моей повседневной жизни. Celery ведет себя совершенно по-разному, когда находится на сервере и когда находится на моем компьютере. Я вижу, какие задачи были выполнены, но я не вижу, насколько хорошо они были выполнены. Это сложно измерить у того, что выполняется параллельно, в потоках и почти независимо от вашей программы. Я не доверяю Celery, я не верю, что правильно понимаю настройки, или, можно сказать, я не верю, что знаю, как их правильно настроить. Celery подобен приведению, которое приходит и уходит, иногда ломается, но в большинстве случаев просто работает. Надеюсь, Celery справится с теми задачами, которые мы ему поручаем, но если это не так, он будет работать так же тихо и мы ничего об этом не узнаем.

Источники:


Конечно, можно сказать, что всё это можно было вычитать из официальной документации. Как бы да, но нет. И это доказывает нам автор статьи Ines Panker, которая провела много часов (дней?) на чтение документации, поиск в гугле, тыканье в python/celery/redis. Скажем же ей спасибо за это. А я надеюсь, что эта статья была полезна для вас и что у вас появилось больше понимания того, что вы используете.

Никита Шамаев

Подписывайтесь на мой телеграм-канал. Там еще больше постов про разработку/технологии/датасеты, и там вы не пропустите анонс следующей статьи ????

Комментарии (10)


  1. baldr
    12.09.2022 17:47
    +2

    Сейчас будет критика. К переводчику замечаний почти нет кроме вопроса - зачем вы выбрали это невнятное бормотание двухлетней давности для перевода?

    Во-первых, все примеры и "объяснения" в этой статье - только для Redis-брокера. Если использовать RabbitMQ - там нет никаких "unacked" очередей и работа сделана через родные AMQP-механизмы. Более того, сама celery ничего не знает про этих брокеров и использует еще несколько библиотек (того же автора) более низкого уровня для транспорта (kombu, amqplib), для работы с процессами (billiard), для сериализации и тп.

    Брокеров тоже может быть довольно много разных и механизм работы с каждым движком тоже будет отличаться. Лучше всего использовать RabbitMQ, но проще всего Redis - поэтому он во всех примерах и идет.

    Сама по себе Celery - тот еще здоровый комбайн. Неочевидных моментов там, действительно, на два вагона наберется. Я с ней работаю уже лет 10 и примерно знаю что делать не надо, однако фич в ней, конечно, дофига.

    Не очень понятно какие "неочевидные моменты" проясняет эта статья. На мой взгляд, только запутывает. Можно было бы рассказать о более насущных вещах: как правильно делать scheduled-таски, как масштабировать и отменять задачи, как запускать в продакшене, как делать мониторинг. И совсем уж актуальное - как организовать выполнение долгих тасков, отправляя их из асинхронного фреймворка типа FastAPI или даже из другого языка.


    1. shamaevnn Автор
      12.09.2022 18:02

      Да, действительно, возможно, это не самая лучшая статья для перевода. Но, честно, я не нашел чего-то более вразумительного. Возможно, плохо искал) Если поделитесь в комментах -- я только за ????

      Большинство статей рассказывают, как просто поднять Celery в Django. Ну максимум еще скажут, что есть какой-то там брокер сообщений и система воркеров с очередями.

      А лично для меня, не имеющего 10 лет опыта, эта статья ответила на некоторые вопросы.


  1. kai3341
    13.09.2022 21:05
    +1

    В Celery всё очень плохо с asyncio

    Почему статья в хабе Django?


    1. baldr
      13.09.2022 21:50

      Ну не так уж и плохо. Если не использовать eventlet/greenlet, то можно взять eventloop и запустить асинхронные таски. Главное - дождаться их завершения. У меня довольно неплохо работает такой механизм для скачивания пачки файлов.

      Вот с чем плохо - так это с потоками, насколько я помню. Не знаю уже как в последних версиях, но в (старой) версии 3 категорически нельзя было запускать потоки внутри таски. Если не починили ничего, то и всякие sync_to_async тоже могут не работать нормально.


      1. kai3341
        13.09.2022 22:34

        eventlet/greenlet

        Предлагаете дублировать код?

        можно взять eventloop и запустить асинхронные таски

        одну. асинхронную. таску. Э -- эффективность


        1. baldr
          13.09.2022 23:40

          Почему одну? Я имею в виду запустить одну celery-таску, в нем взять eventloop и запустить таски (не celery), дождаться завершения через asyncio.gather. Они работают асинхронно. celery-таска запускается все равно в отдельном процессе (prefork), eventloop ни с кем не шарится.


          1. kai3341
            14.09.2022 03:20

            Я имею в виду запустить одну celery-таску, в нем взять eventloop и
            запустить таски (не celery), дождаться завершения через asyncio.gather

            То есть одна таска селери на воркер. Эффективность. Вы перечёркиваете все преимущества asyncio

            Они работают асинхронно. celery-таска запускается все равно в отдельном процессе (prefork), eventloop ни с кем не шарится.

            Именно. Все преимущества asyncio перечёркнуты

            Есть обходной путь -- celery-pool-asyncio. Но это дикий костыль


            1. baldr
              14.09.2022 07:38

              Но.. В celery и так всегда одна таска на воркер! :) В классическом режиме, через prefork-пул, по крайней мере. Я не использую потоки, а eventlet - совершенно непредсказуемая штука и часто ломает остальные библиотеки (как было с openssl).

              Если вы говорите про запуск celery-тасков через asyncio - да, этого нет и неизвестно будет ли вообще. Сначала, года 3 назад, обещали в версии 5, потом обещание переместилось в версию 6.

              Однако, запускать asyncio внутри воркера вполне можно и нужно. У меня довольно крупные таски - например, скачать 50 гигабайтных файлов и объединить в один - прекрасно качаются все параллельно.


              1. kai3341
                14.09.2022 14:58

                Но.. В celery и так всегда одна таска на воркер! :)

                Аргументация в стиле:

                -- Но позвольте, вы творите технический абсурд

                -- Все так делают :)

                У меня довольно крупные таски - например, скачать 50 гигабайтных
                файлов и объединить в один - прекрасно качаются все параллельно.

                Идея разделения на таски в том, что в случае ошибки мы всегда можем восстановить прогресс с минимальными потерями. Ваш подход предполагает написание своего менеджера загрузок


                1. baldr
                  14.09.2022 15:33

                  Вообще говоря мы начали с вашего утверждения о том как все плохо с asyncio и я только уточнил что не все там плохо. Но если вы хотите научить меня как скачивать и склеивать файлы - я не против узнать ваши рекомендации.

                  Celery всегда был синхронным и всегда в воркере выполнялась ровно одна таска (если без eventlet/greenlet). В общем-то, основная задача таких обработчиков и была - взять все CPU-bound таски и выполнять где-то в бэкграунде. Asyncio совсем не поможет если вы запускаете обсчет матрицы или какую-нибудь ML-задачу.