Всем доброго времени суток, друзья.
Сегодня захотелось поговорить о том, как можно работать с RabbitMQ в Symfony и совсем чуть-чуть о некоторых подводных комнях. В конце я напишу парочку интересных моментов о кролике (рус. перевод «rabbit») для тех, кто совсем в танке.

Я не буду рассказывать про сам RabbitMQ, поэтому если вы пока и этого не знаете, почитайте следующие переводы:

Статья 1
Статья 2
Статья 3
Статья 4
Статья 5

Не бойтесь примеров на перле или пайтоне — это не страшно, все достаточно понятно из исходного кода.

+ Все достаточно подробно описано, когда я читал это в свое время, достаточно было интерпретировать код мысленно, чтобы понять как что и зачем.

Если вы уже знаете, что такое консумер и почему в нем нужно делать $em->clear() + gc_collect_cycles, а после закрывать соединение с базой данных, то, скорее всего, вы ничего нового для себя не узнаете. Статья скорее для тех, кто не хочет разбираться с AMQP протоколом, но которым нужно прямо сейчас применять очереди и выбор почему-то бездумно пал на RabbitMQ, а не тот же легковестный beanstalkd.
Если же у вас микросервисная архитектура и вы ждете, что я расскажу вам как сварить коммуникацию между компонентами через AMQP, как красиво делать RPC, то я сам чего-то подобного очень давно жду на Хабре…

Перед нами задача: отправлять сообщения на Email в очереди, используя RabbitMQ, а так же обеспечить отказоустойчивость: если почтовый сервер ответил таймаутом или что-то ещё сломалось — нужно попробовать выполнить задачу через 30 секунд ещё раз.

Итак, устанавливаем наш бандл.

Я слишком ленив, чтобы описывать вам, как нужно копировать composer require команду и строку в AppKernel.

Я очень надеюсь, что вы сами это сделали и готовы приступать к конфигурированию нашего бандла.

Если нет, то вот вам полный гайд для самых маленьких:
Установка RabbitMQ:
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management

Теперь вы можете открыть ваш localhost:15672 под учеткой: guest guest и увидеть много прикольных вещей, в которых скоро вы будете разбираться и чувствовать себя мужиком.

Теперь устанавливаем сам бандл:

composer require php-amqplib/rabbitmq-bundle

И регистрируем его в нашем приложении:

// app/AppKernel.php

public function registerBundles()
{
    $bundles = array(
        new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
    );
}

Вот и всё.



Конфигурация бандла для нас:

old_sound_rabbit_mq:
    connections:
        default:
            host:     'localhost'
            port:     5672
            user:     'guest'
            password: 'guest'
            vhost:    '/'
            lazy:     false
            connection_timeout: 3
            read_write_timeout: 3
            keepalive: false
            heartbeat: 0
            use_socket: true
    producers:
        send_email:
            connection:       default
            exchange_options: { name: 'notification.v1.send_email', type: direct }

    consumers:
        send_email:
            connection:       default
            exchange_options: { name: 'notification.v1.send_email', type: direct }
            queue_options:    { name: 'notification.v1.send_email' }
            callback:         app.consumer.mail_sender

Здесь огромное внимание следует обратить на producers и consumers. Если очень коротко и просто: producer — это то, что отправляет сообщения через RabbitMQ в consumer, а consumer в свою очередь — та вещь, которая получает и обрабатывает эти сообщения. Здесь же exchange_options — опции для обменника (вы же прочитали статьи про rabbitmq, которые были в начале статьи?), queue_options — опции для очереди (аналогично). Так же стоит обратить внимание на callback в consumer — здесь указывается ID сервиса, который расширяет ConsumerInterface (execute метод с аргументом сообщения).

Т.к. пока что у вас его нету, при запуске приложения или компиляции контейнера мы получим какое-то DI исключение, что сервис не найден, но мы его запрашиваем. Поэтому давайте создавать наш сервис:

#app/config/services.yml
services:
    app.consumer.mail_sender:
        class: AppBundle\Consumer\MailSenderConsumer 

И сам класс:

namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Class NotificationConsumer
 */
class MailSenderConsumer implements ConsumerInterface
{
    /**
     * @var AMQPMessage $msg
     * @return void
     */
    public function execute(AMQPMessage $msg)
    {
        echo 'Ну тут типа сообщение пытаюсь отправить: '.$msg->getBody().PHP_EOL;
        echo 'Отправлено успешно!...';
    }
}

