В этой статье рассмотрим возможность получать метрики Celery непосредственно от самих воркеров, хитрости, на которые придётся пойти, чтобы решить эту задачу, и, самое главное, какие преимущества от этого можно получить по сравнению с классическим подходом к мониторингу Celery. Также продемонстрирую небольшой Django-проект и пример его конфигурации. Особое внимание будет уделено режиму мультипроцессинга и тому, как та или иная конфигурация запуска Celery будет влиять на сложность решения.

Оглавление

В предыдущей статье рассматривались особенности популярного подхода к мониторингу в Celery. Вкратце напомню, о чём речь. В рамках жизненного цикла обработки задач воркеры отправляют события: задача принята в работу, произошла ошибка или задача успешно завершена. По другую сторону брокера есть consumer, который слушает все события, агрегирует информацию в своё внутреннее состояние и предоставляет endpoint для отдачи метрик.

Рисунок 1. Архитектура сбора метрик на основе событий
Рисунок 1. Архитектура сбора метрик на основе событий

Преимущества этого решения очевидны:

  • Лёгкая настройка — достаточно включить отправку событий

  • Есть готовые решения, например, Flower

Но есть и недостатки: 

  • События создают дополнительную нагрузку на брокер 

  • Обработка задач занимает больше времени 

  • Необходимо несколько раз отправить по сети служебные события 

  • Единая точка отказа в виде приложения, которое слушает эти события (горизонтально масштабировать его нельзя) 

  • У готовых решений есть свои особенности реализации. Например, Flower больше подходит для мониторинга фиксированного количества воркеров со статическими именами. В противном случае вы столкнётесь с классической проблемой push-модели: хранением старых метрик. Не менее интересный вопрос возникает, когда нужно мониторить несколько разных проектов: придётся либо запускать под каждый проект экземпляр Flower, либо строить своё решение.

Избавиться от этих недостатков поможет смена подхода к мониторингу. Если организовать pull-модель, то перечисленные проблемы уйдут естественным образом, а также откроются функциональные возможности. Помимо метрик задач, становятся доступны внутренние метрики приложения, например, метрики подключений к БД, кешу, метрики интеграций с внешними сервисами и другие, которые вы будете наблюдать в своём приложении.

Другими словами, pull-модель подразумевает, что приложение самостоятельно собирает метрики и предоставляет endpoint для их отдачи. И какие метрики войдут в состав ответа, зависит только от реализации самого приложения.

Рисунок 2. Pull-модель сбора метрик
Рисунок 2. Pull-модель сбора метрик

Метрики обработки задач

Далее будут встречаться упрощённые примеры из библиотеки celery_control. Полный код размещён в официальном репозитории Домклик на платформе GitVerse.

Для начала нам потребуется некоторый общий объект состояния. Он будет содержать в себе информацию и ссылки на объекты, необходимые для дальнейшего сбора метрик. В первую очередь, нужно сохранить worker_name. Его значение будет использоваться в метках.

# celery_control/conf.py
from typing import Generic, Type, TypeVar

DescriptorType = TypeVar('DescriptorType')

class Descriptor(Generic[DescriptorType]):
  
    def __init__(self) -> None:
        self.name: str = ''
        self._value: DescriptorType

    def __get__(self, instance: 'State', owner: Type['State']) -> DescriptorType:
        try:
            return self._value
        except AttributeError as error:
            raise RuntimeError(f'{self.name} is not set') from error
            
    def __set__(self, instance: 'State', value: DescriptorType) -> None:
        self._value = value

    def __set_name__(self, owner: Type['State'], name: str) -> None:
        self.name = name
        
class State:
    worker_name = Descriptor[str]()

state = State()

Инициализировать имя воркера можно через сигнал worker-init. Он отправляется в главном процессе воркера в момент инициализации приложения.

# celery_control/receivers.py

from celery import signals
from celery.apps.worker import Worker

from celery_control.conf import state

@signals.worker_init.connect
def receive_worker_init(sender: Worker, **kwargs):
    state.worker_name = sender.hostname

Для сбора метрик нам поможет набор сигналов обработки задач. Вот некоторые из них:

  • task_prerun

  • task_postrun

  • task_received

  • task_success

  • task_failure

На их основе можно сделать счётчик событий по аналогии с flower_events_total. Но лично мне ближе подход, когда под каждый тип события будет собственная метрика. Это позволит для некоторых событий добавлять метки, свойственные только им, или же отказаться от части меток из-за ненадобности. Такой пример мы тоже встретим.

Далее рассмотрим подробности реализации по каждой метрике.

Счётчик успешных задач

После успешного завершения задачи отправляется сигнал task_success. Здесь нам впервые пригодится сохранённое ранее worker_name.

# celery_control/metrics.py

task_succeeded_total = Counter(
    name='celery_task_succeeded',
    documentation='Number of tasks successes.',
    labelnames=['worker', 'task'],
)
# celery_control/receivers.py

from celery import signals
from celery.app.task import Task

from celery_control import metrics
from celery_control.conf import state

@signals.task_success.connect
def receive_task_success(sender: Task, **kwargs):
    metrics.task_succeeded_total.labels(
        worker=state.worker_name,
        task=sender.name,
    ).inc()

Счётчик ошибок

Считается, что задача завершилась с ошибкой, когда в ней появляется необработанное исключение. Для мониторинга будет полезно отслеживать типы исключений, поэтому в счётчик ошибок добавим метку exception.

# celery_control/metrics.py

task_failed_total = Counter(
    name='celery_task_failed',
    documentation='Number of tasks failures.',
    labelnames=['worker', 'task', 'exception'],
)
# celery_control/receivers.py

@signals.task_failure.connect
def receive_task_failure(sender: Task, exception: Exception, **kwargs):
    exception_type_name = type(exception).__name__
    
    metrics.task_failed_total.labels(
        worker=state.worker_name,
        task=sender.name,
        exception=exception_type_name,
    ).inc()

Счётчик повторных попыток

Похожим образом реализуется счётчик повторных попыток. В зависимости от ситуации, в аргументах может прийти параметр reason, в котором содержится исключение, ставшее причиной повторной попытки.

# celery_control/metrics.py

task_retried_total = Counter(
    name='celery_task_retried',
    documentation='Number of tasks retries.',
    labelnames=['worker', 'task', 'exception'],
)
# celery_control/receivers.py

@signals.task_retry.connect
def receive_task_retry(sender: Task, **kwargs):
    exception_type_name = 'undefined'
    reason: Retry | None = kwargs.get('reason')
    
    if reason and reason.exc:
        exception_type_name = type(reason.exc).__name__

    metrics.task_retried_total.labels(
        worker=state.worker_name,
        task=sender.name,
        exception=exception_type_name,
    ).inc()

Счётчик отозванных задач

Задача считается отозванной в двух случаях:

  • Отозвали через remote control

  • Истекло время, в течение которого можно было взять задачу в работу

Эти случаи также будут отображены в соответствующих метках счётчика.

# celery_control/metrics.py

task_revoked_total = Counter(
    name='celery_task_revoked',
    documentation='Number of revoked tasks.',
    labelnames=['worker', 'task', 'terminated', 'expired'],
)
# celery_control/receivers.py

@signals.task_revoked.connect
def receive_task_revoked(sender: Task, terminated: bool, expired: bool, **kwargs):
    metrics.task_revoked_total.labels(
        worker=state.worker_name,
        task=sender.name,
        terminated=str(terminated).lower(),
        expired=str(expired).lower(),
    ).inc()

Счётчик неизвестных задач

Это тот самый пример, когда стоит отказаться от некоторых меток, в частности, от метки task. В противном случае появляется риск бесконтрольного роста метрик, когда на каждую такую задачу будет создан новый экземпляр счётчика. Большое количество уникальных неизвестных задач создаст лишнюю нагрузку не только на приложение, но и на систему мониторинга, которая будет строить временные ряды по каждой неизвестной задаче.

# celery_control/metrics.py

task_unknown_total = Counter(
    name='celery_task_unknown',
    documentation='Number of unknown tasks.',
    labelnames=['worker'],
)
# celery_control/receivers.py

@signals.task_unknown.connect
def receive_task_unknown(**kwargs):
    metrics.task_unknown_total.labels(
        worker=state.worker_name,
    ).inc()

