Наш путь активной работы с очередями RabbitMQ начался с классического Celery. Осознав критичность низкоуровневого контроля системы, принялись работать с aio-pika. Но и этот уровень слишком местами сложный (далее расскажу почему), и нашли отличное решение, на текущий момент, в лице FastStream. Сразу оставлю такую пометку, что каждый инструмент подходит для решения своей задачи. Мы больше хотели сделать акцент на удобство и скорость разработки относительно затрачиваемого времени на миграции решений.

N.B.: Код возможно покажется неоптимальным или старым. Это всё наш дорогой Легаси.

Постановка задачи

Наша система построена на основе микросервисов, работающих с RabbitMQ. Внутри - обычный асинхронный код для похода на внешние API и в БД.

Требования:

  • Надежный консьюминг - это для нас критично, чтобы сообщение шло по всему флоу и нигде не останавливалось без причин. Если ошибка падает, то это должно отражаться в 3 местах: БД, логи и метрики.

  • Ретраи при ошибках обработки.

  • Трейсинг - поддержка OpenTelemetry.

  • Мониторинг сервиса через healthcheck’и.

  • Prometheus метрики.


Решение №1: Celery как консьюмер

Почему Celery

Celery — классический инструмент для фоновых задач, знакомый большинству Python-разработчиков. Из коробки: декларативное описание задач, ретраи с экспоненциальной задержкой, хранение результатов, мониторинг через Flower, интеграции с фреймворками. Логика проста: пишешь @app.task, запускаешь воркер — и сообщения из очереди начинают обрабатываться.

Как мы его использовали

Мы не отправляли задачи из кода в духе my_task.delay(), а настраивали Celery на прослушивание внешней очереди, куда сообщения попадали от других систем. По сути, Celery выступал как consumer: подключался к брокеру, забирал сообщения, десериализовал и передавал в наши обработчики. Настройки вроде max_retries, default_retry_delay, countdown позволяли гибко управлять поведением при сбоях. Важно ещё подсветить, что результат всегда игнорируется с помощью параметра ignore_result=True поскольку все результаты записываются в БД.

Пример инициализации воркера:

def create_app(
    name,
    broker,
    include,
    backend=None,
    task_queues=None,
    liveness_probe=1,
    update_period=60,
    watcher_config={},
):
    # Создание само приложение + наложение дополнительных конфигурации
    app = Celery(name, broker=broker, include=include, backend=backend)
    app.conf.update(
        result_expires=120,
    )
    add_without_heartbeat_argument = Option(
        ("--without-heartbeat",),
        default=True,
    )

    app.user_options["worker"].add(add_without_heartbeat_argument)
    if task_queues is not None:
        app.conf.task_queues = task_queues

    if liveness_probe:
        add_update_period_argument = Option(
            ("--update-period",),
            default=update_period,
        )

        HEARTBEAT_FILE.touch()

        app.user_options["worker"].add(add_update_period_argument)

        # Добавление кастомной livenessProbe для K8s
        app.steps["worker"].add(LivenessProbe)
    
    # инициализация трейсинга
    with_tracing = watcher_config.get("with_tracing")
    if with_tracing:
        tracing_exporters = watcher_config.get("tracing_exporters", ())
        signals.worker_process_init.connect(
            init_celery_tracing(app_name=name, tracing_exporters=tracing_exporters),
            weak=False,
        )
    return app

С чем столкнулись

Celery проектировался как система передачи сообщений между системами. Из-за чего столкнулись со следующими проблемами:

  • Потребление памяти — из-за оборачивания каждого сообщения в метаданные и хранения внутренних структур Celery расход оперативной памяти быстро рос пропорционально потоку сообщений. При высоком темпе обработки воркер начинал использовать значительно больше ОЗУ, чем требовалось самой бизнес-логике, что вынуждало выделять избыточные ресурсы.

  • Управление соединениями и heartbeat — Celery скрывает многие детали брокера, из‑за чего при сетевых сбоях восстановление происходило с задержками, а тонкая настройка consumer_timeout, broker_transport_options была сложной и плохо документированной.

  • Избыточность — Как можно заметить мы используем минимальные дополнительные конфигурации Celery.

  • Падение контейнера на битом сообщений — Будет не совсем справедливо относить это полностью к минусам самого Celery, это больше камень в наш огород. Но опыт есть опыт. Если воркер обрабатывал невалидное сообщение в плане структуры, то падал весь контейнер, при этом сообщение консьюмилось. Что приводило к непониманию “куда делось сообщение и что с ним произошло?”.

Стало ясно: Celery — отличный выбор для фоновых задач с результатом, но для непрерывного потокового консьюминга он перегружен и недостаточно прозрачен.


Решение №2: aio-pika — близко к железу

Идея

Перейти на чистый AMQP, отказавшись от посредника в виде Celery. aio-pika — асинхронная библиотека для RabbitMQ, предоставляющая прямой доступ к каналам, обменникам, очередям. Код становится минимальной прослойкой над протоколом: сами управляем подписками, подтверждениями (ack/nack), префетчем, реконнектами.