Ну вы же не обиделись, что я не включил в статью как работать со SwiftMailer? :) Нам важно, чтобы сюда асинхронно доставлялась строка через очередь сообщений, то, как мы будем обрабатывать эту строку — дело наше. Почта — всего лишь пример кейса.

Как же нам передать строку в наш консьюмер? Для этого давайте создадим тестовую команду:

namespace AppBundle\Command;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class TestConsumerCommand extends ContainerAwareCommand
{
    /**
     * {@inheritdoc}
     */
    protected function configure()
    {
        $this
            ->setName('app:test-consumer')
            ->setDescription('Hello PhpStorm');
    }

    /**
     * {@inheritdoc}
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('Сообщенька для отправки на мыло...');
    }

Снова извиняюсь, что не сделал для вас контроллер с красивой формочкой — я слишком ленив для этого. Да и слишком уж излишне. А я экономный лентяй и люблю рисовать, мечтаю немного в сторону теорий и архитектуры приложений. Отвлеклись.

Теперь запускаем наш consumer и приказываем ему ждать сообщения из RabbitMQ:

bin/console rabbitmq:consumer send_email -vvv

И отправим ему сообщение из нашей тестовой команды:

bin/console app:test-consumer

И вот сейчас, в процессе rabbitmq:consumer, мы можем увидеть наше сообщение! И что псевдо отправка завершилась успехом.

А теперь давайте посмотрим, как можно реализовать отложенную обработку сообщений в случае ошибок. Я не буду использовать плагин RabbitMQ для отложенных сообщений. Мы будем достигать этого путем создания новой очереди, в которой укажем время жизни сообщений 30сек и установим настройку: после смерти — перекладываться в основную очередь.

Достаточно лишь добавить новый producer:

producers:
        send_email:
            connection:       default
            exchange_options: { name: 'notification.v1.send_email', type: direct }
            
        delayed_send_email:
            connection: default
            exchange_options:
                name: 'notification.v1.send_email_delayed_30000'
                type: direct
            queue_options:
                name: 'notification.v1.send_email_delayed_30000'
                arguments:
                    x-message-ttl: ['I', 30000]
                    x-dead-letter-exchange: ['S', 'notification.v1.send_email']

Теперь давайте изменим логику консумера:

namespace AppBundle\Consumer;

use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
use PhpAmqpLib\Message\AMQPMessage;

/**
 * Class NotificationConsumer
 */
class MailSenderConsumer implements ConsumerInterface
{
    private $delayedProducer;

    /**
     * MailSenderConsumer constructor.
     * @param ProducerInterface $delayedProducer
     */
    public function __construct(ProducerInterface $delayedProducer)
    {
        $this->delayedProducer = $delayedProducer;
    }

    /**
     * @var AMQPMessage $msg
     * @return void
     */
    public function execute(AMQPMessage $msg)
    {
        $body = $msg->getBody();

        echo 'Ну тут типа сообщение отправляю '.$body.' ...'.PHP_EOL;

        try {
            if ($body == 'bad') {
                throw new \Exception();
            }

            echo 'Успешно отправлено...'.PHP_EOL;
        } catch (\Exception $exception) {
            echo 'ERROR'.PHP_EOL;
            $this->delayedProducer->publish($body);
        }
    }
}

А вообще для вывода полезно использовать LoggerInterface — и красиво и масштабируется.
Но нам же лень и мы не хотим создавать дополнительные «думки», верно? Просто знайте.

Теперь мы должны прокинуть producer для отложенной очереди:

#app/config/services.yml
services:
    app.consumer.mail_sender:
        class: AppBundle\Consumer\MailSenderConsumer
        arguments: ['@old_sound_rabbit_mq.delayed_send_email_producer']

И изменим команду:

namespace AppBundle\Command;

use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;

class TestConsumerCommand extends ContainerAwareCommand
{
    /**
     * {@inheritdoc}
     */
    protected function configure()
    {
        $this
            ->setName('app:test-consumer')
            ->setDescription('Hello PhpStorm');
    }