Счётчик принятых в работу задач

Его необходимо увеличивать непосредственно перед самим выполнением задачи. Для этого поможет сигнал task_prerun.

# celery_control/metrics.py

task_accepted_total = Counter(
    name='celery_task_accepted',
    documentation='Number of accepted tasks.',
    labelnames=['worker', 'task'],
)
# celery_control/receivers.py

@signals.task_prerun.connect
def receive_task_prerun(sender: Task, **kwargs):
    metrics.task_accepted_total.labels(
        worker=state.worker_name,
        task=sender.name,
    ).inc()

Счётчик отклонённых задач

Задача считается отклонённой, если она выбрасывает исключение celery.exceptions.Reject. Логика и условия отклонения разработчик прописывает самостоятельно в исходном коде обработки задачи.

Стоит упомянуть, что вам доступен параметр requeue. При его взведении задача снова становится в очередь. Но использовать его следует с осторожностью, поскольку это может привести к бесконечному циклу сообщений. Также помните, что за отклонённые задачи отвечает сигнал task_postrun.

# celery_control/metrics.py

task_rejected_total = Counter(
    name='celery_task_rejected',
    documentation='Number of tasks rejections.',
    labelnames=['worker', 'task', 'requeue'],
)
# celery_control/receivers.py

from celery import signals
from celery.app.task import Task
from celery import states as task_states
from celery.exceptions import Reject

from celery_control import metrics
from celery_control.conf import state

@signals.task_postrun.connect
def receive_task_postrun(sender: Task, **kwargs):
    task_state, task_retval = kwargs.get('state'), kwargs.get('retval')

    if task_state == task_states.REJECTED:
        requeue = 'unknown'
        if isinstance(task_retval, Reject):
            requeue = str(task_retval.requeue).lower()

        metrics.task_rejected_total.labels(
            worker=state.worker_name,
            task=sender.name,
            requeue=requeue,
        ).inc()

На мой взгляд, в терминологии есть некоторая путаница. Например, есть задачи отозванные (revoke), а есть отклонённые (reject). Названия немного похожи, но смысл разный. Отзывают задачи либо по дате истечения, либо вручную через remote control, а отклоняют программно в самой задаче исключением. Причём можно просто отклонить, а можно снова поставить задачу в очередь, что очень похоже на процесс повторных попыток (retry).

Счётчик отклонённых сообщений

Ещё больше путаницы вносят названия сигналов. Можно предположить, что сигнал task_rejected отвечает за отклонение задачи, однако он отправляется в том случае, когда Consumer получил сообщение неизвестного типа.

# celery_control/metrics.py

message_rejected_total = Counter(
    name='celery_message_rejected',
    documentation='Number of message rejections.',
    labelnames=['worker'],
)
# celery_control/receivers.py

@signals.task_rejected.connect
def receive_task_rejected(**kwargs):
    metrics.message_rejected_total.labels(
        worker=state.worker_name,
    ).inc()

Счётчик внутренних ошибок Celery

Celery отслеживает свои внутренние ошибки, возникающие во время работы, и предоставляет соответствующий сигнал task_internal_error.

# celery_control/metrics.py

task_internal_error_total = Counter(
    name='celery_task_internal_errors',
    documentation='Number of tasks internal errors.',
    labelnames=['worker', 'task', 'exception'],
)
# celery_control/receivers.py

@signals.task_internal_error.connect
def receive_task_internal_error(sender: Task, exception: Exception, **kwargs):
    exception_type_name = type(exception).__name__

    metrics.task_internal_error_total.labels(
        worker=state.worker_name,
        task=sender.name,
        exception=exception_type_name,
    ).inc()

Гистограмма длительности выполнения

Celery самостоятельно измеряет длительность выполнения задачи. Однако получить доступ к результатам практически невозможно. Поэтому пойдём на хитрость: при обработке сигнала task_prerun сохраним время начала обработки в самом объекте запроса, а при получении сигнала task_postrun извлечём сохранённое время начала обработки, вычислим продолжительность и обновим гистограмму длительности выполнения.

# celery_control/metrics.py

from prometheus_client import Histogram

task_runtime_seconds = Histogram(
    name='celery_task_runtime_seconds',
    documentation='Task runtime.',
    labelnames=['worker', 'task'],
)
# celery_control/receivers.py

import time

from celery.app.task import Task

from celery_control import metrics
from celery_control.conf import state

@signals.task_prerun.connect
def receive_task_prerun(sender: Task, **kwargs):
    ...

    setattr(
        sender.request,
        'celery_control_task_time_start',
        time.monotonic(),
    )

@signals.task_postrun.connect
def receive_task_postrun(sender: Task, **kwargs):
    ...

    time_start = getattr(sender.request, 'celery_control_task_time_start')

    task_runtime_seconds = time.monotonic() - time_start

    metrics.task_runtime_seconds.labels(
        worker=state.worker_name,
        task=sender.name,
    ).observe(task_runtime_seconds)

Метрики состояния Celery

У Celery есть собственное состояние. Из него можно достать полезную информацию:

  • active_requests — запросы в процессе обработки

  • reserved_requests — все текущие запросы: они либо в ожидании свободного обработчика, либо выполняются в настоящий момент

Если вычесть первое множество из второго, то можно получить множество задач, ожидающих свободный обработчик.

# celery_control/snapshot.py

import dataclasses

from celery.worker.request import Request
from celery.worker.state import active_requests, reserved_requests

@dataclasses.dataclass
class RequestsSnapshot:
    prefetched: set[Request]
    active: set[Request]
    waiting: set[Request]
    scheduled: set[Request]

def make_requests_snapshot() -> RequestsSnapshot:
    # Все текущие запросы
    prefetched = set(reserved_requests)

    # Выполняются в данный момент
    active = set(active_requests)

    # Ожидают свободного обработчика
    waiting = prefetched - active

    # Запланированные по времени
    scheduled = set(_scheduled())

    return RequestsSnapshot(
        prefetched=prefetched,
        active=active,
        waiting=waiting,
        scheduled=scheduled,
    )

Множество отложенных запросов управляются классом Timer из пакета kombu . Чтобы получить доступ к его экземпляру, потребуется экземпляр класса Consumer. Ссылку на него можно захватить в сигнале worker_ready.

# celery_control/receivers.py

from celery import signals
from celery.worker.consumer import Consumer

from celery_control.conf import state

@signals.worker_ready.connect
def receive_worker_ready(sender: Consumer, **kwargs):
    state.consumer = sender

Внутреннее устройство Timer основано на очереди с приоритетом. В качестве реализации используется min-куча из модуля стандартной библиотеки heapq. Экземпляр Timer обладает свойством queue, которое делает копию внутренней структуры. И на примере обхода этой копии можно получить искомое множество отложенных запросов.

# celery_control/snapshot.py

from typing import Generator

from celery.worker.request import Request
from kombu.asynchronous.timer import Entry, Timer

def _scheduled() -> Generator[Request, None, None]:
    timer: Timer = state.consumer.timer

    for waiting in timer.schedule.queue:
        entry: Entry = waiting.entry
        try:
            arg0 = entry.args[0]
        except (IndexError, TypeError):
            continue
        else:
            if isinstance(arg0, Request):
                yield arg0

Для представленных множеств создадим соответствующие датчики.

from prometheus_client import Gauge

task_prefetched = Gauge(
    name='celery_task_prefetched',
    documentation='Number of tasks currently prefetched at a worker.',
    labelnames=['worker', 'task'],
    multiprocess_mode='livemostrecent',
)
task_waiting = Gauge(
    name='celery_task_waiting',
    documentation='Number of tasks currently waiting at a worker.',
    labelnames=['worker', 'task'],
    multiprocess_mode='livemostrecent',
)
task_active = Gauge(
    name='celery_task_active',
    documentation='Number of tasks currently active at a worker.',
    labelnames=['worker', 'task'],
    multiprocess_mode='livemostrecent',
)
task_scheduled = Gauge(
    name='celery_task_scheduled',
    documentation='Number of tasks currently scheduled at a worker.',
    labelnames=['worker', 'task'],
    multiprocess_mode='livemostrecent',
)

Далее необходимо реализовать функцию, вызов которой обновит их показания.

