Лучший способ что-то понять - попробовать на практике, а лучшая практика - это реальные примеры. В этой статье мы разберем возможности Celery и применение основных параметров для управления задачами. Для начала пару слов про сам Celery.

Зачем же нам Celery?

Celery - это самый популярный инструмент для асинхронной обработки задач. Он позволяет выполнять задачи в фоновом режиме и гибко настраивать параметры для каждой задачи. Кроме того, Celery предоставляет возможности для планирования, работы с разными очередями и мониторинга выполнения задач.

А теперь рассмотрим шесть базовых сценариев использования.

Сценарий 1: Выполнение задачи в фоне

Начнем с самого простого - обычное выполнение задачи в фоновом режиме. Представим, что нам нужно сгенерировать тяжелый отчет по запросу и синхронное выполнение здесь не подходит (слишком долго). Поэтому мы отправляем запрос на сервер для его генерации, после чего задача встает в очередь. Когда очередь дойдет до нашей задачи - начнется генерация отчета. Теперь перейдем к реализации.

Создание асинхронной задачи начинается с определения функции, которая будет выполняться асинхронно. Для этого используем декоратор @task

import time
from celery import Celery

app = Celery('myapp', broker='pyamqp://guest@localhost//')

@app.task
def generate_report_task(arg1, arg2):
    print("Start generating report")
    time.sleep(10)
    print("Report generated")

Осталось только запустить. Есть три способа это сделать: apply_async, delay и обычный вызов call.

apply_async - это метод, который предоставляет максимальную гибкость при запуске задачи и принимает большое количество аргументов.

generate_report_task.apply_async(args=[arg1_value], kwargs={'key': 'value'})

delay - в отличие от apply_async имеет ограниченный список принимаемых аргументов. Такой способ запуска мы рассматриваем, когда нужно просто запустить задачу без необходимости передавать именованные аргументы и другие параметры.

generate_report_task.delay(arg1_value, arg2_value)

Этот метод часто используется, когда задача принимает всего несколько аргументов и нам нужно просто её запустить.

Последний способ - это обычный вызов функции. В таком случае задача будет выполнена сразу же, а не назначена в очередь.

generate_report_task(arg1_value, arg2_value)

Сценарий 2: Выполнить задачу через час

Следующая задача - пользователь создал статью и хочет опубликовать её через один час. Настало время узнать об аргументах, которые принимает apply_async.

Вариантов здесь - два. Самый простой - аргумент countdown - в переводе "обратный отсчёт". Он позволяет задать время в секундах, через которое задача станет доступна для выполнения. Как раз то, что нам нужно!

from datetime import datetime

@app.task
def publish_article(arg1, arg2):
    print(f"Publish time: {datetime.now()}")

publish_article_after = 60 * 60 # 60 минут
result = publish_article.apply_async(args=[article_id], countdown=publish_article_after)
Важно для Redis Backend

Данный способ не подойдет, если вы используете Redis в качестве брокера. Дело в том, что Redis помещает отложенные задачи в очередьunacked, из которой по истечение времени, указанного в аргументе VISIBILITY_TIMEOUT, задача будет назначена еще одному обработчику. Например, countdown у нас равен 120 минутам, а VISIBILITY_TIMEOUT по умолчанию 60. В таком случае есть риск, что задача будет назначена сразу трём обработчикам (первому сразу, второму через 60 минут, третьему - если задача через 120 минут будет еще в очереди). В результате, мы получим выполнение одной и той же задачи несколько раз. Подробнее в документации тут и тут.

Важно для RabbitMQ Backend

Параметрconsumer_timeout по умолчанию равен 30 минутам. Не желательно устанавливать countdown больше этого времени, иначе будет возбуждено исключение PRECONDITION_FAILED. Если есть такая необходимость, необходимо увеличить время в rabbitmq.conf. Подробнее - тут.

Сценарий 3: Выполнить задачу завтра в полдень

Теперь наш пользователь хочет выложить статью завтра в полдень. Эта ситуация очень похожа на предыдущую и мы могли бы использовать countdown. Но он лучше подходит для небольших промежутков времени - через минуту или пол часа. А для назначения задачи на конкретное время намного удобнее использовать аргумент eta. Он расшифровывается как Estimated Time of Arrival, что в переводе "Ожидаемое время прибытия".

Здесь есть две важных детали:

  • при использовании Redis отложенные с помощью eta задачи столкнутся с той же проблемой, что и countdown из-за VISIBILITY_TIMEOUT.

  • eta - это не точное время, в которое будет выполнена задача. Указывая время, мы говорим Celery - "задача должна быть выполнена не раньше этого времени". Как только это время наступит - задача будет выполнена в порядке очереди и будет зависеть от количества задач в очереди.

Вот пример:

from datetime import datetime

# Получим время для примера. В нормальной ситуации - 
# нам придет аргумент с временем публикации
now = datetime.now()
tomorrow = now + timedelta(days=1)

publish_article_datetime = datetime(tomorrow.year, tomorrow.month, tomorrow.day, 12, 0, 0)

result = publish_article.apply_async(args=["some_value"], eta=publish_article_datetime)

Сценарий 4: Выставить максимальное время выполнения задачи