    /**
     * {@inheritdoc}
     */
    protected function execute(InputInterface $input, OutputInterface $output)
    {
        $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('Ура, сообщенька...');
        $this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('bad');
    }
}

Теперь вместе с нормальным сообщением она будет отправлять и плохое сообщение.

Если мы запустим, то увидим следующий вывод:

Ну тут типа сообщение отправляю Ура, сообщенька...
Успешно отправлено...

Ну тут типа сообщение отправляю bad...
ERROR

Спустя 30 секунд еще раз появится сообщение об обработке:
Ну тут типа сообщение отправляю bad...
ERROR

И так бесконечно. Логику максимальных попыток и т.п. продумывайте сами. Далее я дам пару советов для вашего прода и некоторых фич.

Теперь советы для вашего прода:

1) Не отходя от темы с максимальными попытками обработки: знайте на все 102% все возможные исключения контекста с которым вы работаете! Умейте представлять, когда повторная обработка требуется, а когда нет, иначе — привет мусорке из логов и отсутствия понимания что происходит. В случае, если битая задача будет крутится в RabbitMQ, с реальными данными, нормальными задачами, вы вряд ли сможете выкинуть сломанные задачи без костылей, не обновляя код консьюмера и не перезапуская его. Поэтому продумывайте это сразу. В данном случае правильным было бы ловить только лишь SMTPTimeOutException какой-нибудь.
Так же с такой моделью важно понимать, что: на 1 очередь — одна «глобальная ответственность смены состояния чего либо». Не стоит давать слишком много рискованных задач своему воркеру. Если рассмотреть вариант с 1С, то проблема может быть в следующем: допустим при успешном или неуспешном изменении\добавлении товара в 1С мы записываем в базу данных что-нибудь, например, дату последней удачной синхронизации или неудачной. Т.е. тут обновляются сразу 2 базы данных: бд 1С и бд вашего приложения. Допустим в 1С все успешно создалось, далее идет обновление в базе данных поля «дата последней удачной синхронизации» — хоп, вылезла ошибочка, опять же, сервер бд не отвечает — задача откладывается на «потом» и повторяется, пока база данных не начнет отвечать. И при этом каждый раз «подзадача» связанная с созданием сущности в 1С будет успешно выполняться, каждый раз при неудачной попытке записи в базу данных сайта, что неправильно.

2) Прочитайте про durable, раз уж мы с вами используем RabbitMQ. P.S: это заводится как true\false флаг «durable» в конфиге бандла, конкретно — в exchange_options и queue_options

3) Всю свою жизнь закрывайте соединение к базе данных после выполнения работы программы. А так же запускайте очистку EM и после сборщик мусора для чистки ссылок. Т.е. в конце концов наш консьюмер должен выглядеть как-то так:

class MailSenderConsumer implements ConsumerInterface
{
    private $delayedProducer;

    private $entityManager;

    /**
     * MailSenderConsumer constructor.
     * @param ProducerInterface      $delayedProducer
     * @param EntityManagerInterface $entityManager
     */
    public function __construct(ProducerInterface $delayedProducer, EntityManagerInterface $entityManager)
    {
        $this->delayedProducer = $delayedProducer;
        $this->entityManager = $entityManager;
        
        gc_enable();
    }

    /**
     * @var AMQPMessage $msg
     * @return void
     */
    public function execute(AMQPMessage $msg)
    {
        $body = $msg->getBody();

        echo 'Ну тут типа сообщение отправляю '.$body.' ...'.PHP_EOL;

        try {
            if ($body == 'bad') {
                throw new \Exception();
            }

            echo 'Успешно отправлено...'.PHP_EOL;
        } catch (\Exception $exception) {
            echo 'ERROR'.PHP_EOL;
            $this->delayedProducer->publish($body);
        }
        
        $this->entityManager->clear();
        $this->entityManager->getConnection()->close();

        gc_collect_cycles();
    }
}

Консьюмер работает как демон, поэтому постоянно копить в нем ссылки и держать соединение с бд — это плохо. В случае с MySQL вы получите MySQL server has gone away.

4) Много думайте, почему ваша модель отложенных сообщений может неожиданно убить ваш бизнес. Например у нас есть механизм, который при изменении товара в админке заливает эти изменения через очередь в 1С. Теперь представим ситуацию: администратор меняет товар -> создается задача #1 на попытку изменить те же данные в базе 1С. Сервер 1С не отвечает, поэтому задачка просто перекладывается постоянно, пока все не заработает. За это время администратор решил еще кое-что подправить в том же товаре, что он и делает. Регистрируется задача #2.

А теперь представьте ситуацию, когда поочередно выполняются и откладываются задачи #1 и #2.

Что если 1С заработает к моменту выполнения задачи #2? Задача выполнится и зальёт последние изменения. Далее пойдет в ход задача #1 и затрет собой стабильные изменения :)
Выход: отправляем timestamp в качестве version, и, если задача «из прошлого» — выкидываем её.

5) Идешь в асинхронность — прочитай про многие архитектурные проблемы, а также race condition, несогласованность консумеров на разных машинах и прочее.