# celery_control/tracker.py

from collections import defaultdict

from celery.worker.request import Request
from celery.worker.state import total_count
from prometheus_client import Gauge

from celery_control import metrics, snapshot

def track():

    _fill_tasks_seen()

    rs = snapshot.make_requests_snapshot()

    _track(rs.prefetched, metrics.task_prefetched)
    _track(rs.active, metrics.task_active)
    _track(rs.waiting, metrics.task_waiting)
    _track(rs.scheduled, metrics.task_scheduled)

def _track(requests: set[Request], metric: Gauge):
    counter: dict[str, int] = defaultdict(int)

    for request in requests:
        counter[request.task.name] += 1

    for task, cnt in counter.items():
        metric.labels(
            worker=state.worker_name,
            task=task,
        ).set(cnt)
        state.tasks_seen.add(task)

    for task_seen in state.tasks_seen:
        if task_seen not in counter:
            metric.labels(
                worker=state.worker_name,
                task=task_seen,
            ).set(0)

def _fill_tasks_seen():
    # fill from accepted counter
    for task_accepted in total_count.keys():
        state.tasks_seen.add(task_accepted)

В отличие от счётчика и гистограммы, датчик не является накопительной метрикой. Он представляет собой текущее значение какого-либо показателя, например, температуры в больнице или уровня воды в Мировом океане. Показатели имеют свойство как увеличиваться, так и уменьшаться. Следовательно, нужно не только устанавливать датчикам значения по текущим задачам, но и сбрасывать значения тем метрикам, задачи которых в текущий snapshot не попали. Поэтому перед каждой обработкой необходимо запоминать, какие задачи мы уже встретили tasks_seen, а потом пройтись по ним, и для тех, кого нет в текущем состоянии, сбросить метрику.

Забегая вперёд, отмечу, что вызов этой функции необходимо разместить в том месте, где происходит подготовка данных к ответу, а именно в ручке сервера /metrics. Также может потребоваться периодический фоновый вызов в отдельном потоке, но обо всём по порядку.

Запуск сервера

Для получения метрик необходим соответствующий сервер. Предполагается, что сервер будет запускаться в отдельном потоке рядом с основным потоком воркера.

# celery_control/server.py

import threading

from wsgiref.simple_server import make_server

from prometheus_client import CONTENT_TYPE_LATEST, REGISTRY, generate_latest

def start_wsgi_server():
    server = make_server(host='', port=5000, app=app)

    thread = threading.Thread(target=server.serve_forever)
    thread.daemon = True

    thread.start()
  
def app(environ, start_response) -> list[bytes]:
    if environ['PATH_INFO'] == '/favicon.ico':
        status, output, headers = '200 OK', b'', []
    else:
        status, output, headers = _make_output()

    start_response(status, headers)

    return [output]

def _make_output():
    registry = REGISTRY

    output = generate_latest(registry)

    headers = [('Content-Type', CONTENT_TYPE_LATEST)]

    return '200 OK', output, headers,

Именно здесь перед генерированием ответа я расположил вызов функции track, которая актуализирует датчики каждый раз, когда будет вызов ручки /metrics.

# celery_control/server.py

from celery_control import tracker

def _make_output():
    tracker.track()
    ...

Для запуска самого сервера использую сигнал worker_ready. Ранее я его уже применял, чтобы сохранить ссылку на Consumer.

# celery_control/receivers.py

from celery import signals
from celery.worker.consumer import Consumer

from celery_control import server
from celery_control.conf import state

@signals.worker_ready.connect
def receive_worker_ready(sender: Consumer, **kwargs):
    state.consumer = sender

    server.start_wsgi_server()

Представленная схема будет отлично работать для реализации пула threads. Память у потоков обработчиков и у отдельного потока сервера будет общая. Другими словами, метрики, собранные обработчиками, также станут доступны серверу.

Рисунок 3. Схема запуска сервера для пула threads
Рисунок 3. Схема запуска сервера для пула threads

Однако этого недостаточно для реализации пула по умолчанию prefork. Обработчики будут отдельными процессами, и у каждого — собственная память. У потока сервера не будет доступа к метрикам, за исключением тех, что собраны в нём самом (вызов track).

Рисунок 4. Схема запуска сервера для пула prefork
Рисунок 4. Схема запуска сервера для пула prefork

Каждый форк будет отдельно обновлять собственные счётчики, гистограммы и т.д. Нужно организовать доступ к метрикам других процессов с последующим их агрегированием.

Похожая проблема уже описана в статье «Особенности сбора метрик. Запуск приложения gunicorn-ом в режиме мультипроцессинга». Поэтому здесь применю такую же технику. Потребуется включить у prometheus_client режим мультипроцессинга с помощью переменной окружения PROMETHEUS_MULTIPROC_DIR, а в функции make_output подменить реализацию registry.

# celery_control/server.py

from prometheus_client import (
    CONTENT_TYPE_LATEST,
    REGISTRY,
    CollectorRegistry,
    generate_latest,
)
from prometheus_client.multiprocess import MultiProcessCollector

from celery_control import tracker

def make_output():
    tracker.track()

    if os.environ.get('PROMETHEUS_MULTIPROC_DIR'):
        registry = CollectorRegistry()
        MultiProcessCollector(
            registry,
            path=os.environ.get('PROMETHEUS_MULTIPROC_DIR'),
        )
    else:
        registry = REGISTRY

    output = generate_latest(registry)

    headers = [('Content-Type', CONTENT_TYPE_LATEST)]

    return '200 OK', output, headers 

В директории PROMETHEUS_MULTIPROC_DIR будут лежать файлы метрик каждого процесса по отдельности. Во время обновления объектов метрик информация будет записываться на диск. Когда серверу потребуется отдать метрики, он прочитает все файлы всех процессов и объединит их в результат ответа.

Рисунок 5. Схема сбора метрик в режиме мультипроцессинга
Рисунок 5. Схема сбора метрик в режиме мультипроцессинга

Мультистарт

Ситуация усложняется, когда появляется необходимость в запуске нескольких воркеров в рамках одного окружения. Например, в рамках одной машины или контейнера. У Celery есть встроенная утилита, позволяющая управлять этими процессами. С её помощью можно одной командой запустить несколько воркеров и указать им соответствующие названия и настройки.

celery multi start image video -c 3
# celery worker -n image@myhost -c 3
# celery worker -n video@myhost -c 3

Если просто применить схему запуска, описанную выше, то получим гонку за доступный порт. При запуске первого воркера портом завладеет первый сервер, а при запуске второго этот же порт уже будет недоступен. Другими словами, второй сервер не сможет запуститься.

Эту проблему можно решить настройкой сервера на разные порты. В свою очередь, придётся настроить цели для Prometheus на этот режим работы. Такое решение будет работать для любой реализации пула обработчиков, как для prefork, так и для threads.

Рисунок 6. Схема запуска воркеров через multistart
Рисунок 6. Схема запуска воркеров через multistart

Однако не всегда есть возможность настроить получение метрик с разных портов. И вам может потребоваться относиться к запуску нескольких воркеров как к единому источнику метрик. В этой ситуации без режима мультипроцессинга не обойтись, причем независимо от реализации пула обработчиков. Каждый процесс или поток каждого воркера будет писать метрики в соответствующие им файлы, а запущенный сервер в единственном экземпляре будет читать файлы не только от своего воркера, но и от конкурирующего.

Рисунок 7. Схема ошибки запуска сервера в конкурирующем воркере
Рисунок 7. Схема ошибки запуска сервера в конкурирующем воркере

В схеме выше мы перестали учитывать датчики второго воркера. Напомню, что на данный момент функция track вызывается при генерировании ответа /metrics. Ввиду того, что в нашем наборе воркеров доступен только один сервер, датчики доступны только для воркера, в котором запущен поток сервера.

Эту проблему можно решить с помощью отдельного потока демона, который с заданным интервалом будет сохранять значения на диск, а поток сервера будет читать их вместе с остальными файлами.

Рисунок 8. Схема запуска фонового потока демона
Рисунок 8. Схема запуска фонового потока демона

На схеме выше в качестве примера выбран пул prefork . Но по такому же принципу будет работать и пул threads, отличие заключается только в количестве файлов метрик. При необходимости можно комбинировать пулы обработчиков.