Что переписали

Написали небольшой собственный “фреймворк” консьюминга: асинхронный раннер, который при запуске создаёт соединение, открывает канал, объявляет очереди с нужными параметрами durable/exclusive, подписывается на них, а в колбэке вызывает наши обработчики.

Поверх этого появились:

  • Ручной retry — Декоратор поверх колбэка, имеющий свой счётчик кол-во повторов при возникновений ошибок.

  • LivenessProb — Кастомный на уровне ядра aio-pika представлен чуть ниже.

  • Трейсинг — вручную оборачиваем вызовы в OpenTelemetry спаны, передаём trace context в заголовках AMQP при ретраях.

Пример LivenessProb:

class CustomRobustConnection(RobustConnection):
    def __init__(
        self,
        url: URL,
        loop: asyncio.AbstractEventLoop | None = None,
        **kwargs: Any,
    ):
        super().__init__(url=url, loop=loop, **kwargs)
        # Кастомный класс пробы, схож с тем, что выше для Celery
        self._liveness_probe = LivenessProbe()

    async def connect(self, timeout: TimeoutType = None) -> None:
        # Тут держитесь крепче. Я когда впервые это увидел, без литра кофе не смог понять как оно работает
        self._RobustConnection__connect_timeout = timeout

        if self.is_closed:
            raise RuntimeError(f"{self!r} connection closed")

        if self.reconnecting:
            raise RuntimeError(
                (
                    "Connect method called but connection "
                    f"{self!r} is reconnecting right now."
                ),
                self,
            )

        if not self._RobustConnection__reconnection_task:
            self._RobustConnection__reconnection_task = self.loop.create_task(
                self.__connection_factory(),
            )

        await self._RobustConnection__fail_fast_future
        await self.connected.wait()

    async def __connection_factory(self) -> None:
        logger.debug("Starting connection factory for %r", self)
        while not self.is_closed and not self._close_called:
            logger.debug("Waiting for connection close event for %r", self)
            await self._RobustConnection__connection_close_event.wait()

            if self.is_closed or self._close_called:
                return

            try:
                self.transport = None
                self.connected.clear()

                logger.debug("Connection attempt for %r", self)
                await Connection.connect(self, self._RobustConnection__connect_timeout)

                if not self._RobustConnection__fail_fast_future.done():
                    self._RobustConnection__fail_fast_future.set_result(None)

                logger.debug("Connection made on %r", self)
                self._liveness_probe.start()

            except CONNECTION_EXCEPTIONS as e:
                if not self._RobustConnection__fail_fast_future.done():
                    self._RobustConnection__fail_fast_future.set_exception(e)
                    return

                logger.warning(
                    'Connection attempt to "%s" failed: %s. '
                    "Reconnecting after %r seconds.",
                    self,
                    e,
                    self.reconnect_interval,
                )
                self._liveness_probe.stop()
            except Exception:
                logger.exception(
                    "Reconnect attempt failed %s. " "Retrying after %r seconds.",
                    self,
                    self.reconnect_interval,
                )
                self._liveness_probe.stop()

            await asyncio.sleep(self.reconnect_interval)

Что стало лучше

Полный контроль над жизненным циклом соединения, тонкая настройка prefetch, возможность реализовать любую логику подтверждения (например, отложенный ack после завершения цепочки действий). Нет лишних метаданных в теле сообщения — брокер передаёт ровно то, что отправил продюсер. Асинхронность нативная, работает на asyncio без костылей.

Проблемы

Каждая «плюшка» делалась вручную и со временем объём инфраструктурного кода разросся. Десятки строк для декларации очередей, логирование reconnect‑цикла, согласование формата trace‑заголовков между сервисами. Healthcheck, хоть и был создан вручную, требовал аккуратности: нужно было отслеживать состояние не только TCP-соединения, но и открытого канала.


Решение №3: FastStream — золотая середина

Основная идея

FastStream — надстройка над aio-pika (а также над NATS, Kafka), которая даёт декларативный стиль описания consumer’ов, lifespan‑хуки, встроенные механизмы: healthcheck-эндпоинт, OpenTelemetry-интеграция, метрики Prometheus.

По сути, это aio-pika, обёрнутая в лучшие практики, которые мы сами реализовывали руками в предыдущем решении. Механизм retry описан как пример использования middleware.

Что пошло так

  • Healthcheck из коробки — достаточно указать FastStream объект в ASGI-приложении (через AsgiFastStream), и на /health возвращается статус брокера.

  • Lifespan — менеджер контекста управляет запуском и корректной остановкой consumer’ов, повторными соединениями. Не нужно писать свои обработчики сигналов. Это касалось не только RMQ коннектов, но и например коннектов к базам данных.

  • Мониторинг и трейсинг — подключение OpenTelemetry сводится к нескольким строчкам: спаны автоматически создаются для каждого обработанного сообщения, propagate context через заголовки.

  • Декларативные middleware — проще внедрять кросс‑касательную логику (логирование, валидацию) без захламления бизнес-кода.

  • Простота конфигурации — брокер, очереди, обменники описываются через Python-декораторы и типы, нет нужды вручную управлять каналами и ack/nack.

  • Интеграция с Pydantic — Сообщение на уровне описание консьюмеров сразу пытается отвалидироваться в Pydantic модель, если это нужно.

  • Dependency Injection — Как можно увидеть в примере ниже FastStream предлагает нам возможность подключение DI как в FastAPI.

