Хочу поделиться практической проблемой конфигурирования брокера очередей RabbitMQ в Yii2. Предупреждаю читателя, что не обладаю экспертным мнением по работе с данным брокером очередей, однако очень хочется восполнить пробелы документации Yii2 и закрепить результат собственных мучений. Итак, если вы когда либо сталкивались с проблемой, что посылаемые сообщения попадают во все очереди, которые есть на сервере очередей, вы согласны, что это проблема и не понимаете почему так происходит, тогда прошу под кат.

Почему вы можете столкнуться с таким поведением? Например, если вы, как и я раньше не работали с RabbitMQ, но работали например с Gearman. Сам Gearman, простой как железная дорога (позаимствовано у известного и уважаемого в интеренете персонажа). Вы создаете очередь с неким именем, кладете туда данные. Воркер читает из очереди с этим же именем. Всё просто. Теперь настала пора использовать модные технологии, сами не понимая зачем вы выбираете RabbitMQ. Потом радуетесь, что в Yii2 есть готовая абстракция для многих популярных брокеров очередей. Скудная документация подсказывает дефолтную конфигурацию чтобы всё завелось:

return [
    'bootstrap' => [
        'queue', // The component registers its own console commands
    ],
    'components' => [
        'queue' => [
            'class' => \yii\queue\amqp_interop\Queue::class,
            'port' => 5672,
            'user' => 'guest',
            'password' => 'guest',
            'queueName' => 'queue',
            'driver' => yii\queue\amqp_interop\Queue::ENQUEUE_AMQP_LIB,

            // or
            'dsn' => 'amqp://guest:guest@localhost:5672/%2F',

            // or, same as above
            'dsn' => 'amqp:',
        ],
    ],
];

Казалось бы всё до безумия просто, вот знакомая нам строка с queueName, копипастим, исправляем, запускаем — работает! Делаем очереди для других компонентов системы. Распараллеливаем чем можем наш php. Commit, Push довольные кладем таску в QA и идем читать хабр (в обеденный перерыв).

Тут на самом интересном нас прерывает QA в чате, и говорит, что происходит что-то странное. Почему-то данные которые пишут воркеры (косьюмеры) дублируются. Что что? Не может такого быть. Идем проверять логи. Видим, что сообщение пишется в очередь, при чем очередь выбрана правильно, hash jobId получен. Не-не, у нас нет никаких ошибок. Пишем довольные QA, проверь ещё раз, не может такого быть, у нас всё хорошо.

Буквально через пол часа нас отвлекают снова, ошибка повторилась и тут же на мыло упало уведомление — задача снова переведена в работу. Ну что, я ж программист, сейчас разберемся. У RabbitMQ, в отличие от того же Gearman есть веб интерфейс, в котором есть много информации о работе сервера. Выглядит сие чудо вот так:



Кидаем ещё пару сообщений в очередь, видим в вебморде, что наши сообщения доходят и обрабатываются воркером. Случайный взгляд замечает, что когда мы кидаем сообщение в очередь, во всех очередях подскакивает график «Message rates».



Ещё сто раз проверяем конфигурацию, перечитываем скудную документациюю Yii. Мы всё сделали правильно. Идем читать документацию на сайт rabbit'a. После пары десятков минут блуждания в темноте натыкаемся на туториал. Сразу после первого абзаца знакомимся с причиной наших непоняток — Exchanges. Повторять документацию не буду, очень коротко.

В RabbitMQ мы не пишем сообщение в очередь, мы пишем его в exchange, этакий прокси, который одним концом принимает наши сообщения, а другим концом общается с очередями на сервере. В его власти принять решения в какую очередь положить наши данные. Занятно, что в документации Yii об этом нет ни строчки. С первого взгляда непонятно как сконфигуровать exchange, ныряем в кишочки и в файле vendor/yiisoft/yii2-queue/src/drivers/amqp_interop/Queue.php:176 находим заветное свойство, которое можно засетить. Тут надо сказать, что драйверов для RabbitMQ много, в моем случае используется enqueue/amqp-lib. Выставляем exchangeName, тестируем, ничего не меняется. Мы ж как настоящий русский инженер, сначала пробуем, а потом идем вдумчиво читать документацию ещё раз. Читаем ещё раз внимательно, потом идем в веб морду rabbit'a и видим следующее:



Несколько наших очередей связаны с одним и тем же exchange. Бинго! Вот она причина, одно «но», я их не связывал. Идем снова в кишочки драйвера, находим метод setupBroker строчку 392

$this->context->bind(new AmqpBind($queue, $topic));