Реализация такой функциональности достаточно проста:

# celery_control/tracker.py

import threading
import time

def start_daemon() -> None:
    thread = threading.Thread(target=_daemon)
    thread.daemon = True
    thread.start()

def _daemon() -> None:
    while True:
        track()

        time.sleep(5)

Запустить демон можно в том же событии worker_ready. Если порт занят, то сервер упадёт с ошибкой. Это можно трактовать как необходимость запуска.

# celery_control/receivers.py

from celery import signals
from celery.worker.consumer import Consumer

from celery_control import server, tracker
from celery_control.conf import state

@signals.worker_ready.connect
def receive_worker_ready(sender: Consumer, **kwargs):
    state.consumer = sender

    try:
        server.start_wsgi_server()
    except OSError:
        tracker.start_daemon()

Мультипроцессинг

Проблема роста файлов метрик

По умолчанию в качестве process_identifier используется PID. Когда процессы перезапускаются, PID принимает новое значение. Он, в свою очередь, используется в названии файлов метрик. Каждый процесс пишет метрики только в свои соответствующие файлы. Таким образом, при каждом перезапуске процесса появляется дополнительный набор файлов. Поэтому для процессов с относительно коротким жизненным циклом считается хорошей практикой ограничить количество таких наборов заменой функции получения идентификатора процесса.

# celery_control/multiprocess.py

from prometheus_client import values


def set_process_identifier(process_identifier: str):
    values.ValueClass = values.MultiProcessValue(
        process_identifier=lambda: process_identifier,
    )

В упомянутой ранее статье рассматривалось решение для сервера Gunicorn. Здесь у Celery такая же потребность идентифицировать обработчики. Поэтому применю тот же метод решения на основе filelock-ов.

# celery_control/receivers.py

from celery import signals
from celery.worker.consumer import Consumer

from celery_control import multiprocess
from celery_control.conf import state

@signals.worker_process_init.connect
def receive_worker_process_init(**kwargs):
    if not multiprocess.enabled:
        return

    worker_id, worker_id_lock = multiprocess.get_worker_id(
        concurrency=state.worker_concurrency,
        name=state.worker_name,
        path=multiprocess.prometheus_multiproc_path,
    )

    state.worker_id = worker_id
    state.worker_id_lock = worker_id_lock
    state.worker_process_identifier = multiprocess.make_process_identifier(
        worker_name=state.worker_name,
        worker_id=state.worker_id,
        has_lock=worker_id_lock is not None,
    )

    multiprocess.set_process_identifier(state.worker_process_identifier)

Суть решения заключается в том, что каждый процесс (форк) будет получать свободный lock на файл из фиксированного набора. А номер этого файла будет определять идентификатор соответствующего воркера. Это позволит ограничить количество наборов файлов числом форков worker_concurrency. Полную реализацию multiprocess.py можно посмотреть тут.

Не будет лишним подменить process_identifier и в основном процессе тоже. Здесь можно обойтись более простым решением — установить имя воркера.

# celery_control/receivers.py

from celery import signals
from celery.apps.worker import Worker

from celery_control import multiprocess
from celery_control.conf import state

@signals.worker_init.connect
def receive_worker_init(sender: Worker, **kwargs):
    state.worker_id = sender.hostname
    state.worker_id_lock = None
    state.worker_name = sender.hostname
    state.worker_concurrency = sender.concurrency
    state.worker_process_identifier = state.worker_name

    if multiprocess.enabled:
        multiprocess.set_process_identifier(state.worker_process_identifier)

Проблема наблюдаемости метрики

В базовом классе метрик есть метод, отвечающий за такое свойство, как «наблюдаемость». Пример кода из prometheus_client:

class MetricWrapperBase(Collector):

    def _is_observable(self):
        # Whether this metric is observable, i.e.
        # * a metric without label names and values, or
        # * the child of a labelled metric.
        return not self._labelnames or (
            self._labelnames and self._labelvalues
        )

Метрика считается наблюдаемой только в том случае, если у неё нет меток в принципе, либо им установили соответствующие значения. Если метрика не является наблюдаемой, то с ней запрещены операции, свойственные данному типу метрик.

Рассмотрим пример на счётчике успешных задач с метками worker и task. Перед тем как увеличивать счётчик, необходимо сделать метрику наблюдаемой. Это достигается проставлением их значений с помощью метода labels. В противном случае будет выброс исключения.

task_succeeded_total = Counter(
    name='celery_task_succeeded',
    documentation='Number of tasks successes.',
    labelnames=['worker', 'task'],
)

# OK
task_succeeded_total.labels(
    worker='image@myhost',
    task='convert',
).inc()

# raise ValueError(
#   'Counter metric is missing label values'
# )
task_succeeded_total.inc()

Когда при создании экземпляра метрики указывают список меток, полученный экземпляр считается родительским. В методе __init__ создаётся пустой словарь и помещается в поле _metrics . Во время вызова метода labels происходит поиск дочернего экземпляра по переданным значениям меток. При его отсутствии создаётся новый и сохраняется по соответствующему ключу. Именно дочерний экземпляр будет содержать в себе переданные значения меток. Тем самым он становится «наблюдаемым» и участвует в процессе сбора метрик.

Рисунок 9. Диаграмма объектов счётчика успешных задач
Рисунок 9. Диаграмма объектов счётчика успешных задач

Создание дочернего экземпляра происходит лениво во время вызова метода labels. Однако поведение меняется для экземпляров без меток: они будут сразу «наблюдаемыми», и можно применять свойственные им операции.

task_succeeded_total = Counter(
    name='celery_task_succeeded',
    documentation='Number of tasks successes.',
)

# OK
task_succeeded_total.inc()

До тех пор, пока нет нужды в режиме мультипроцессинга, всё будет работать штатно. Но когда мы включаем этот режим и решаем проблему роста файлов метрик, то сталкиваемся с ситуацией, при которой наблюдаемые метрики обходят логику идентификации воркеров. Создание конечного экземпляра происходит раньше, чем мы заменяем ValueClass . Таким образом, эти метрики работают с файлами, привязанными к PID, а не к worker_process_identifier.

Один из способов решить эту проблему — принудительно сделать такие метрики «ленивыми». Для этого потребуется создать новый класс, экземпляр которого будет содержать дополнительный признак _flag. Метод __init__ пропустит инициализацию метрики, а уже при первой операции с экземпляром будет вызов _metric_init.

from typing import Type, TypeVar

from prometheus_client import Counter, Gauge, Histogram, Summary
from prometheus_client.metrics import MetricWrapperBase

Metric = TypeVar('Metric', Counter, Gauge, Histogram, Summary)

def make_lazy_observable_class(base: Type[Metric]) -> Type[Metric]:
    """Create lazy observable class."""

    class LazyObservable(base):

        def __init__(self, *args, **kwargs):
            self._flag = False
            super().__init__(*args, **kwargs)

        def _is_observable(self):
            if not self._flag:
                return False

            return super()._is_observable()

        def _raise_if_not_observable(self):
            if not self._flag:
                self._metric_init()
                self._flag = True

            super()._raise_if_not_observable()

    return LazyObservable

task_succeeded_total = make_lazy_observable_class(Counter)(
    name='celery_task_succeeded',
    documentation='Number of tasks successes.',
)

# False
task_succeeded_total._is_observable()

# OK
task_succeeded_total.inc()

# True
task_succeeded_total._is_observable()

Эта утилита поможет однозначно сопоставить экземпляры таких метрик и файлы, в которые они будут сбрасываться на диск.

Проблема сброса датчиков

Существуют ситуации, при которых потребуется перезапуск основного процесса. Это может быть как намеренный перезапуск для обновления, так и автоматическое поднятие после внезапной смерти. Если метрики находятся в памяти, то проблем с согласованностью не будет. После старта всё начинается с чистого листа. Неочевидные проблемы возникают с датчиками в режиме мультипроцессинга. Они пишут свои значения в файлы, имена которых соответствуют шаблону:

gauge_{multiprocess_mode}_{process_identifier}.db

