Введение

Это продолжение цикла статей про Celery. Первая часть тут.
Сегодня мы более глубоко разберем работу с Celery. Узнаем как работает Celery Beat внутри, научимся настраивать и создавать периодические задачи. И конечно же коснемся практических вопросов.

Кто такой этот ваш Celery Beat?

Роль Celery Beat проста: это планировщик и он запускает задачи с установленными интервалами. Эти задачи затем выполняются доступными рабочими узлами.
Для начала нам нужно разобраться с основными компонентами и названиями, которые будут использоваться в дальнейшем.

Основные компоненты:

  1. Планировщик (Scheduler): Этот компонент отвечает за управление периодическими задачами. Он проверяет расписание и отправляет задачи в очередь в нужное время.

  2. Рабочие узлы (Worker Nodes): Эти узлы забирают задачи из очереди и выполняют их. Каждый узел может обрабатывать множество задач параллельно, что позволяет эффективно использовать ресурсы системы.

  3. Посредник (Broker): Для этой роли обычно используют RabbitMQ или Redis. Они используются для передачи сообщений между планировщиком и рабочими узлами. Брокер берет на себя доставку сообщений и управление очередями.

Принцип работы

Главная часть периодических задач - планировщик. По умолчанию используется celery.beat.PersistentScheduler. Сейчас посмотрим на то, как он работает внутри:

while not self._is_shutdown.is_set():
    interval = self.scheduler.tick()  # получаем следующее время проверки
    if interval and interval > 0.0:  # при наличии интервала, который больше 0, засыпаем на этот интервал
        time.sleep(interval)
        if self.scheduler.should_sync():  # проверяем пора ли синхронизировать расписание
            self.scheduler._do_sync()  # синхронизируем расписание

Упрощенно всё выглядит так:

Основная логика построена вокруг цикла, внутри которого выполняются "тики". Они возвращают интервал до следующего "тика".
Упрощённо функция tick() выглядит так:

tick()
def tick(self):    
    adjust = self.adjust

    # максимальный интервал между тиками
    max_interval = self.max_interval

    # здесь хранятся задачи
    H = self._heap

    # нет задач в куче - возвращаем максимальный интервал между тиками
    if not H:
        return max_interval

    event = H[0]
    entry = event[2]
    # Проверяем,пришло ли время выполнить задачу (is_due) и когда запусить в следующий раз
    is_due, next_time_to_run = self.is_due(entry)
    
    if is_due:
        self.apply_entry(entry, producer=self.producer)
        # Добавляем задачу обратно в кучу с обновленным временем запуска
        heappush(H, event_t(self._when(next_entry, next_time_to_run), event[1], next_entry))
        # возвращаем нулевой интервал, чтобы сразу же запустить новый "тик"
        return 0
    
    # Корректируем время следующего запуска
    adjusted_next_time_to_run = adjust(next_time_to_run)
    # Возвращаем минимальную задержку до следующего тика
    return min(adjusted_next_time_to_run if is_numeric_value(adjusted_next_time_to_run) else max_interval, max_interval)

Давайте кратко разберем, что там происходит.
tick() должен:

  1. Проверить актуальность расписания, при необходимости обновить список задач

  2. Взять ближайшую задачу

  3. Проверить, когда задачу нужно выполнить

  4. Если время выполнения задачи пришло, то отправить в очередь и вернуть нулевой интервал

  5. Если время выполнения задачи еще не наступило, вернуть интервал до следующего тика

На интервалы напрямую влияет beat_scheduler_max_interval. Интервал между тиками не может быть больше, чем это значение. Это же значение будет использовано, когда нет доступных задач.

Примечание из документации

По умолчанию это значение зависит от планировщика. Для планировщика Celery beat по умолчанию значение равно 300 (5 минут), но для планировщика https://pypi.org/project/django-celery-beat/ database это значение равно 5 секундам, поскольку расписание может быть изменено извне, и поэтому оно должно учитывать изменения в расписании.

За синхронизацию расписания отвечает функция do_sync() по условию should_sync():

    def should_sync(self):
        return (
            (not self._last_sync or
             (time.monotonic() - self._last_sync) > self.sync_every) or
            (self.sync_every_tasks and
             self._tasks_since_sync >= self.sync_every_tasks)
        )