6) Пишите версии вашим очередям… Ух как помогает на реальном проде. В принципе мы так и сделали в этом примере.

7) Возможно тебе не нужен RabbitMQ и целый AMQP протокол. Посмотри в сторону beanstalkd.

8) Запускайте консумеры и всякое прочее демоническое на php через supervisor и подключите полное логирование падения процессов в нём. У него так же есть web интерфейс для управления всем этим делом, что так же очень удобно. Проблемы будут всегда.

Комментарии (13)


  1. Fesor
    29.09.2017 10:23
    +3

    сборщик мусора для чистки ссылок

    Вот тут по подробнее, зачем? Во первых просто для ссылок php и так почистит тогда когда надо, в прочем как и для циклических ссылок. Раскройте цель подобных манипуляций со сборщиком подробнее. Есть мнение что это как-то связано с


    постоянно копить в нем ссылки и держать соединение с бд — это плохо.

    как я уже говорил — php сам удалит ссылки. Более того вызов gc_collect_cycle вообще ничего не сделает особо.


    Зато другая проблема появляется — reset менеджера при исключениях. И вот там вполне коннекшен умирает и его надо передернуть. А закрывать коннекшен между задачами в очереди — крайне неразумно. Скажем для postgresql реконнект штука относительно дорогая, да и зачем нам этот оверхэд по времени обработки сообщений..


    1. dizzy7
      29.09.2017 15:23

      gc_collect_cycle бесполезен, подтверждаю. А вот вызов gc_mem_caches стоит делать — иначе php не отдаёт память системе — это может быть критично если размер заданий плавает и на некоторых потребление памяти сильно больше среднего.


      1. Fesor
        29.09.2017 17:03

        Вот тут я уже не знаю точно, но насколько я помню менеджер памяти не просто так не высвобождает запрошенные у системы блоки, а потому что планирует реюзать их далее. То есть если приложению вдруг понадобилось 200 метров, то почему бы ему еще раз не понадобилось?


        Ну то есть я к тому что часто ее вызывать наверное тоже не стоит как минимум потому что иначе это будет вынуждать пых чаще просить систему выделять память. Но периодически, скажем раз в 100 сообщений или в зависимости от потребления памяти — пожалуй стоит.


  1. VolCh
    29.09.2017 13:51

    Разве rabbitmq:consumer работает в режиме демона? Мне казалось, что обычное CLI приложение.


    1. Fesor
      29.09.2017 14:25

      Ну в целом все упирается в то что считать "режимом демона". Эта команда — обычное CLI приложение, которое не умирает после первого сообщения и умеет ловить/обрабатывать сигналы (пока только stop, в restart написан TODO: implement). Далее вопрос как запускать.


  1. igorCD
    29.09.2017 15:23
    +1

    Позвольте дополнить самую последнюю фразу «Проблемы будут всегда.» данной статьи.

    О некоторых багах phpamqp-lib. Баг репорты есть на гитхабе, до сих пор нету решения.
    1. Consumer: допустим запустили мы consumer и активировали heartbeat. От тихо ожидает новых сообщений в этой точке github.com/php-amqplib/RabbitMqBundle/blob/master/RabbitMq/BaseConsumer.php#L53. Бесконечно долго, т.к. timeout = 0. Допустим, падает сетевое соединение на какое-то время, затем оно восстанавливается. Консьюмер по прежднему ждёт сообщения в той точке, а RabbitMQ уже закрыл соединение. Всё, консьюмер будет висеть в памяти до тех пор, пока его не прибьют сигналом каким-нибудь. Соответсвенно, если в консьюмере есть таймер для «самоубийства», то он не сработает.

    2. Без heartbeat могут быть проблемы с незакрытыми соединениями на стороне RabbitMQ. Возможно тут ещё всему виной HAProxy, который в нашем случае стоит перед кластером RabbitMQ.

    Из-за этих казалось бы незначительных проблем, код надёжного консьюмера выглядит как бесконечный try/catch и while loops, чтобы консьюмер мог реагировать на разрыв соединения и переподключаться. Такого функционала нету ни в какой высокоуровневой библиотеке (см. Enqueue, например, или бандл, описанный в данной статье)

    Далее, есть очень хорошая инициатива — github.com/queue-interop/queue-interop — при помощи данных интерфейсов можно быть независимым от конкретной библиотеки, которая реализует AMQP протокол (phpamqp-lib, php extension, bunny lib). Интерфейсы позаимстованны из мира Java. Но они тоже на начальном этапе своего существования.

    Например, в нашем проекты мы решили стать менее зависимыми от конкретной AMQP библиотеки. Посему перевели всех наших consumer/producer на github.com/php-enqueue/enqueue-dev. Проект не на Symfony, поэтому используем только имплементацию Queue Interop интерфейсов с использованием phpamqp-lib, чтобы в ближайшем будущем попробовать имплементацию от bunny lib.

    Но тут ждало разочарование: такая фича, как Publisher Confirms, которая есть только в RabbitMQ, отсутсвует out-of-the-box в самих интерфейсах от Queue Interop (тикет на гитхабе есть). Пришлось делать костыли, очень нехорошие костыли. В проекте у нас договорённость о at-least-once delivery, потому и нужен Publisher Confirms.

    Если всё подытожить, то вот мои рекомендации:
    1. Подумайте 10 раз, нужин ли вам Message Broker вообще. Если можно, используйте MySQL/файлы/что угодно, что уже есть в вашем проекте. Тех поддержка самого RabbitMQ в продакшене — тот ещё геморой для Operations Team.
    2. Если всё же решились на Message Broker, то ответьте на вопрос, можете ли вы потерять пару сообщений? Если можете — то используйте любые бандлы/библиотеки. В противном случае крайне рекомендую сначала написать пару consumer/producer на низком уровне (к примеру, используя phpamqp-lib без обёрток) и хорошенько пройдитесь отладчиком, разрывая соединение/физически отключая/аварийно выключая RabbitMQ. Вы удивитесь, как легко можно потерять сообщения. А уже потом подбирайте конкретные высокоуровневые бандлы под свои нужды.


    1. maksim_ka2
      01.10.2017 12:08

      Но тут ждало разочарование: такая фича, как Publisher Confirms, которая есть только в RabbitMQ, отсутсвует out-of-the-box в самих интерфейсах от Queue Interop (тикет на гитхабе есть). Пришлось делать костыли, очень нехорошие костыли. В проекте у нас договорённость о at-least-once delivery, потому и нужен Publisher Confirms.


      Я думаю что Publisher Confirms не про at-least-once delivery. Это расширение дает возможность отправителю получать уведомления когда его сообщения обработаны получателем.

      В документации ничего не сказано про случай когда соединение с получателем обрывается (по причине исключения например) и брокер возвращает сообщение в очередь с пометкой redelivered. Есть шанс обработать сообщение больше чем один раз. Уверен есть и другие ситуации.

      Если бы мне нужно было «at-least-once delivery» я бы делал это на стороне получателя + хранилище для проверки на повтор.

      От отправителя требуется проставить уникальный идентификатор сообщению.


  1. maksim_ka2
    29.09.2017 16:28
    +2

    Рекомендую посмотреть в сторону EnqueueBundle. Более мощное решение как по части использования так и по колличесту фишек для программистов. Кто много работал с RabbitMqBundle — меня поймет. Код получается проще и лаконичнее. Сравнение можно посмотреть тут. Работает не только с php-amqplib но и bunny, amqp-ext (спасибо amqp interop).

    Практически все что вы описываете, в Enqueue уже реализовано из коробки: например, такие веши как, переподключение к базе данных, чистка entity manager, повторы с задержкой, RPC.

    EnqueueBundle умеет это делать из коробки.

    коммуникацию между компонентами через AMQP, как красиво делать RPC, то я сам чего-то подобного очень давно жду на хабре…


    Вот Message bus to every PHP application


    1. Big_Shark
      30.09.2017 18:17

      А почему не Bernard? Как мне кажется он лучше поддерживается.


      1. maksim_ka2
        30.09.2017 20:45

        Не согласен. Вот активность по enqueue репозиторию и такая же для bernard.

        По фичам и коллисеству поддерживаемых брокеров отрыв так же сущесветнный.


      1. Fesor
        01.10.2017 21:01

        Пользуюсь бернардом уже год, новые проекты переводим на enqueue. Он удобнее и больше полезных фич из коробки.


        1. Big_Shark
          01.10.2017 21:13

          У меня бернард на 3 проектах, когда выбирал что ставить то больше внимания обратил на количество звезд на гитхабе. Но в будущем попробую enqueue. Спасибо.


          1. Fesor
            01.10.2017 21:17

            обратил на количество звезд на гитхабе.

            я то же на это купился, плюсь понравилось то о чем общаются люди в ишус трекере. Но вот прошел год и воз и поныне там. 1.0.0 все еще в альфе.