По умолчанию process_identifier равен PID. После перезапуска PID процесса будет отличаться, и возникает ситуация, при которой один показатель будет в двух и более файлах с разными значениями. Как уже упоминалось ранее, датчики не являются накопительной метрикой. Счётчики или гистограммы с этой проблемой справляются естественным образом: значения одинаковых метрик из разных файлов просто складываются. С датчиками ситуация другая: им нужно указывать алгоритм multiprocess_mode, по которому сборщик метрик будет разрешать коллизии. На текущий момент в prometheus_client существуют следующие алгоритмы:

  • all

  • min

  • max

  • sum

  • mostrecent

  • liveall

  • livemin

  • livemax

  • livesum

  • livemostrecent

По умолчанию используется алгоритм all, который для каждой метрики соответствующего файла добавляет метку pid. В ответе от ручки /metrics придёт столько метрик, сколько будет файлов на диске. Это будет уже не один показатель, а множество подобных показателей, разделённых метками процессов, которые когда-либо их собрали.

В целом из названий алгоритмов уже понятно, как они работают. Но следует обратить внимание на то, что среди них есть live-реализации. Они отличаются от обычных поведением с файлом на диске при завершении процесса. Live-алгоритмы должны удалять за собой файлы соответствующего процесса, но для этого перед завершением программы нужно помечать текущий процесс как удалённый.

# celery_control/receivers.py

from celery import signals
from celery.worker.consumer import Consumer
from prometheus_client.multiprocess import mark_process_dead

from celery_control import multiprocess
from celery_control.conf import state

@signals.worker_shutdown.connect
def receive_worker_shutdown(**kwargs):
    if not multiprocess.enabled:
        return

    mark_process_dead(state.worker_process_identifier)

@signals.worker_process_shutdown.connect
def receive_worker_process_shutdown(**kwargs):
    if not multiprocess.enabled:
        return

    mark_process_dead(state.worker_process_identifier)

Учитывая тонкости реализации перечисленных алгоритмов, можно понять, почему для датчиков текущих запросов выбран именно алгоритм livemostrecent. Если процесс внезапно завершит работу, не пройдя процедуру удаления файлов, то ручка /metrics всё равно будет отдавать только актуальные показатели, несмотря на то что рядом может лежать другой файл с такими же метриками.

В рамках задачи по идентификации обработчиков я подменяю process_identifier и в основном процессе тоже. Это позволит переиспользовать файлы метрик даже после внезапной смерти процесса. Однако такая нештатная ситуация всё равно может привести к неверным показаниям. Представим, что воркер получил задачу task_test_crash. Какое‑то время она ожидала свободного обработчика, успела попасть в метрику task_waiting, и сразу после этого произошёл сбой. После повторного запуска на диске будет лежать файл с последней, актуальной на тот момент, метрикой, а в состоянии Celery total_count эта задача отсутствует. Новая задача могла просто не поступать, а та, которая ожидала в прошлом, могла перекинуться на другой экземпляр воркера. Именно в этом уникальном случае рассинхронизируются датчики в памяти и на диске. Ручка /metrics будет возвращать информацию, что в данный момент задача task_test_crash в процессе ожидания, хотя на самом деле это не так. Решить эту проблему можно самостоятельным чтением файлов метрик и обновлением в памяти множества увиденных задач. Достаточно сделать это один раз в момент инициализации основного процесса.

# celery_control/receivers.py

from celery import signals
from celery.apps.worker import Worker

from celery_control import multiprocess
from celery_control.conf import state

@signals.worker_init.connect
def receive_worker_init(sender: Worker, **kwargs):
    ...

    if multiprocess.enabled:
        multiprocess.set_process_identifier(state.worker_process_identifier)

        # fill from multiprocess samples
        registry = CollectorRegistry()

        MultiProcessCollector(
            registry,
            path=multiprocess.prometheus_multiproc_path,
        )

        for metric in registry.collect():
            for sample in metric.samples:
                task = sample.labels.get('task')
                worker = sample.labels.get('worker')
                if task is None or worker != state.worker_name:
                    continue

                state.tasks_seen.add(task)

Такая синхронизация будет работать только для датчиков текущих задач. Если в приложении есть пользовательские датчики, то нужно быть осторожным. Выберите подходящий алгоритм разрешения коллизий и помните о потенциальных сбоях вашего приложения.

Проблема скорости диска

Существует рекомендация по уменьшению зависимости от диска: можно смонтировать директорию как tmpfs. С точки зрения приложения ничего не поменяется, однако файлы будут храниться в памяти. Думаю, это может заметно помочь даже с SSD. В таком случае схема воркера будет выглядеть следующим образом:

Рисунок 10. Схема применения tmpfs
Рисунок 10. Схема применения tmpfs

Если ваше приложение запущено в Kubernetes, то можно воспользоваться специальным типом volume‑а emptyDir, установив параметру emptyDir.medium значение Memory. Но будьте осторожны: размер этой директории учитывается в общем потреблении памяти. В более старых версиях Kubernetes ещё не было возможности указать лимит на размер этой директории. Поэтому, если ваше решение будет подразумевать бесконтрольный рост количества файлов, вы можете исчерпать выделенную поду память. Он упадёт в CrashLoop, и восстановить работоспособность сервиса вы сможете только после ручного удаления пода — простой перезапуск не поможет (emptyDir не очищается между перезапусками пода).

Также необходимо помнить, что бесконтрольный рост числа файлов метрик влияет на производительность /metrics. Последовательное чтение большого количества файлов с диска с последующей агрегацией может заметно увеличить длительность отдачи метрик, а также косвенно повлиять на общую производительность в моменте.

Healthcheck

Pull-модель мониторинга подразумевает автоматическую проверку жизнеспособности цели. Для каждого экземпляра генерируется метрика up, которая отражает его состояние. Благодаря этому можно отключить периодическую отправку heartbeat, если он не используются, конечно. Это снизит нагрузку на брокер. Однако в случае запуска нескольких воркеров, например, через multistart, метрика up будет отображать жизнеспособность только одного воркера из всех — того, в котором запущен поток сервера. Решить эту проблему можно введением собственной метрики worker_online.

# celery_control/metrics.py

worker_online = Gauge(
    name='celery_worker_online',
    documentation='Worker online status',
    labelnames=['worker'],
    multiprocess_mode='livemostrecent',
)

Для воркера, в котором запущен сервер, датчик будет обновляться в момент получения метрик, а конкурирующий воркер будет периодически обновлять значение в потоке демона и записывать его на диск.

# celery_control/tracker.py

def track():
    metrics.worker_online.labels(
        worker=state.worker_name,
    ).set_to_current_time()

    # ...fill tasks_seen
    # ...make snapshot and track metrics

В отличие от метрики up, в качестве значения будем проставлять текущее время. Это будет время последнего раза, когда воркер был online. Логику принятия решения, когда считать, что воркер теперь offline, будем настраивать в дашборде. Через Grafana можно указать пороги, при срабатывании которых будет соответствующее оповещение.

Есть несколько вариантов создания визуализаций на панели Grafana. Например, можно построить Bar gauge давности последнего выхода воркера на связь.

sort(celery_worker_online - time())
Рисунок 11. График доступности воркеров
Рисунок 11. График доступности воркеров

На графике представлен случай, когда в одном контейнере через multistart запущено два воркера. Второй периодически обновляет датчик celery_worker_online. И если он неожиданно прервёт свою работу, в файлах метрик останутся старые значения. После этого первый воркер станет читать эти файлы вместе со своими и вернёт последнее время жизни второго воркера. На панели мониторинга отобразится следующее:

Рисунок 12. График доступности воркеров
Рисунок 12. График доступности воркеров

Второй вариант создания визуализации — подсчёт общего количества доступных воркеров. Это позволяет отслеживать необходимый минимум для корректной работы, а в случае аварии — среагировать на инцидент.

count by (worker) (celery_worker_online)
Рисунок 13. График доступности воркеров
Рисунок 13. График доступности воркеров

Как видите, этот подход достаточно гибкий и позволяет отображать информацию о состоянии воркеров в различных вариантах. Комбинируя визуализации, можно достичь требуемого уровня мониторинга. А необходимые пороги срабатывания каждый определит для себя самостоятельно.

Метрика публикаций задач