Вот это неуправляемое связывание.

И так, недолго поразмыслив, приходим к выводу, что для каждой очереди должен быть объявлен свой exchange, тогда связь будет верной и у одного exchange будет всего одна связанная очередь. таким образом мы добьемся поведения аналогичного Gearman. Кстати, в документации подробно описано для чего сделан exchange, и как я понял, одна из причин это возможность связать с exchange несколько очередей. Вот только я не придумал, что за кейс такой когда это может понадобиться, ребята пишите в комментариях. И пишите, сталкивались ли вы с описанной выше ситуацией или я всё делаю неправильно?

Комментарии (7)


  1. kirillzak
    05.02.2019 23:27

    Хорошая практика при смене инструмента — почитать в документации как работает данный инструмент и какие best practices рекомендуют его разработчики. В случае с кроликом сразу бы бросилось в глаза, что по мимо queue в нём есть и exchange. Это бы сэкономило время и сразу дало бы направление для поиска решения в Yii.


  1. DOC_tr
    06.02.2019 09:03

    Немного странная реализация плагина.

    Если кролик нужен на уровне «отправки почты или смсок», то ничего об обменниках (exchange) знать и не нужно. Так как rabbit, для очередей без обменников, создает обменники автоматически (если не ошибаюсь, то с type=direct и с точно таким же именем), поэтому код

    $queue = $this->context->createQueue($this->queueName);
    $this->context->declareQueue($queue);

    Создаст очередь, в которую можно сразу же писать без обменника.

    Исходя из кода либы, в конфиге, вместе с `queueName` можно указывать `exchangeName` (не утверждаю, но судя по коду это должно быть именно так), хотя об этом в доке ни слова не сказано

    Схема с обменниками очень удобна, при распределенной архитектуре. Например когда есть 2 очереди — одна отправляет почту, а другая пишет об этом в лог. Тогда подписываем их на один обменник и отправляем только 1 сообщение, которое попадет в 2 очереди. Пример, конечно, притянут за уши, но думаю вполне понятен


    1. komandakycto Автор
      06.02.2019 09:55

      Спасибо, кейс с логом и отправкой имеет право на жизнь.


  1. vtvz_ru
    06.02.2019 10:10
    +1

    В свое время тоже использовал кролика в Yii2, но без плагинов, а с библиотекой из документации. Вообще rabbitmq такой комбайн, который может все, что хочешь: queue, event-subscriber, rpc и п.р. И используемый метод будет зависеть от умелого жонглирования конфигами. И для правильного использования комбайна нужно иметь знания, как для комбайна. А типичный для нас подход plugin first в итоге обычно заставляет нас вникать в детали, потому что не работает оно так, как нужно


  1. ilyaplot
    06.02.2019 10:50

    Если класть сообщение в default exchange, оно не попадет во все очереди. Очень странная реализация плагина (или драйвера?)


  1. avg
    06.02.2019 12:53

    Кстати, в документации подробно описано для чего сделан exchange, и как я понял, одна из причин это возможность связанть с exchange несколько очередей. Вот только я не придумал, что за кейс такой когда это может понадобиться, ребята пишите в комметариях.

    Например, для автоматического сохранения истории всего отправленного в очередь. Или для федерации, которую многие используют вместо кластера, когда одна из очередей вычитывается на другом сервере кролика. Возможно, для того и другого одновременно


  1. perfect_pixel
    06.02.2019 15:52

    Для первого плагина вы конечно молодец, что завели эту машину. Но про exchanges рассказывается уже в третьей статье 'get started' на их официальном сайте. Документация у кролика конечно не самая дружелюбная, но за полдня осилить можно все гайды :)

    Вот только я не придумал, что за кейс такой когда это может понадобиться

    Да, для самых простых плюшек (отправил и забыл) достаточно и просто очередей. А вот exchanges, да еще вместе с routing_key дают свободу для построения очень сложных структур. www.rabbitmq.com/tutorials/tutorial-four-php.html

    Можно использовать связки сразу нескольких exchanges. Могу привести хороший пример, который сам недавно реализовывал — это «отложенные сообщения» aka dead-letter exchange. Например когда вам нужен или просто сервис «отложенной отправки» или когда чей-то API прямо сейчас недоступен, и вы пытаетесь через N минут ожидания повторить запрос. Вот тут связка нескольких обменников и роут-ключей будет очень кстати.

    Ссыль на лучшее исполнение этого подхода (на ноде) — github.com/ria-com/rabbit-mq-learning
    Даже схема есть.