Вот во что превратилась кодовая база одной инициализации процесса:

app = AsgiFastStream(
    broker,
    # health + metrics из коробки
    asgi_routes=[
        (
            "/health",
            make_system_ping_asgi(broker, timeout=5.0, include_in_schema=False),
        ),
        ("/metrics", make_asgi_app(registry)),
    ],
    # Кастомный lifespan. Обычно здесь идёт инициализации подключений к БД
    lifespan=lifespan,
)

Было бы ещё не очень справедливо не показать, что из себя представляет broker

broker = RabbitBroker(
    settings.RMQ_URL,
    # Примеры миддлвары
    # RabbitPrometheusMiddleware из коробки для мониторинга сообщений
    # EnrichLogMiddleware и FailCatchComplexMiddleware кастомные для отслеживания логирования и ошибок 
    middlewares=(
        RabbitPrometheusMiddleware(registry=registry),
        EnrichLogMiddleware,
        FailCatchComplexMiddleware(
            ignore_routing_keys=[
                settings.RMQ_FAIL_TABLE_QUEUE,
                settings.RMQ_DASHBOARD_SETTINGS_QUEUE,
                settings.RMQ_TIMEOUT_QUEUE,
            ],
        ),
    ),
    logger=logger,
    # Кастомные парсеры и декодеры для обработки входящих сообщений
    parser=json_parser,
    decoder=decoder,
)

broker.include_router(router)

и пример как инициализируется консьюмер

from faststream import Depends
from faststream.rabbit import (
    ExchangeType,
    RabbitExchange,
    RabbitMessage,
    RabbitQueue,
    RabbitRouter,
)
from faststream.rabbit.annotations import RabbitBroker as ContextRabbitBroker

# Вместо RabbitRouter можно использовать broker.
router = RabbitRouter()
@router.subscriber(
    RabbitQueue(
        name=...,
        durable=True,
        routing_key=...,
    ),
    RabbitExchange(
        name=...,
        durable=True,
    ),
)
async def on_service_hub_message(
    message: RabbitMessage,
    # Имеется общий контекст всей системы
    broker: ContextRabbitBroker,
    # DI
    async_session=Depends(get_db_session),
) -> None:

Подключение консьюмеров простое, почти схожее с Celery, но чуть шире. Возможности FastStream на этом не заканчиваются. Если копать ещё глубже, то там можно найти документацию AsyncAPI и In-memory тесты.

Что пошло не так

  • Производительность — из-за дополнительных слоёв абстракции (middleware, автоматическая обвязка спанов, встроенные ретраи) пропускная способность ниже, чем у голого aio-pika. Это для нас лишь только в теории, поскольку поток не достигает больших значений 30-40 RPS. Справедливости ради, это не относится к минусам, поскольку бенчмарки я не проводил, и фраза “поверь мне брат” меня убедила.

  • Сложность отслеживания кодовой базы — Тут опять из-за дополнительных слоёв абстракции иногда заходишь внутрь посмотреть что там, и можно с лёгкостью потеряться.

Итоговое сравнение

Критерий

Celery

aio-pika (самописный)

FastStream

Подход

Динамические задачи

Низкоуровневый AMQP

Высокоуровневый consumer

Асинхронность

Ограниченная (gevent)

Полная (asyncio)

Полная (asyncio)

Healthcheck

Требует доп. решения

Ручная реализация

Из коробки

Retry

Встроен в задачу

Реализуется вручную

Реализуется вручную (есть пример)

Трейсинг (OTel)

Через сигналы Celery

Ручное встраивание

Из коробки

Контроль

Низкий

Максимальный

Средний (middleware)

Производительность

Умеренная

Высокая

Больше чем у Celery, ниже чем у aio-pika

Кривая входа

Низкая

Средняя (требует знаний AMQP)

Низкая (знакомо по FastAPI)


Заключение

Пройдя путь от монолитного Celery через хрупкий самописный код до сбалансированного FastStream, мы пришли к инструменту, который закрывает все эксплуатационные потребности: healthcheck, трейсинг, метрики — и при этом не требует поддерживать сотни строк инфраструктурного кода.

Да, возможно я выступаю в качестве адвоката этого инструмента, но что могу поделать, когда он так понравился? это вы ещё не увидели как стараюсь переписать всё на Rust.

Отойдя от жаргона и шуток повторю первую свою мысль: У каждой задачи свой инструмент. Мой — это FastStream.

Комментарии (0)