Задачи могут публиковать другие задачи, а те в свою очередь — третьи, и так по цепочке. Но зачастую первоисточником является не сам воркер, а сопутствующие приложения рядом. В качестве таких приложений обычно выступает какой-то сервер, обрабатывающий клиентские запросы, и Celery beat, который публикует задачи в очередь согласно заданному расписанию. Поэтому пригодится метрика количества отправленных задач во всех перечисленных компонентах системы. О том, как настроить каждый из компонентов, расскажу ниже.

На основе полученного опыта можно утверждать, что не так важно, откуда именно была опубликована задача. Метку worker_name в данном случае можно опустить. Более того, благодаря pull-модели эта информация всё же будет присутствовать в других метках, например, instance или pod. При необходимости их можно вывести на своих графиках.

task_published_total = Counter(
    name='celery_task_published',
    documentation='Number of published tasks.',
    labelnames=['task'],
)

В Celery есть два сигнала, которые срабатывают при публикации задачи: before_task_publish и after_task_publish. Для нашей задачи будет достаточно обработать второй.

@signals.after_task_publish.connect
def receive_after_task_publish(sender: str, **kwargs):
    metrics.task_published_total.labels(
        task=sender,
    ).inc()

Настройка

Все примеры из этой статьи реализованы в библиотеке celery_control. Проект доступен в PyPI. Для установки можно воспользоваться пакетным менеджером.

pip install celery-control

Для настройки приложения доступны две функции: setup_worker и setup_publisher.

Настройка воркера

Функция setup_worker подключит все перечисленные сигналы, в том числе сигнал публикации задачи. На момент написания статьи функция предоставляет следующие аргументы:

  • registry

  • server_host

  • server_port

  • server_disable_compression

  • tracker_daemon_interval

  • tracker_application_callback

  • task_runtime_buckets

Аргумент registry позволит зарегистрировать метрики в пользовательском CollectorRegistry, если это необходимо. По умолчанию используется глобальный REGISTRY из prometheus_client.

Аргументы server_host и server_port отвечают за интерфейс сервера, на котором он будет обрабатывать запросы. Аргумент server_disable_compression позволит принудительно запретить сжимать метрики, даже если клиентский заголовок Accept-Encoding будет запрашивать обратное. Каждый параметр можно настроить через соответствующие переменные окружения без непосредственной передачи аргументов:

  • CELERY_CONTROL_SERVER_HOST. По умолчанию 0.0.0.0.

  • CELERY_CONTROL_SERVER_PORT. По умолчанию 5555.

  • CELERY_CONTROL_SERVER_DISABLE_COMPRESSION. По умолчанию False.

В случае неудачного запуска сервера по причине занятого порта, как в примере запуска multistart-ом, запустится фоновый поток демона, который с заданным интервалом tracker_daemon_interval будет записывать метрики текущего состояния Celery на диск. Соответственно, помимо ошибки занятости порта, для запуска этого потока требуется включённый режим мультипроцессинга: переменная окружения PROMETHEUS_MULTIPROC_DIR. Как и параметры сервера, параметр tracker_daemon_interval можно настроить через соответствующую переменную окружения CELERY_CONTROL_TRACKER_DAEMON_INTERVAL, значение которой по умолчанию равно 15.

Воркеру может потребоваться отслеживать свои датчики, например, состояние пула подключений к БД или потребление tmpfs-раздела. Для этого достаточно передать пользовательскую функцию в параметр tracker_application_callback. Она будет вызываться перед формированием ответа метрик в том же месте, где отслеживаются показатели текущих задач Celery. В случае запуска фонового потока демона функция будет вызываться уже в нём.

В зависимости от специфики задач, может потребоваться изменить бакеты для метрики task_runtime. Для этого есть специальный параметр task_runtime_buckets. По умолчанию будет использоваться Histogram.DEFAULT_BUCKETS.

from celery import Celery
from celery_conrol import setup_worker

app = Celery(broker='amqp://guest:guest@localhost:5672')

setup_worker()

Настройка publisher

Функция setup_publisher подключит только сигнал публикации задач. На момент написания статьи функция предоставляет следующие аргументы:

  • registry

  • start_wsgi_server

  • server_host

  • server_port

  • server_disable_compression

  • tracker_application_callback

Эту функцию можно применять как на стороне сервера, так и на стороне Celery beat. Основное отличие будет лишь в необходимости запуска дополнительного потока сервера для отдачи метрик. За это отвечает параметр start_wsgi_server. Очевидно, что на стороне сервера в этом нет необходимости, так как он сам собирает и отдаёт метрики, а вот на стороне Celery beat без этого не обойтись. Если в вашем наборе приложений есть ещё какие-то компоненты, которые публикуют задачи, то можно также рассмотреть применение в них setup_publisher.

from celery_conrol import setup_publisher

setup_publisher(start_wsgi_server=True)

Важное примечание: функция tracker_application_callback будет вызываться только при наличии запущенного дополнительного потока сервера (start_wsgi_server=True). Другими словами, если вы настраиваете метрики на стороне сервера приложения, то вам необходимо самостоятельно обеспечить вызов необходимой функции, например, в уже существующей у вас ручке /metrics.

Пример Django-приложения

Полный код примера можно посмотреть на платформе GitVerse.

Часто настройка усложняется, когда в классическом наборе приложений Django + Celery + Beat используется единая кодовая база. Воркерам требуется подключить абсолютно все сигналы, Django-серверу нужны только сигналы публикаций, а Beat-у, помимо них, необходимо запустить ещё и сервер для отдачи метрик. Поэтому кодовую базу следует организовать таким образом, чтобы для разных служб была разная конфигурация настройки Celery. На примере макета из документации, добиться этого можно, создав отдельные файлы для каждой службы.

example/
    proj/__init__.py
        /beat.py
        /celery.py
        /metrics.py
        /settings.py
        /tasks.py
        /urls.py
        /views.py
        /worker.py
        /wsgi.py
    manage.py

Файл tasks.py формально останется без изменений. В нём будут определены задачи и реализована их бизнес-логика. В celery.py произойдут минимальные изменения: по-прежнему будет создание экземпляра Celery, но вместо его непосредственной настройки будет конфигурация через настройки Django.

# proj/celery.py

import os

from celery import Celery

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

app = Celery()

app.config_from_object('django.conf:settings', namespace='CELERY')

Для предварительной настройки воркера создадим отдельный файл worker.py. В нём организуем инициализацию Django, регистрацию задач и саму настройку метрик.

# proj/worker.py

from celery_control import setup_worker
from proj.celery import app

# Load task modules from all registered Django apps.
app.autodiscover_tasks()

setup_worker()

Для запуска воркера воспользуемся опцией --app в формате module.path:attribute.

PROMETHEUS_MULTIPROC_DIR=/tmp/prometheus_multiproc \
CELERY_CONTROL_SERVER_PORT=5555 \
celery -A proj.worker:app worker \
--pool prefork \
-c 2 \
-l INFO \
-n "image@myhost"

Подобным способом настроим Celery beat через отдельный файл beat.py. Напомню, что для отдачи метрик потребуется сервер в отдельном потоке.

# proj/beat.py

from datetime import timedelta

from celery_control import setup_publisher
from proj.celery import app

app.conf.beat_schedule = {
    "test": {
        "task": 'proj.tasks.test_succeeded',
        'schedule': timedelta(seconds=5),
    }
}

# register publisher metrics
# start wsgi server
setup_publisher(start_wsgi_server=True)
CELERY_CONTROL_SERVER_PORT=5556 \
celery -A proj.beat:app beat

Настроить сервер можно в wsgi.py. Ввиду того, что сервер сам будет собирать и отдавать метрики, в запуске дополнительного сервера в отдельном потоке нет необходимости. Следовательно, опцию start_wsgi_server можно опустить.

# proj/wsgi.py

import os

from django.core.wsgi import get_wsgi_application

from celery_control import setup_publisher

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')

application = get_wsgi_application()

setup_publisher()

Вам может потребоваться отслеживать собственные датчики перед формированием ответа метрик. К примеру, вы решили смонтировать директорию /tmp как раздел tmpfs, и хотите силами приложения отслеживать его потребление. Тогда вам потребуется следующая функция:

# proj/metrics
import shutil

from prometheus_client import Gauge