Здесь видим две переменные, которые можно настроить. Это sync_every и sync_every_tasks. С помощью sync_every настраивается интервал синхронизации расписания в секундах. А sync_every_tasks позволяет указать синхронизацию каждые n задач. К примеру, "синхронизируй расписание каждые пять выполненных задач".

Так же стоит отдельно отметить, что задачи в очередь поступают по мере наступления "времени выполнения". Если задача должна выполняться каждые 10 секунд, она поступит в очередь только через 10 секунд, а не заранее. Нет никакой гарантии, что в этот момент будет свободный узел, который сможет сразу же начать выполнение. Так что определенный сдвиг во времени выполнения практически неизбежен.

Мы можем использовать стандартный планировщик или подключить другой. Вот два сторонних планировщика для примера:

  • DatabaseScheduler из django-celery-beat. Хранит расписание в базе данных.

  • RedBeatScheduler из RedBeat. Хранит расписание в Redis.

Теперь мы знаем как синхронизируется расписание и как именно задача попадает в очередь. Что происходит дальше?
Теперь в дело вступают рабочие узлы (workers). Они опрашивают очередь. При поступлении туда задачи - начинают выполнение.

Настройка

Здесь мы рассмотрим из чего обычно состоят настройки при использовании периодических задач.

import logging
import time
from datetime import timedelta
from celery import Celery
from celery.signals import after_setup_logger

logger = logging.getLogger(__name__)

app = Celery('tasks', broker='pyamqp://guest@localhost//')

app.conf.update(
    task_serializer='json',
    accept_content=['json'],
    timezone='UTC',
    enable_utc=True,
    worker_hijack_root_logger=False,
)

app.autodiscover_tasks()


@after_setup_logger.connect
def setup_loggers(logger, *args, **kwargs):
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh = logging.FileHandler('logs.log')
    fh.setFormatter(formatter)
    logger.addHandler(fh)


@app.task
def test():
    logger.info("It is working!")
    return True


app.conf.beat_schedule = {
    "test": {
        "task": 'celery_app.test',
        "schedule": timedelta(seconds=10),
    },
}

Общие настройки:

app.conf.update(
    timezone='Europe/Moscow', 
    enable_utc=True, # Храним время в UTC
    worker_hijack_root_logger=False, # Переопределяем настройки логгирования
)

Отдельно выделю worker_hijack_root_logger - всегда выставляю в False. Это позволяет использовать собственные настройки логирования. Без этого параметра большинство логов будет скрыто.

С полным списком доступных настроек со всеми подробностями можно ознакомится здесь.

Настройки логирования

@after_setup_logger.connect
def setup_loggers(logger, *args, **kwargs):
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    fh = logging.FileHandler('logs.log')
    fh.setFormatter(formatter)
    logger.addHandler(fh)

Здесь указываем любые настройки, которые нам нужны. Для примера оставил настройку FileHandler для записи логов в файл. Логгер мы настраиваем по сигналу after_setup_logger.

Настройка расписания

app.conf.beat_schedule = {
    "test": {  # уникальное название задачи
        "task": 'celery_app.test',  # путь к задаче
        "schedule": timedelta(seconds=10),  # интервал, через который будет выполняться задача
    },
}

Здесь в beat_schedule мы передаем словарь с настройками расписания для задач.

Полный список возможных параметров
  • task: Имя задачи в формате строки. Например, 'celery_app.test'

  • schedule: Объект, определяющий расписание выполнения задачи. Например, timedelta(seconds=10), crontab(minute='*/5').

  • args: Список или кортеж с позиционными аргументами для задачи. Например, (1, 2, 3).

  • kwargs: Словарь с именованными аргументами для задачи. Например, {"foo": "bar"}.

  • options: Словарь с дополнительными параметрами выполнения задачи. Принимает всё, что поддерживает apply_async(). Например, {"queue": "default", "priority": 10}.

  • relative: Флаг, указывающий на использование относительного расписания. Например, True.

В schedule мы передаем объект timedelta. Это основной способ, с помощью которого мы будем указывать временной интервал для задач. Его альтернатива - crontab. С ним бы было вот так:

