
Привет! Меня зовут Никита Титков, я бэкенд-разработчик в Банки.ру.
Создать простую очередь – несложная задача. Но как только очередей становится десятки и через них идут важные для бизнеса процессы, сразу появляются вопросы: как их правильно называть, чтобы не запутаться? Как организовать логирование и мониторинг, чтобы видеть, что происходит с сообщениями? Как обрабатывать ошибки? Мы сталкивались со всеми этими проблемами и решили поделиться опытом.
В своей статье я расскажу, какие задачи и ошибки всплывают при построении системы очередей и как мы их решили. Поговорим о том, как придумать понятный нейминг для очередей, выстроить явные контракты сообщений между сервисами, настроить логирование и алертинг. Читая эту статью, вы получите общее представление о том, как проектировать систему очередей с нуля и узнаете, как избежать типовых ловушек при их использовании.
Что такое очереди (вдруг вы не знаете)?
Простыми словами, это механизм, который позволяют разным частям системы обмениваться сообщениями. Один сервис отправляет в очередь сообщение о событии, а получатель забирает его и обрабатывает, когда готов. Это позволяет не ждать ответа здесь и сейчас и спокойно связывать между собой разные куски системы.
Обычно ситуация такая. Есть несколько сервисов, которым нужно реагировать на событие. Например, в админке создают или редактируют продукт, дальше его данные нужно обогатить, пересобрать кэш и обновить витрину для клиентов. Чтобы эти части системы общались друг с другом и не блокировали запросы, мы подключаем Symfony Messenger, поднимаем RabbitMQ и настраиваем первые очереди. Всё работает.
Проходит время, сервисов становится больше, очередей тоже. Кто-то добавляет свои правила нейминга, где-то появляются ретраи, где-то отдельные failed-очереди. В какой-то момент вы открываете админку RabbitMQ и видите перед собой зоопарк из exchange'ей, routing key'ев и загадочных очередей, трогать которые страшно.
Если вы уже используете очереди в Symfony (или только собираетесь), после чтения вам будет проще спроектировать систему с нуля или навести порядок в существующей: договориться о нейминге, ввести понятные контракты, настроить наблюдаемость и перестать бояться, что важные сообщения тихо потеряются где-то по дороге.
Перейдем к ошибкам:
Ошибка №1. Хаотичный нейминг очередей
Первая боль, с которой наверно сталкивался каждый, когда очередей стало много – названия.
Пока очередей мало, жить можно с чем угодно: queue_1, products, failed, что-то, что придумал конкретный разработчик «на глаз». Это быстро превращается в зоопарк: по названию непонятно, кто отправитель, кто потребитель, что вообще делает сообщение и можно ли эту очередь трогать.
Здесь нам важно прийти к тому, чтобы по названию очереди мы понимали:
Какой сервис публикует это сообщение
Что сообщение делает
Какой сервис его принимает
Что это (очередь, обменник или роутинг кей)
Если это очередь, какой у нее тип (обычная, failed)
В общем, мы решили сформировать единый шаблон именования ресурсов RabbitMQ:
{serviceName}.{eventOrCommand}[.{consumerServiceName}].{queue|exchange|routingKey}[.{failed}]
Расшифровка компонентов шаблона:
serviceName– имя сервиса-источника, который публикует сообщение. Например: credits, apiProducts и т. д.eventOrCommand– название события или команды, вызвавшей отправку сообщения. Например: productUpdated, userCreated, smsSent (для событий) или sendEmail, sendSms (для команд).consumerServiceName– опционально, имя сервиса-потребителя, если необходимо указать, что очередь предназначена для конкретного сервиса. Используется, когда одно событие имеют несколько получателей, чтобы разделить очереди.queue|exchange|routingKey– индикатор типа ресурса.failed– суффикс для пометки failed-очереди или обменника. Используется для очереди, в которую помещаются сообщения после нескольких неудачных попыток обработки.
Примеры названий по шаблону:
credits.productUpdated.queue– очередь для события productUpdated из сервиса credits.credits.productUpdated.exchange– обменник для этого события.credits.productUpdated.routingKey– ключ маршрутизации для сообщения.credits.productUpdated.consumerApiUniProducts.queue– отдельная очередь для сервиса-потребителя apiUniProducts.adminProducts.productCreated.consumerApiEnrichProducts.queue.failed– failed-очередь для события productCreated (сервис-отправитель adminProducts, получатель apiEnrichProducts).adminProducts.productCreated.consumerApiEnrichProducts.exchange.failed– обменник для вышеуказанной failed-очереди.
Дальше этот шаблон просто накладывается на конкретные типы exchange:
Direct Exchange
Когда использовать: когда сообщение должно доставляться строго в одну конкретную очередь.
Пример:
framework:
messenger:
transports:
productUpdate:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange:
name: credits.productUpdated.exchange
type: direct
queues:
credits.productUpdated.queue:
binding_keys: ['credits.productUpdated.routingKey']
В этом примере сервис credits публикует событие productUpdated через direct-обменник. Сообщение помечается routing key credits.productUpdated.routingKey и попадает в единственную очередь credits.productUpdated.queue, привязанную к этому обменнику с таким же ключом.
Fanout Exchange
Когда использовать: когда требуется разослать сообщение во все очереди-подписчики без разбора.
Пример:
framework:
messenger:
transports:
productCacheUpdated:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange:
name: apiProducts.productCacheUpdated.exchange
type: fanout
queues:
apiProducts.productCacheUpdated.consumerApiUniProducts.queue: ~
apiProducts.productCacheUpdated.consumerBffCatalog.queue: ~
Здесь apiProducts рассылает сообщение через обменник apiProducts.productCacheUpdated.exchange типа fanout. Два потребителя – apiUniProduct и bffCatalog – каждый имеют собственную очередь (именованные по шаблону с указанием consumerServiceName).
Topic Exchange
Когда использовать: когда нужна гибкая маршрутизация сообщений по шаблону ключей.
Пример:
framework:
messenger:
transports:
userEvents:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange:
name: userService.userEvents.exchange
type: topic
queues:
userService.userEvents.consumerApiAnalytics.queue:
binding_keys: ['userService.userEvents.*.routingKey']
userService.userEvents.consumerBilling.queue:
binding_keys: ['userService.userEvents.created.routingKey']
В этом примере обменник userService.userEvents.exchange объявлен как тип topic. Очередь userService.userEvents.consumerApiAnalytics.queue привязана с шаблоном userService.userEvents.*.routingKey и будет получать любые события пользователя (любой второй сегмент ключа). В то же время очередь userService.userEvents.created.queue привязана по конкретному ключу userService.userEvents.created.routingKey – она получит только события создания пользователя.
Ошибка №2. Сырые JSON-сообщения вместо явных DTO
Не стоит воспринимать сообщение в очереди как просто JSON строку с заголовками.
Разные языки и библиотеки по-разному реагируют на изменения формата сообщения: где-то пропавшее поле тихо подменяется значением по умолчанию, а где-то то же самое сообщение вообще не парсится и падает в ошибку. В итоге продюсеру кажется, что изменение не ломает контракт, но часть потребителей либо незаметно начинает работать с некорректными данными, либо просто перестаёт читать сообщения.
Symfony Messenger по умолчанию смотрит на заголовок type и по нему понимает, во что превращать тело сообщения. Но что делать если внешние сервисы отправляют сообщения без этого заголовка, например если они написаны на разных языках (Java, Go).
Чтобы решить все эти проблемы в своих проектах мы сделали две вещи:
для каждой очереди завели отдельный DTO;
научили Messenger всегда десериализовывать JSON в эту DTO-шку, даже если внешний продюсер ничего не знает про заголовок type.
Для реализации последнего пункта мы добавили для наших обработчиков прослойку – кастомный сериализатор:
<?php
declare(strict_types=1);
namespace App\Messenger\Serializer;
use Symfony\Component\Messenger\Envelope;
use Symfony\Component\Messenger\Transport\Serialization\Serializer;
use Symfony\Component\Serializer\SerializerInterface;
class JsonMessageSerializer extends Serializer
{
/**
* @param class-string $className
*/
public function __construct(
private readonly string $className,
SerializerInterface $serializer,
) {
parent::__construct($serializer);
}
public function decode(array $encodedEnvelope): Envelope
{
// Жёстко подставляем класс DTO для этого транспорта
$encodedEnvelope['headers']['type'] = $this->className;
return parent::decode($encodedEnvelope);
}
public function encode(Envelope $envelope): array
{
$encodedEnvelope = parent::encode($envelope);
// Убираем наш служебный type, но не трогаем заголовки Messenger'а
unset($encodedEnvelope['headers']['type']);
return $encodedEnvelope;
}
}
Идея простая:
сериализатор конфигурится на уровне транспорта (messenger.yaml) и получает class-string нужного DTO;
services:
clientcore.prescoringResult.serializer:
class: App\Messenger\Serializer\JsonMessageSerializer
arguments: ['App\Messenger\DTO\PrescoringResultInput', '@serializer']
в decode() он подставляет этот класс в headers.type, чтобы базовый сериализатор Messenger собрал правильный объект;
в encode() он убирает type из исходящего сообщения, чтобы не светить внутренние имена классов и не ломать внешних потребителей.
Ключевой момент: мы не пишем сериализацию с нуля, а наследуемся от Symfony\Component\Messenger\Transport\Serialization\Serializer и зовём parent::encode()/decode().
Базовый сериализатор умеет:
упаковывать/распаковывать stamps в заголовки (в том числе RedeliveryStamp, delay и прочие служебные штуки);
сохранять информацию про ретраи, отложенные доставки и т. п.
Если использовать новый сериализатор и забыть обрабатывать эти заголовки, Messenger потеряет часть служебной информации: сообщения будут приходить, но стандартный механизм ретраев и другая инфраструктурная магия перестанет работать.
Ошибка №3. Не использовать failed-очереди
Пока всё работает, про ошибки думать не хочется. Сообщение упало с исключением – ну, посмотрим лог, перезапустим обработчик, дальше разберёмся. Так появляется классическая ситуация: где-то нет failed-очереди и часть сообщений тихо теряется.
Мы для себя сформулировали несколько простых правил:
у каждого важного транспорта своя failed-очередь;
у каждой failed-очереди свой exchange;
сообщения в failed-очередях живут ограниченное время (TTL).
Пример для prescoringResult:
framework:
messenger:
transports:
prescoringResultFailed:
dsn: '%env(MESSENGER_TRANSPORT_DSN)%'
options:
exchange:
name: 'clientcore.prescoringResult.exchange.failed'
type: fanout
queues:
'clientcore.prescoringResult.queue.failed':
arguments:
x-message-ttl: 604800000 # 7 дней
Зачем отдельный exchange для каждой failed-очереди?
По умолчанию Messenger создает exchange сам. Если у вас несколько failure-транспортов и вы не задаете имена явно, можно легко получить ситуацию, когда разные failed-очереди повешены на один и тот же exchange. В случае fanout это значит, что любое сообщение, попавшее в этот exchange, раскидывается во всe привязанные к нему failed-очереди.
Поэтому для каждого failure-транспорта мы явно задаём свой exchange и свою очередь.
Почему fanout?
В failed-сценарии нам не нужны сложные роутинги: если сообщение попало в этот exchange, оно должно просто лечь в одну конкретную failed-очередь для этого типа событий.
Важный момент: x-message-ttl. Мы ограничиваем срок жизни сообщений в failed-очереди, например, неделей:
arguments:
x-message-ttl: 604800000 # 7 дней
у команды есть время заметить проблему, вытащить сообщения, переиграть их или поправить контракт
очередь не растёт бесконечно и не превращается в вечную свалку старых ошибок, про которые уже никто не помнит.
В итоге схема получается простой: один рабочий транспорт → одна понятная failed-очередь с собственным exchange и TTL. Ошибки не смешиваются, мы видим, что и где упало, и можем спокойно разбирать проблемы по каждому типу сообщения.
Ошибка №4. «Очереди работают, значит всё ок»
Снаружи всё выглядит нормально: консьюмеры крутятся, ошибок в логах немного, сообщения вроде бы уходят, кажется, что с очередями всё в порядке. Но без нормального мониторинга легко пропустить момент, когда очереди начинают расти, сообщения лежат часами, и мы узнаем об этом только от бизнеса или клиентов.
Мы для себя выделили минимальный набор наблюдаемости: метрики и логирование, алертинг.
Какие метрики реально нужны:
-
Время обработки сообщения (опционально). Смотрим на показатели P95 и P99.
помогает нам заметить редкие кейсы и оптимизировать их;
по этой метрике удобно заводить более мягкие алерты (например, «пора посмотреть, почему стало медленнее»);
отслеживать сбои в работе очередей.
-
Размер очередей. Главный индикатор того, что система не успевает.
смотрим текущее значение и тренд.
-
Ошибки / failed. Всё, что не получилось обработать и ушло в failed-контур.
количество сообщений в failed-очередях;
счётчик ошибок обработки по типу сообщения/хендлеру.
Часть метрик отдаёт сама инфраструктура RabbitMQ – их мы просто собираем в Prometheus/Grafana. Ручная работа в коде нужна только там, где важен бизнес-контекст: ошибки, статусы и время обработки внутри хендлеров.
Упрощённый пример, как это может выглядеть в обработчике:
<?php
declare(strict_types=1);
namespace App\Messenger\Handler;
use App\Messenger\DTO\PrescoringResultInput;
use App\Service\PrescoringService;
use App\Infrastructure\PrometheusMetrics\MetricsCollectorInterface;
use Psr\Log\LoggerInterface;
use Symfony\Component\Messenger\Attribute\AsMessageHandler;
#[AsMessageHandler]
final readonly class PrescoringResultHandler
{
public function __construct(
private PrescoringService $service,
private LoggerInterface $logger,
private MetricsCollectorInterface $metricsCollector,
) {
}
public function __invoke(PrescoringResultInput $message): void
{
try {
// основная бизнес-логика обработки сообщения
$this->service->handle($message);
} catch (\Throwable $e) {
$this->logger->error('Error while handling queue message', [
'exception' => $e,
'messageId' => $message->id ?? null,
]);
$this->logAndCollectMetrics($message, false);
throw $e;
}
$this->logAndCollectMetrics($message, true);
}
private function logAndCollectMetrics(OrderProcessedMessage $message, bool $success): void
{
// 1. Различные мерики: например количество сообщений по типу + success/fail
$this->metricsCollector->collectQueueMessagesTotal(
type: 'order_processed',
success: $success,
);
// 2. Лог: чтобы по messageId/данным можно было разобраться, что пошло не так
$this->logger->info('Queue message handled', [
'queue' => 'order_processed_queue',
'messageId' => $message->id ?? null,
'payload' => (array) $message,
'success' => $success,
]);
}
}
Простые алерты поверх метрик
Метрики и логи сами по себе бесполезны, если их никто не смотрит, поэтому мы завели очень простой набор правил:
failed-очереди > N (например, N = 100) – где-то системно падают сообщения и не доходят до обработчиков;
размер рабочей очереди растёт в течение T минут – значит консьюмеры не справляются, нужно либо масштабирование, либо разбор «тяжёлых» сообщений;
время обработки по P95/P99 сильно выросло, но очередь пока не растёт – повод заняться оптимизацией.
Такой минимальный набор не перегружает систему мониторинга, но позволяет вовремя понять, что с очередями что-то пошло не так.
Спасибо, что дочитали до конца, надеюсь статья была полезной. Напишите в комментариях, какую ошибку вы бы сделали пятой – с удовольствием почитаю ваши истории и, возможно, дополню ими статью.
Yago
Немного дополню
1) Нейминг и однотипная структурированность важны практически везде. Будь то код, таблицы в БД, названия очередей или классов в коде. Чем однотипнее, тем меньше разборов полетов на тему "как назвать", и тем легче разбираться, т.к. глаза начинают привыкать к шаблону.
2) Только сериализация DTO не всегда спасает. Бывает, из сообщения удаляется важная часть этого DTO, или нужно, чтобы разобралась часть старых сообщений старым механизмом обработки, а новый алгоритм применился к новым сообщениям. И промежуток времени разбора может быть большим. Здесь помимо сериализации помогает выработка подхода к версионированию сообщений в очередях и в целом согласование содержимого сообщений: иногда из сообщения важен только id, а код уже по нему дальше восстанавливает необходимые данные из других источников при обработке. Или некоторые сложные типы данных могут иметь проблемы сериализации, если не уследить.
4) При увеличении фоновых задач появляются взаимосвязи между ними. Помимо id message рекомендую сохранять базовый request-id и parent-id, которые породили сообщение в очереди. Можно использовать для этого тот же трейсинг. Тогда будет легче контролировать путь процесса и понимать путь запроса пользователя, который привел к разбираемому случаю.