disk_usage_total = Gauge(
    name='disk_usage_total',
    documentation='Total in bytes',
    labelnames=('path',),
    multiprocess_mode='livemostrecent',
)
disk_usage_used = Gauge(
    name='disk_usage_used',
    documentation='Used in bytes',
    labelnames=('path',),
    multiprocess_mode='livemostrecent',
)
disk_usage_free = Gauge(
    name='disk_usage_free',
    documentation='Free in bytes',
    labelnames=('path',),
    multiprocess_mode='livemostrecent',
)

def track():
    path = '/tmp'
    disk_usage = shutil.disk_usage(path)
    disk_usage_total.labels(path).set(disk_usage.total)
    disk_usage_used.labels(path).set(disk_usage.used)
    disk_usage_free.labels(path).set(disk_usage.free)

Эту функцию можно передать в качестве аргумента при настройке воркера.

# proj/worker.py

from celery_control import setup_worker
from proj.celery import app
from proj.metrics import track

app.autodiscover_tasks()

setup_worker(
    tracker_application_callback=track,
)

Теперь, когда воркер будет формировать ответ по метрикам, показатели потребления раздела также будут отслеживаться и входить в состав ответа. Тот же аргумент можно использовать при настройке Celery beat. Но, как упоминалось ранее, на стороне сервера этот приём необходимо реализовать самостоятельно.

# proj/views.py

import os

import prometheus_client
from django.http import HttpResponse
from prometheus_client import multiprocess

from proj.metrics import track

def metrics(_):
    track()     # track application gauges here

    if "PROMETHEUS_MULTIPROC_DIR" in os.environ:
        registry = prometheus_client.CollectorRegistry()
        multiprocess.MultiProcessCollector(registry)
    else:
        registry = prometheus_client.REGISTRY

    return HttpResponse(
        content=prometheus_client.generate_latest(registry),
        content_type=prometheus_client.CONTENT_TYPE_LATEST,
    )

Если представить, что все компоненты запускаются в отдельных контейнерах, одна из возможных конфигураций на общей схеме может принять следующий вид:

Рисунок 14. Схема проекта
Рисунок 14. Схема проекта

Сравнение функциональности

Для простоты изложения введу новые временные термины: подход к мониторингу на основе отправки событий — event-based, а подход, описанный в этой статье, — pull-based. Сравнивать буду не только подходы, но и реализации. В частности, буду смотреть на классического представителя в виде Flower и на собственную реализацию celery-exporter из предыдущей статьи.

Счётчики задач

Flower

Счётчик flower_events_total. Доступны метки task и worker и type. Информация об ошибках и о причинах повторных попыток отсутствует.

Celery exporter

Набор счётчиков под каждый тип события с метками task и worker. В счётчике ошибок появляется метка exception.

Celery control

Аналогичный набор счётчиков под каждый тип события. Метка exception доступна не только в счётчике ошибок, но и в счётчике повторных попыток. В метрике отклонённых задач доступна метка requeue, в метрике отозванных — expired и terminated.

Архитектура Event-based подвержена классическим «болячкам» вне зависимости от реализации. Например, вполне вероятно получить None в метке task. Это вызвано тем, что в State не оказалось сохранённого имени задачи. Pull-based лишена таких проблем.

В celery_control появились новые счётчики:

  • celery_task_unknown_total: количество неизвестных задач

  • celery_message_rejected_total: количество сообщений неизвестного типа

Вне зависимости от архитектуры интересно ведёт себя метрика отозванных задач. Если задачи отзываются через remote control, то, скорее всего, значение счётчика будет завышено. Это связано с тем, что событие task-revoked будет отправляться каждым воркером. На каждое такое событие будет увеличен соответствующий счётчик. Таким образом, придётся лишь догадываться, сколько задач на самом деле было отозвано. Однако в celery_control можно хотя бы выделить из общего счётчика те, которые отозвались по времени истечения expired. Для них значения будут честные.

Датчики задач

У Flower есть следующий набор датчиков:

  • flower_task_prefetch_time_seconds

  • flower_worker_prefetched_tasks

  • flower_worker_number_of_currently_executing_tasks

Показатель flower_task_prefetch_time_seconds сообщает, как быстро задача передаётся в обработку с момента её получения от брокера. Как именно он формируется, можно посмотреть в классе EventsState. Ниже фрагмент из оригинальных исходников Flower:

class EventsState(State):
    ...

    def event(self, event):
        ...

        if event_type.startswith('task-'):
            task_id = event['uuid']
            task = self.tasks.get(task_id)
            ...

            task_started = task.started
            task_received = task.received
            ...	

            if event_type == 'task-received' and not task.eta and task_received:
                self.metrics.number_of_prefetched_tasks.labels(
                    worker_name,
                    task_name,
                ).inc()

            if event_type == 'task-started' and not task.eta and task_started and task_received:
                self.metrics.prefetch_time.labels(
                    worker_name,
                    task_name,
                ).set(task_started - task_received)
                self.metrics.number_of_prefetched_tasks.labels(
                    worker_name,
                    task_name,
                ).dec()

            if event_type in ['task-succeeded', 'task-failed'] and not task.eta and task_started and task_received:
                self.metrics.prefetch_time.labels(
                    worker_name,
                    task_name,
                ).set(0)

Этот показатель вычисляется при получении события task-started на основании времени получения задачи воркером task_received и времени передачи её в работу task_started. Обратите внимание на проверку наличия этих переменных. Если Flower не обработал событие task-received, то атрибут received будет пустой. Ровно по этой же причине в метрике flower_events_total появляется None в метке task.

Датчик flower_worker_prefetched_tasks, как вы уже догадались, отвечает за количество задач, ожидающих свою очередь. Причём этот показатель, как и предыдущий, не распространяется на запланированные задачи (task.eta). Это может немного сбить с толку.

Основная слабость этих показателей состоит в том, что они чувствительны к внезапной смерти воркера. Если такое произойдёт в момент между получением задачи и непосредственной передачей её в работу, то сигнал task-started не отправится. В этом случае датчик flower_worker_prefetched_tasks не будет уменьшен. Таким образом, значения на графиках будут выше, чем есть на самом деле.

Похожая ситуация с flower_task_prefetch_time_seconds. Если смерть воркера наступит в момент обработки задачи, то ни task-succeeded, ни task-failed не будет отправлен. Следовательно, этот показатель не сбросится и будет отображать сохранённое значение либо до следующей обработки той же задачи тем же воркером, либо до перезапуска самого Flower. Как говорится, семь бед — один reset!

Датчик flower_worker_number_of_currently_executing_tasks устроен проще. Он берёт значение поля active из события worker-heartbeat.

class EventsState(State):
    ...

    def event(self, event):
        ...

        if event_type == 'worker-heartbeat':
            ...

            num_executing_tasks = event.get('active')
            if num_executing_tasks is not None:
                self.metrics.worker_number_of_currently_executing_tasks.labels(
                    worker_name,
                ).set(num_executing_tasks)

Нетрудно догадаться, что если эти события будут отключены, то метрика останется пустой. Но проблема ещё в другом: когда вы будете выкатывать новую версию приложения, наверняка будут новые worker_name. Это произойдёт, когда вы запускаете приложение в K8s и в аргументе имени воркера используете имя хоста %%h.

celery -A proj.worker:app worker -n image@%%h

У новых экземпляров будут новые имена подов, которые транслируются в имя хоста и, соответственно, содержатся в worker_name. Другими словами, все накопленные метрики прошлых воркеров будут вечно храниться в памяти процесса. Итак, мы плавно подходим к проблеме чистки старых метрик.

Чистка старых метрик

Flower

Логика очистки старых метрик отсутствует. Метрики воркеров, которые давно уже завершили свою работу, будут вечно храниться в памяти процесса flower. Классическая проблема push-модели.

Celery exporter

В собственном State хранится словарь известных воркеров. Фоновым потоком периодически проверяется время последнего heartbeat. Если воркер долго не выходил на связь, информация о данном воркере удаляется вместе с его метриками.

Celery control

Нет необходимости в реализации. Метрики прекращают серию автоматически благодаря pull-модели.

У flower чувствительность к внезапной смерти воркера усугубляется проблемой старых метрик. Это выражается либо в наличии ненужных метрик на графиках, либо в их неправильном значении. В celery_exporter эти проблемы решили фоновой логикой проверки последнего времени выхода воркера на связь. Правда, она требует обязательного включения heartbeat.