app.conf.beat_schedule = {
    "test": {
        "task": 'celery_app.test',  # путь к задаче
        'schedule': crontab(hour=8, minute=0),  # Ежедневно в 8 утра
}

Запуск периодических задач

Теперь пора перейти непосредственно к запуску. У нас есть возможность использовать две разных команды:

  1. celery -A celery_app worker -B --loglevel=INFO

  2. celery -A celery_app beat --loglevel=INFO

Первая команда запускает рабочий узел, который одновременно будет являться и планировщиком. Эта команда лучше всего подходит для отладки и не рекомендуется для запуска в production среде. Дело в том, что в этом случае на работу планировщика могут повлиять выполняемые задачи, что может привести к сбоям.

Вторая команда запускает только планировщик. В таком случае он занимается только назначением задач в нужную очередь и не занимается выполнением задач. Такая схема работы более надёжна. Для того, чтобы задачи начали выполняться, нам понадобится запустить worker отдельно.

Сам рабочий узел мы будем запускать с помощью команды:

celery -A celery_app worker

Запускаем beat в первом терминале, worker во втором.
В логах beat мы видим следующее:

celery beat v5.3.6 (emerald-rush) is starting.
__    -    ... __   -        _
LocalTime -> 2024-06-05 23:23:34
Configuration ->
    . broker -> amqp://user:**@localhost:5672//
    . loader -> celery.loaders.app.AppLoader
    . scheduler -> celery.beat.PersistentScheduler
    . db -> celerybeat-schedule
    . logfile -> [stderr]@%INFO
    . maxinterval -> 5.00 minutes (300s)
[23:23:34] INFO beat beat: Starting...
[23:23:39] INFO beat Scheduler: Sending due task test (celery_app.test)
[23:23:44] INFO beat Scheduler: Sending due task test (celery_app.test)
[23:23:49] INFO beat Scheduler: Sending due task test (celery_app.test)

В логах worker мы увидим:

 
 -------------- celery@kisel-manjaro v5.3.6 (emerald-rush)
--- ***** ----- 
-- ******* ---- Linux-6.8.11-1-MANJARO-x86_64-with-glibc2.39 2024-06-05 23:25:00
- *** --- * --- 
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x7fd14bd0ff80
- ** ---------- .> transport:   amqp://user:**@localhost:5672//
- ** ---------- .> results:     disabled://
- *** --- * --- .> concurrency: 12 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery
                

[tasks]
  . celery_app.test

[23:25:00] INFO connection Connected to amqp://user:**@127.0.0.1:5672//
[23:25:00] INFO mingle mingle: searching for neighbors
[23:25:01] INFO mingle mingle: all alone
[23:25:01] INFO worker celery@kisel-manjaro ready.
[23:25:13] INFO strategy Task celery_app.test[e85e31ba-0735-476a-8896-218b9ec55288] received
[23:25:13] WARNING log It is working!
[23:25:13] INFO trace Task celery_app.test[e85e31ba-0735-476a-8896-218b9ec55288] succeeded in 0.00033264300145674497s: None
[23:25:18] INFO strategy Task celery_app.test[3501968d-964c-4663-9106-0f81c96c4032] received
[23:25:18] WARNING log It is working!
[23:25:18] INFO trace Task celery_app.test[3501968d-964c-4663-9106-0f81c96c4032] succeeded in 0.0001917649933602661s: None
[23:25:23] INFO strategy Task celery_app.test[88ddedd2-b2fb-4ae4-beaf-31091ae98337] received
[23:25:23] WARNING log It is working!
[23:25:23] INFO trace Task celery_app.test[88ddedd2-b2fb-4ae4-beaf-31091ae98337] succeeded in 0.00017273900448344648s: None

Супер, это всё, что нужно знать для уверенной работы с Celery.
Напоследок разберем несколько практических вопросов для закрепления.

Как выполнять только одну периодическую задачу за раз?

Наиболее популярный способ - использовать блокировку и проверять её перед началом выполнения каждой задачи. Пример для наглядности:

from celery import Celery
import time
import redis

app = Celery('tasks', broker='pyamqp://guest@localhost//')
r = redis.Redis()


@app.task(bind=True)
def lonely_task(self):
    lock_id = "lonely_task_lock"
    have_lock = False
    lock = r.lock(lock_id, timeout=60)
    try:
        have_lock = lock.acquire(blocking=False)
        if have_lock:
            print("Task is running")
            time.sleep(30)  # выполнение долгой задачи
        else:
            print("Task is already running")
    finally:
        if have_lock:
            lock.release()


app.conf.beat_schedule = {
    'lonely_task': {
        'task': 'celery_app.lonely_task',
        'schedule': timedelta(seconds=15),
    },
}

Как распределить задачи по воркерам?

Для этого необходимо дополнительно обновить task_routes.

    app.conf.task_routes={
        'celery_app.task_A': {'queue': 'queue_A'},
        'celery_app.task_B': {'queue': 'queue_B'},
    }

Далее, если мы хотим обрабатывать эти задачи в разных воркерах, то запускаем их разными командами:

Этот worker будет брать задачи только из очереди queue_A

celery -A celery_app worker -l INFO -Q queue_A

Этот воркер будет брать задачи только из очереди queue_B

celery -A celery_app worker -l INFO -Q queue_B

При явном указании очереди worker перестанет брать задачи из очереди "по умолчанию".

Что же будет, если задачи будут назначаться быстрее, чем воркеры будут их выполнять?

Ничего хорошего. Если у задач не установлен параметр expire, а у используемой очереди отсутствует лимит на количество задач, то очередь из задач будет копиться. В конечном итоге закончится оперативная память и в лучшем случае отвалится только наш брокер.
Если есть риск, что задачи начнут "тупить", то есть несколько вариантов:

  • увеличиваем период, через который они будут выполняться (timedelta)

  • устанавливаем expire на задачи

  • устанавливаем лимит на очередь в брокере

Что если выполнение задач занимает разное время и нужно выполнить их все?

Здесь на помощь приходит параметр autoscale.
Если задачи имеют разное время выполнения и невозможно точно предсказать нагрузку, то автоматическое маштабирование может помочь. Здесь всё будет ограничено количеством ядер. Цифру больше числа доступных ядер указывать нет смысла.
Вот пример, как указать маштабирование в настройках:

app.conf.update(
    worker_autoscaler={
        'max_concurrency': 10,
        'min_concurrency': 3,
    }
)

Так же можно установить этот параметр с помощью команды при запуске:

celery -A celery_app worker -B --loglevel=INFO --autoscale=3,10

Как лучше всего отслеживать выполнение периодических задач?

Лучший инструмент для мониторинга выполнения периодических задач для Celery - это Flower. Он легко подключается и дает очень подробную статистику по всем задачам.
Вот так выглядит интерфейс:

Интерфейс Flower
Интерфейс Flower

Второй способ - использование app.control.inspect(). С помощью него можно получить доступ к подробной информации о задачах прямо из кода.

Основные методы app.control.inspect()
  1. active():

    • Возвращает список задач, которые в настоящее время выполняются на рабочих узлах.

  2. scheduled():

    • Возвращает список задач, которые запланированы и находятся в очереди, но еще не начаты.

  3. reserved():

    • Возвращает список задач, которые зарезервированы рабочими узлами, но еще не выполняются.

  4. stats():

    • Возвращает статистику о рабочих узлах, такую как количество задач, производительность и использование ресурсов.

  5. ping():

    • Проверяет доступность рабочих узлов и возвращает время отклика.

  6. registered():

    • Возвращает список всех зарегистрированных задач на рабочих узлах.

  7. revoked():

    • Возвращает список отозванных задач.

  8. conf():

    • Возвращает текущую конфигурацию рабочих узлов.

  9. report():

    • Возвращает краткий отчет о текущем состоянии рабочих узлов.

Подробнее можно посмотреть вот здесь.

Подведем итоги

Мы детально разобрали принципы работы планировщика, многое узнали про настройки, запуск и разобрали несколько практических вопросов. Это крепкая основа, чтобы использовать Celery и понимать, что же там происходит. А когда есть понимание - любые проблемы получится решить.

P.S. Пишите ваши вопросы, кейсы и советы в комментарии. С радостью дополню статью, чтобы помочь начинающим и продолжающим знакомство с Celery сэкономить драгоценное время в решении проблем.

Комментарии (0)