Разберем на примере задачи по генерации отчёта. Мы знаем, что она не должна занимать больше часа (для примера). Если такое вдруг случилось - скорее всего что-то не так. Необходимо завершить задачу по таймауту, а потом - разобраться в причинах.

Для этого мы будем использовать аргументы soft_time_limit и time_limit. После наступления soft_time_limit в задаче будет возбуждено исключение SoftTimeLimitExceeded. Если задача не завершилась и наступает time_limit, выполнение задачи будет приостановлено.

Первый вариант - передать аргументы в apply_async

@app.task()

def generate_report():

    try:
        time.sleep(60 * 2)
    except SoftTimeLimitExceeded:
        print("Soft time limit exception")
        time.sleep(60 * 3)

soft_time_limit = 60 * 1

hard_time_limit = 60 * 2

result = my_task.apply_async(args=[some_value], soft_time_limit=soft_time_limit, time_limit=hard_time_limit )

Второй вариант - сразу указать ограничения в аргументах декоратора.

@app.task(time_limit=60 * 60, soft_time_limit=59 * 60) # 60/59 min
def generate_report():
    try:
        time.sleep(60 * 2)
    except SoftTimeLimitExceeded:
        print("Soft time limit exception")
        time.sleep(60 * 3)

result = my_task.apply_async(args=[some_value])

В результате, после запуска задачи через одну минуту мы увидим в консоли "Soft time limit exception", а еще через минуту задача будет принудительно завершена.

Сценарий 5: Отмена выполнения задачи по истечение времени

Возможна ситуация, когда определенная задача теряет свою актуальность, если не выполнена в течение какого то времени. Снова рассмотрим на примере генерации отчёта. Пользователь отправил запрос на генерацию отчёта. Задача попала в очередь и за час ни один обработчик не смог её обработать. В таком случае нам нужно отменить "просроченную" задачу. Для этого применяется аргументexpires. Он принимает либо число в секундах, либо объект datetime.

Например:

# Генерация будет отменена через час
generate_report.apply_async((10, 10), expires=3600)

# Генерация будет отменена, если через день задача не начнет выполняться
from datetime import datetime, timedelta, timezone
generate_report.apply_async((10, 10), kwargs,
                expires=datetime.now(timezone.utc) + timedelta(days=1))

Сценарий 6: Повторное выполнение задачи при возникновении ошибки

По какой-то причине в задаче может возникнуть ошибка. В таком случае нам может понадобится механизм повторного выполнения (retry).

Для начала разберем аргументы, которые помогут нам с детальной настройкой. Декоратор @app.task принимает:

  • default_retry_delay[int]: время до следующей попытки в секундах.

  • max_retries[int]: максимальное количество попыток.

  • autoretry_for[list | tuple]: Принимает список или кортеж с исключениями. Автоматический повтор при возникновении ошибки из переданного списка.

  • retry_backoff[bool|int]:при включении, задержка будет расти экспотенциально. Первая повторная попытка будет иметь задержку 1 секунду, вторая повторная попытка будет иметь задержку 2 секунды, третья будет иметь задержку в 4 секунды, четвертая будет иметь задержку в 8 секунд,

  • retry_backoff_max[int]: устанавливает максимальную задержку в секундах. Рекомендуется использовать всегда при использовании retry_backoff, чтобы избежать слишком больших задержек.

  • retry_jitter[bool]: задает случайную задержку. Принцип расчета new_num_of_seconds = random.randrange(retry_backoff + 1) . Соответственно время задержки будет случайным числом от 0 до retry_backoff

Теперь перейдем к коду. Есть два основных способа задействовать механизм retry.
Первый - использовать метод .retry(). Мы можем вызывать его по какому-либо условию.

@app.task(default_retry_delay=30, max_retries=3) # 60/59 min
def generate_report():
    some_condition = some_logic()
    if some_condition:
      generate_report.retry()

result = my_task.apply_async(args=[some_value])

Второй способ - это передать список ошибок при которых нужно выполнить задачу повторно.

@celery_app.task(autoretry_for=(GenerateReportError, SaveReportError, ), default_retry_delay=30,  max_retries=5)
def generate_report():
  ...

Заключение

Отлично! Мы с вами рассмотрели шесть основных сценариев. Надеюсь, что это послужит крепкой основой для ваших будущих задач и дальнейшего погружения в Celery.

Стоит отметить, что в этой статье мы не касались вопросов, связанных с периодическими задачами и использованием celery beat. Это широкая тема, которая заслуживает отдельной статьи, и я надеюсь подробно рассмотреть ее в будущем материале.

Комментарии (4)


  1. MAXH0
    26.10.2023 07:41
    +3

    Вот возникает у меня впечатление, что каждый кто закончил курсы вайтишников желает еще вайти на Хабр. И приносит маленькие крошки остаточных знаний в свои статьи.

    Не обижайтесь, но где новизна?


    1. elchako
      26.10.2023 07:41

      Вижу на хабре не любят статьи для новичков. Но и вы когда-то не знали celery. Насколько знаю хабр платформа для всех, без ограничений в опыте?

      Если ошибаюсь поправьте


  1. Nireko
    26.10.2023 07:41

    Причем тут джанго? Тема не раскрыта) интересно было бы про модули celery, которые вне доки, примеры глянуть


  1. steqa
    26.10.2023 07:41

    А что использовать вместо redis