Если присмотреться, то flower, сelery_exporter и иные реализации на основе событий представляют собой своего рода Pushgateway. И, по настоятельным рекомендациям, их нужно использовать в ограниченных случаях. Считается, что единственно допустимым вариантом использования является сбор метрик заданий на уровне сервиса, и недопустимо включать в них метки, которые могли бы отличить конкретный экземпляр или машину.

Such a job's metrics should not include a machine or instance label to decouple the lifecycle of specific machines or instances from the pushed metrics.

Поэтому следует однозначно понимать, в каких случаях можно использовать тот или иной инструмент. С этой точки зрения flower подходит только тогда, когда все воркеры имеют статичные имена и не зависят от имени хоста. В противовес ему, celery_exporter можно использовать в окружениях с динамическим именем хоста, но при условии включённого heartbeat у воркеров. А реализацию pull-модели в лице celery_control можно использовать при любых обстоятельствах.

Счётчик публикаций

В Сelery доступна опция task_send_sent_event. С её помощью можно включить отправку события task-sent. Flower реагирует на неё так же, как и на другие события: увеличивает счётчик flower_events_total. Проблема в том, что имя хоста в теле события может принимать сгенерированное значение, например,gen7460@myshost. Следовательно, со временем разнообразие таких меток будет расти. В celery_exporter эту проблему решили подстановкой шаблонного значения, например, «generic». А в celery_control метка worker аналогичного счётчика не используется в принципе. Благодаря pull-модели будет и так понятно, из какого экземпляра получили ту или иную метрику.

Ещё немного о преимуществах

В предыдущей статье обсуждалось, что делать, когда у вас несколько проектов. Было установлено, что для мониторинга разных проектов потребуются разные экземпляры flower. А если появится желание сделать централизованный мониторинг, то придётся написать собственную обработку событий. В pull-модели всё решается конфигурированием скрапинга: каким целям какие метки проставлять. Например, в случае запуска приложения в K8s метрики необходимого проекта можно будет отфильтровать либо по namepsace, либо по имени deployment.

Значения датчиков перестали быть чувствительными к внезапной смерти воркера. В отличие от модели even-based значения вычисляются на основе внутреннего состояния Celery. То есть они берутся «как есть», вместо арифметических операций, как в случае с датчиком flower_worker_prefetched_tasks. Благодаря этому стало возможно получить метрику запланированных задач scheduled. Такую информацию из событий никогда не получишь. В celery_exporter пытались в отдельном потоке получать статистику по воркерам через remote control, но если он отключён, то никаких метрик не будет.

Единственный случай, когда в архитектуре pull-based значения метрик не будут соответствовать действительности, это когда конкурирующий воркер в режиме мультипроцессинга внезапно завершил работу и не смог самостоятельно подняться. В этой ситуации воркер, у которого в отдельном потоке запущен сервер, будет читать файлы конкурирующего воркера со старыми значениями. Установить это поможет датчик celery_worker_online. Благодаря ему станет понятно, что конкурирующий процесс не может стабильно работать. После устранения неполадки значения метрик по этому воркеру стабилизируются.

И напоследок самая «вкусная», на мой взгляд, фича. Pull-модель позволяет получить метрики не только задач, но и всего приложения. Вы сможете получить метрики по работе с БД, статистику попадания в кеш, показатели обращений к внешним сервисам и много другое. Иными словами, появляется возможность заглянуть внутрь воркера и подробнее наблюдать за его работой.

Сравнение производительности

Минимально сравню производительность воркера в разных режимах. Задача этого тестирования — понять, как влияет отправка событий, а также какова разница в скорости обработки при включении celery_control.

# proj/tasks.py

import logging
import statistics
import time

from proj.celery import app

logger = logging.getLogger('celery')

start, results = [time.monotonic()],  []

@app.task
def test_performance(test_idx: int, test_total: int, task_idx: int, task_total: int):
    if task_idx == 0:
        start[0] = time.monotonic()
    elif task_idx == task_total - 1:
        end = time.monotonic()
        duration = (end - start[0])
        tps = task_total / duration
        logger.error('[test_performance][%02d] tps: %s', test_idx, tps)
        results.append(tps)

        if test_idx == test_total - 1:
            results_max = max(results)
            results_median = statistics.median(results)
            logger.error(
                '[test_performance] median: %s; max: %s;',
                results_median,
                results_max,
            )
            results.clear()

Все воркеры запускались с уровнем журналирования ERROR, чтобы исключить из замеров затраты на журналирование. Были отключены gossip, mingle и heartbeat. Параметр prefetch-multiplier установлен в значение 1 тыс. Параметр concurrency равен 1. Тестирование проводилось локально на MacBook M1 Pro.

Тест 1

Публикация в очередь 100 порций задач по 1 тыс. в каждой. После завершения публикации запускаю воркер в требуемом режиме. Общее количество обработанных задач — 100 тыс. Результатом тестирования будет медианное время обработки каждой порции задач.

Идея теста — имитация постоянной нагрузки, когда в очереди всегда есть задачи.

from proj import tasks

test_total = 100

for test_idx in range(test_total):
    print(f'[{test_idx:02d}] start test performance publishing')
    task_total = 1000
    for task_idx in range(task_total):
        tasks.test_performance.delay(
            test_idx=test_idx,
            test_total=test_total,
            task_idx=task_idx,
            task_total=task_total,
        )

Тест 2

Меняю количество порций в предыдущем примере со 100 на 10. Запускаю публикацию задач, а после ее завершения запускаю воркер в требуемом режиме. После обработки фиксирую максимальное время и останавливаю воркер. Повторяю процедуру публикации, запуска воркера с последующей остановкой десять раз. Суммарное количество задач, обработанное воркером, — тоже 100 тыс. Результатом тестирования будет медианное значение максимального времени обработки.

Идея теста в имитации временной нагрузки, когда в очереди немного задач: их количество меньше или равно параметру prefetch-multiplier. В нашем случае эта ситуация будет часто воспроизводиться во время обработки последней порции задач. Когда предыдущие 9 тыс. задач уже обработаны, воркер заберёт последние 1 тыс. задач из очереди и будет меньше взаимодействовать с брокером. Из личных наблюдений: были случаи, когда максимальный результат достигался в первой порции задач. Именно поэтому для чистоты эксперимента после каждого теста останавливаю воркер.

Результаты для пула prefork

Рисунок 15. Диаграмма производительности для пула prefork
Рисунок 15. Диаграмма производительности для пула prefork

При включении отправки событий количество обрабатываемых задач в секунду снизилось примерно на 40% — c 2097 до 1252 TPS. Тест 2 показал ещё большую разницу в производительности — 46%. В свою очередь, использование celery_control снизило производительность всего лишь на 6% в Тесте 1 и на 11% — в Тесте 2.

Результаты для пула threads

Рисунок 16. Диаграмма производительности для пула threads
Рисунок 16. Диаграмма производительности для пула threads

Зависимость скорости обработки такая же, как в результатах для пула prefork. Но по сравнению с ними в базовых настройках абсолютные значения выросли примерно на 15%. По всей видимости, это была цена за передачу задач от главного процесса дочернему. Похожая корреляция и в результатах с включённым celery_control, чего не скажешь про включённые события. В Тесте 1 результаты с включёнными событиями показали прирост производительности более 60% по сравнению с тем же тестом для пула prefork. И хоть разница по отношению к базовым настройкам составляет всего 16%, в Тесте 2 она сильно увеличивается.

Другими словами, результаты тестирования показали, что включение отправки событий накладывает ощутимые дополнительные расходы, в то время как стоимость `celery_control` составляет от 6 до 11%.

Заключение

Представленная архитектура мониторинга Celery имеет множество преимуществ над общепринятой. Зная внутреннее устройство и описанные принципы работы, можно построить мониторинг в самых разных конфигурациях: от самых простых, когда все метрики хранятся в памяти, до самых сложных, когда метрики хранятся на диске вместе с файлами конкурирующих воркеров. Также не составит труда покрыть метриками службы, которые отправляют задачи в очередь.

Если этот подход кому-то откликнется, буду признателен за обратную связь об опыте его применения в ваших проектах.

Спасибо за внимание!

Полезные материалы

Комментарии (0)