Сегодня захотелось поговорить о том, как можно работать с RabbitMQ в Symfony и совсем чуть-чуть о некоторых подводных комнях. В конце я напишу парочку интересных моментов о кролике (рус. перевод «rabbit») для тех, кто совсем в танке.
Я не буду рассказывать про сам RabbitMQ, поэтому если вы пока и этого не знаете, почитайте следующие переводы:
Статья 1
Статья 2
Статья 3
Статья 4
Статья 5
Не бойтесь примеров на перле или пайтоне — это не страшно, все достаточно понятно из исходного кода.
+ Все достаточно подробно описано, когда я читал это в свое время, достаточно было интерпретировать код мысленно, чтобы понять как что и зачем.
Если вы уже знаете, что такое консумер и почему в нем нужно делать $em->clear() + gc_collect_cycles, а после закрывать соединение с базой данных, то, скорее всего, вы ничего нового для себя не узнаете. Статья скорее для тех, кто не хочет разбираться с AMQP протоколом, но которым нужно прямо сейчас применять очереди и выбор почему-то
Если же у вас микросервисная архитектура и вы ждете, что я расскажу вам как сварить коммуникацию между компонентами через AMQP, как красиво делать RPC, то я сам чего-то подобного очень давно жду на Хабре…
Перед нами задача: отправлять сообщения на Email в очереди, используя RabbitMQ, а так же обеспечить отказоустойчивость: если почтовый сервер ответил таймаутом или что-то ещё сломалось — нужно попробовать выполнить задачу через 30 секунд ещё раз.
Итак, устанавливаем наш бандл.
Я слишком ленив, чтобы описывать вам, как нужно копировать composer require команду и строку в AppKernel.
Я очень надеюсь, что вы сами это сделали и готовы приступать к конфигурированию нашего бандла.
echo 'deb http://www.rabbitmq.com/debian/ testing main' | sudo tee /etc/apt/sources.list.d/rabbitmq.list
wget -O- https://www.rabbitmq.com/rabbitmq-release-signing-key.asc | sudo apt-key add -
sudo apt-get update
sudo apt-get install rabbitmq-server
sudo rabbitmq-plugins enable rabbitmq_management
Теперь вы можете открыть ваш localhost:15672 под учеткой: guest guest и увидеть много прикольных вещей, в которых скоро вы будете разбираться и чувствовать себя мужиком.
Теперь устанавливаем сам бандл:
composer require php-amqplib/rabbitmq-bundle
И регистрируем его в нашем приложении:
// app/AppKernel.php
public function registerBundles()
{
$bundles = array(
new OldSound\RabbitMqBundle\OldSoundRabbitMqBundle(),
);
}
Вот и всё.
Конфигурация бандла для нас:
old_sound_rabbit_mq:
connections:
default:
host: 'localhost'
port: 5672
user: 'guest'
password: 'guest'
vhost: '/'
lazy: false
connection_timeout: 3
read_write_timeout: 3
keepalive: false
heartbeat: 0
use_socket: true
producers:
send_email:
connection: default
exchange_options: { name: 'notification.v1.send_email', type: direct }
consumers:
send_email:
connection: default
exchange_options: { name: 'notification.v1.send_email', type: direct }
queue_options: { name: 'notification.v1.send_email' }
callback: app.consumer.mail_sender
Здесь огромное внимание следует обратить на producers и consumers. Если очень коротко и просто: producer — это то, что отправляет сообщения через RabbitMQ в consumer, а consumer в свою очередь — та вещь, которая получает и обрабатывает эти сообщения. Здесь же exchange_options — опции для обменника (вы же прочитали статьи про rabbitmq, которые были в начале статьи?), queue_options — опции для очереди (аналогично). Так же стоит обратить внимание на callback в consumer — здесь указывается ID сервиса, который расширяет ConsumerInterface (execute метод с аргументом сообщения).
Т.к. пока что у вас его нету, при запуске приложения или компиляции контейнера мы получим какое-то DI исключение, что сервис не найден, но мы его запрашиваем. Поэтому давайте создавать наш сервис:
#app/config/services.yml
services:
app.consumer.mail_sender:
class: AppBundle\Consumer\MailSenderConsumer
И сам класс:
namespace AppBundle\Consumer;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use PhpAmqpLib\Message\AMQPMessage;
/**
* Class NotificationConsumer
*/
class MailSenderConsumer implements ConsumerInterface
{
/**
* @var AMQPMessage $msg
* @return void
*/
public function execute(AMQPMessage $msg)
{
echo 'Ну тут типа сообщение пытаюсь отправить: '.$msg->getBody().PHP_EOL;
echo 'Отправлено успешно!...';
}
}
Ну вы же не обиделись, что я не включил в статью как работать со SwiftMailer? :) Нам важно, чтобы сюда асинхронно доставлялась строка через очередь сообщений, то, как мы будем обрабатывать эту строку — дело наше. Почта — всего лишь пример кейса.
Как же нам передать строку в наш консьюмер? Для этого давайте создадим тестовую команду:
namespace AppBundle\Command;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
class TestConsumerCommand extends ContainerAwareCommand
{
/**
* {@inheritdoc}
*/
protected function configure()
{
$this
->setName('app:test-consumer')
->setDescription('Hello PhpStorm');
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('Сообщенька для отправки на мыло...');
}
Снова извиняюсь, что не сделал для вас контроллер с красивой формочкой — я слишком ленив для этого. Да и слишком уж излишне. А я экономный лентяй и люблю рисовать, мечтаю немного в сторону теорий и архитектуры приложений. Отвлеклись.
Теперь запускаем наш consumer и приказываем ему ждать сообщения из RabbitMQ:
bin/console rabbitmq:consumer send_email -vvv
И отправим ему сообщение из нашей тестовой команды:
bin/console app:test-consumer
И вот сейчас, в процессе rabbitmq:consumer, мы можем увидеть наше сообщение! И что псевдо отправка завершилась успехом.
А теперь давайте посмотрим, как можно реализовать отложенную обработку сообщений в случае ошибок. Я не буду использовать плагин RabbitMQ для отложенных сообщений. Мы будем достигать этого путем создания новой очереди, в которой укажем время жизни сообщений 30сек и установим настройку: после смерти — перекладываться в основную очередь.
Достаточно лишь добавить новый producer:
producers:
send_email:
connection: default
exchange_options: { name: 'notification.v1.send_email', type: direct }
delayed_send_email:
connection: default
exchange_options:
name: 'notification.v1.send_email_delayed_30000'
type: direct
queue_options:
name: 'notification.v1.send_email_delayed_30000'
arguments:
x-message-ttl: ['I', 30000]
x-dead-letter-exchange: ['S', 'notification.v1.send_email']
Теперь давайте изменим логику консумера:
namespace AppBundle\Consumer;
use OldSound\RabbitMqBundle\RabbitMq\ConsumerInterface;
use OldSound\RabbitMqBundle\RabbitMq\ProducerInterface;
use PhpAmqpLib\Message\AMQPMessage;
/**
* Class NotificationConsumer
*/
class MailSenderConsumer implements ConsumerInterface
{
private $delayedProducer;
/**
* MailSenderConsumer constructor.
* @param ProducerInterface $delayedProducer
*/
public function __construct(ProducerInterface $delayedProducer)
{
$this->delayedProducer = $delayedProducer;
}
/**
* @var AMQPMessage $msg
* @return void
*/
public function execute(AMQPMessage $msg)
{
$body = $msg->getBody();
echo 'Ну тут типа сообщение отправляю '.$body.' ...'.PHP_EOL;
try {
if ($body == 'bad') {
throw new \Exception();
}
echo 'Успешно отправлено...'.PHP_EOL;
} catch (\Exception $exception) {
echo 'ERROR'.PHP_EOL;
$this->delayedProducer->publish($body);
}
}
}
А вообще для вывода полезно использовать LoggerInterface — и красиво и масштабируется.
Но нам же лень и мы не хотим создавать дополнительные «думки», верно? Просто знайте.
Теперь мы должны прокинуть producer для отложенной очереди:
#app/config/services.yml
services:
app.consumer.mail_sender:
class: AppBundle\Consumer\MailSenderConsumer
arguments: ['@old_sound_rabbit_mq.delayed_send_email_producer']
И изменим команду:
namespace AppBundle\Command;
use Symfony\Bundle\FrameworkBundle\Command\ContainerAwareCommand;
use Symfony\Component\Console\Input\InputInterface;
use Symfony\Component\Console\Output\OutputInterface;
class TestConsumerCommand extends ContainerAwareCommand
{
/**
* {@inheritdoc}
*/
protected function configure()
{
$this
->setName('app:test-consumer')
->setDescription('Hello PhpStorm');
}
/**
* {@inheritdoc}
*/
protected function execute(InputInterface $input, OutputInterface $output)
{
$this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('Ура, сообщенька...');
$this->getContainer()->get('old_sound_rabbit_mq.send_email_producer')->publish('bad');
}
}
Теперь вместе с нормальным сообщением она будет отправлять и плохое сообщение.
Если мы запустим, то увидим следующий вывод:
Ну тут типа сообщение отправляю Ура, сообщенька...
Успешно отправлено...
Ну тут типа сообщение отправляю bad...
ERROR
Спустя 30 секунд еще раз появится сообщение об обработке:
Ну тут типа сообщение отправляю bad...
ERROR
И так бесконечно. Логику максимальных попыток и т.п. продумывайте сами. Далее я дам пару советов для вашего прода и некоторых фич.
Теперь советы для вашего прода:
1) Не отходя от темы с максимальными попытками обработки: знайте на все 102% все возможные исключения контекста с которым вы работаете! Умейте представлять, когда повторная обработка требуется, а когда нет, иначе — привет мусорке из логов и отсутствия понимания что происходит. В случае, если битая задача будет крутится в RabbitMQ, с реальными данными, нормальными задачами, вы вряд ли сможете выкинуть сломанные задачи без костылей, не обновляя код консьюмера и не перезапуская его. Поэтому продумывайте это сразу. В данном случае правильным было бы ловить только лишь SMTPTimeOutException какой-нибудь.
Так же с такой моделью важно понимать, что: на 1 очередь — одна «глобальная ответственность смены состояния чего либо». Не стоит давать слишком много рискованных задач своему воркеру. Если рассмотреть вариант с 1С, то проблема может быть в следующем: допустим при успешном или неуспешном изменении\добавлении товара в 1С мы записываем в базу данных что-нибудь, например, дату последней удачной синхронизации или неудачной. Т.е. тут обновляются сразу 2 базы данных: бд 1С и бд вашего приложения. Допустим в 1С все успешно создалось, далее идет обновление в базе данных поля «дата последней удачной синхронизации» — хоп, вылезла ошибочка, опять же, сервер бд не отвечает — задача откладывается на «потом» и повторяется, пока база данных не начнет отвечать. И при этом каждый раз «подзадача» связанная с созданием сущности в 1С будет успешно выполняться, каждый раз при неудачной попытке записи в базу данных сайта, что неправильно.
2) Прочитайте про durable, раз уж мы с вами используем RabbitMQ. P.S: это заводится как true\false флаг «durable» в конфиге бандла, конкретно — в exchange_options и queue_options
3) Всю свою жизнь закрывайте соединение к базе данных после выполнения работы программы. А так же запускайте очистку EM и после сборщик мусора для чистки ссылок. Т.е. в конце концов наш консьюмер должен выглядеть как-то так:
class MailSenderConsumer implements ConsumerInterface
{
private $delayedProducer;
private $entityManager;
/**
* MailSenderConsumer constructor.
* @param ProducerInterface $delayedProducer
* @param EntityManagerInterface $entityManager
*/
public function __construct(ProducerInterface $delayedProducer, EntityManagerInterface $entityManager)
{
$this->delayedProducer = $delayedProducer;
$this->entityManager = $entityManager;
gc_enable();
}
/**
* @var AMQPMessage $msg
* @return void
*/
public function execute(AMQPMessage $msg)
{
$body = $msg->getBody();
echo 'Ну тут типа сообщение отправляю '.$body.' ...'.PHP_EOL;
try {
if ($body == 'bad') {
throw new \Exception();
}
echo 'Успешно отправлено...'.PHP_EOL;
} catch (\Exception $exception) {
echo 'ERROR'.PHP_EOL;
$this->delayedProducer->publish($body);
}
$this->entityManager->clear();
$this->entityManager->getConnection()->close();
gc_collect_cycles();
}
}
Консьюмер работает как демон, поэтому постоянно копить в нем ссылки и держать соединение с бд — это плохо. В случае с MySQL вы получите MySQL server has gone away.
4) Много думайте, почему ваша модель отложенных сообщений может неожиданно убить ваш бизнес. Например у нас есть механизм, который при изменении товара в админке заливает эти изменения через очередь в 1С. Теперь представим ситуацию: администратор меняет товар -> создается задача #1 на попытку изменить те же данные в базе 1С. Сервер 1С не отвечает, поэтому задачка просто перекладывается постоянно, пока все не заработает. За это время администратор решил еще кое-что подправить в том же товаре, что он и делает. Регистрируется задача #2.
А теперь представьте ситуацию, когда поочередно выполняются и откладываются задачи #1 и #2.
Что если 1С заработает к моменту выполнения задачи #2? Задача выполнится и зальёт последние изменения. Далее пойдет в ход задача #1 и затрет собой стабильные изменения :)
Выход: отправляем timestamp в качестве version, и, если задача «из прошлого» — выкидываем её.
5) Идешь в асинхронность — прочитай про многие архитектурные проблемы, а также race condition, несогласованность консумеров на разных машинах и прочее.
6) Пишите версии вашим очередям… Ух как помогает на реальном проде. В принципе мы так и сделали в этом примере.
7) Возможно тебе не нужен RabbitMQ и целый AMQP протокол. Посмотри в сторону beanstalkd.
8) Запускайте консумеры и всякое прочее демоническое на php через supervisor и подключите полное логирование падения процессов в нём. У него так же есть web интерфейс для управления всем этим делом, что так же очень удобно. Проблемы будут всегда.
Комментарии (13)
VolCh
29.09.2017 13:51Разве rabbitmq:consumer работает в режиме демона? Мне казалось, что обычное CLI приложение.
Fesor
29.09.2017 14:25Ну в целом все упирается в то что считать "режимом демона". Эта команда — обычное CLI приложение, которое не умирает после первого сообщения и умеет ловить/обрабатывать сигналы (пока только stop, в restart написан TODO: implement). Далее вопрос как запускать.
igorCD
29.09.2017 15:23+1Позвольте дополнить самую последнюю фразу «Проблемы будут всегда.» данной статьи.
О некоторых багах phpamqp-lib. Баг репорты есть на гитхабе, до сих пор нету решения.
1. Consumer: допустим запустили мы consumer и активировали heartbeat. От тихо ожидает новых сообщений в этой точке github.com/php-amqplib/RabbitMqBundle/blob/master/RabbitMq/BaseConsumer.php#L53. Бесконечно долго, т.к. timeout = 0. Допустим, падает сетевое соединение на какое-то время, затем оно восстанавливается. Консьюмер по прежднему ждёт сообщения в той точке, а RabbitMQ уже закрыл соединение. Всё, консьюмер будет висеть в памяти до тех пор, пока его не прибьют сигналом каким-нибудь. Соответсвенно, если в консьюмере есть таймер для «самоубийства», то он не сработает.
2. Без heartbeat могут быть проблемы с незакрытыми соединениями на стороне RabbitMQ. Возможно тут ещё всему виной HAProxy, который в нашем случае стоит перед кластером RabbitMQ.
Из-за этих казалось бы незначительных проблем, код надёжного консьюмера выглядит как бесконечный try/catch и while loops, чтобы консьюмер мог реагировать на разрыв соединения и переподключаться. Такого функционала нету ни в какой высокоуровневой библиотеке (см. Enqueue, например, или бандл, описанный в данной статье)
Далее, есть очень хорошая инициатива — github.com/queue-interop/queue-interop — при помощи данных интерфейсов можно быть независимым от конкретной библиотеки, которая реализует AMQP протокол (phpamqp-lib, php extension, bunny lib). Интерфейсы позаимстованны из мира Java. Но они тоже на начальном этапе своего существования.
Например, в нашем проекты мы решили стать менее зависимыми от конкретной AMQP библиотеки. Посему перевели всех наших consumer/producer на github.com/php-enqueue/enqueue-dev. Проект не на Symfony, поэтому используем только имплементацию Queue Interop интерфейсов с использованием phpamqp-lib, чтобы в ближайшем будущем попробовать имплементацию от bunny lib.
Но тут ждало разочарование: такая фича, как Publisher Confirms, которая есть только в RabbitMQ, отсутсвует out-of-the-box в самих интерфейсах от Queue Interop (тикет на гитхабе есть). Пришлось делать костыли, очень нехорошие костыли. В проекте у нас договорённость о at-least-once delivery, потому и нужен Publisher Confirms.
Если всё подытожить, то вот мои рекомендации:
1. Подумайте 10 раз, нужин ли вам Message Broker вообще. Если можно, используйте MySQL/файлы/что угодно, что уже есть в вашем проекте. Тех поддержка самого RabbitMQ в продакшене — тот ещё геморой для Operations Team.
2. Если всё же решились на Message Broker, то ответьте на вопрос, можете ли вы потерять пару сообщений? Если можете — то используйте любые бандлы/библиотеки. В противном случае крайне рекомендую сначала написать пару consumer/producer на низком уровне (к примеру, используя phpamqp-lib без обёрток) и хорошенько пройдитесь отладчиком, разрывая соединение/физически отключая/аварийно выключая RabbitMQ. Вы удивитесь, как легко можно потерять сообщения. А уже потом подбирайте конкретные высокоуровневые бандлы под свои нужды.maksim_ka2
01.10.2017 12:08Но тут ждало разочарование: такая фича, как Publisher Confirms, которая есть только в RabbitMQ, отсутсвует out-of-the-box в самих интерфейсах от Queue Interop (тикет на гитхабе есть). Пришлось делать костыли, очень нехорошие костыли. В проекте у нас договорённость о at-least-once delivery, потому и нужен Publisher Confirms.
Я думаю что Publisher Confirms не про at-least-once delivery. Это расширение дает возможность отправителю получать уведомления когда его сообщения обработаны получателем.
В документации ничего не сказано про случай когда соединение с получателем обрывается (по причине исключения например) и брокер возвращает сообщение в очередь с пометкой redelivered. Есть шанс обработать сообщение больше чем один раз. Уверен есть и другие ситуации.
Если бы мне нужно было «at-least-once delivery» я бы делал это на стороне получателя + хранилище для проверки на повтор.
От отправителя требуется проставить уникальный идентификатор сообщению.
maksim_ka2
29.09.2017 16:28+2Рекомендую посмотреть в сторону EnqueueBundle. Более мощное решение как по части использования так и по колличесту фишек для программистов. Кто много работал с RabbitMqBundle — меня поймет. Код получается проще и лаконичнее. Сравнение можно посмотреть тут. Работает не только с php-amqplib но и bunny, amqp-ext (спасибо amqp interop).
Практически все что вы описываете, в Enqueue уже реализовано из коробки: например, такие веши как, переподключение к базе данных, чистка entity manager, повторы с задержкой, RPC.
EnqueueBundle умеет это делать из коробки.
коммуникацию между компонентами через AMQP, как красиво делать RPC, то я сам чего-то подобного очень давно жду на хабре…
Вот Message bus to every PHP applicationBig_Shark
30.09.2017 18:17А почему не Bernard? Как мне кажется он лучше поддерживается.
maksim_ka2
30.09.2017 20:45
Fesor
01.10.2017 21:01Пользуюсь бернардом уже год, новые проекты переводим на enqueue. Он удобнее и больше полезных фич из коробки.
Big_Shark
01.10.2017 21:13У меня бернард на 3 проектах, когда выбирал что ставить то больше внимания обратил на количество звезд на гитхабе. Но в будущем попробую enqueue. Спасибо.
Fesor
01.10.2017 21:17обратил на количество звезд на гитхабе.
я то же на это купился, плюсь понравилось то о чем общаются люди в ишус трекере. Но вот прошел год и воз и поныне там. 1.0.0 все еще в альфе.
Fesor
Вот тут по подробнее, зачем? Во первых просто для ссылок php и так почистит тогда когда надо, в прочем как и для циклических ссылок. Раскройте цель подобных манипуляций со сборщиком подробнее. Есть мнение что это как-то связано с
как я уже говорил — php сам удалит ссылки. Более того вызов
gc_collect_cycle
вообще ничего не сделает особо.Зато другая проблема появляется — reset менеджера при исключениях. И вот там вполне коннекшен умирает и его надо передернуть. А закрывать коннекшен между задачами в очереди — крайне неразумно. Скажем для postgresql реконнект штука относительно дорогая, да и зачем нам этот оверхэд по времени обработки сообщений..
dizzy7
gc_collect_cycle бесполезен, подтверждаю. А вот вызов gc_mem_caches стоит делать — иначе php не отдаёт память системе — это может быть критично если размер заданий плавает и на некоторых потребление памяти сильно больше среднего.
Fesor
Вот тут я уже не знаю точно, но насколько я помню менеджер памяти не просто так не высвобождает запрошенные у системы блоки, а потому что планирует реюзать их далее. То есть если приложению вдруг понадобилось 200 метров, то почему бы ему еще раз не понадобилось?
Ну то есть я к тому что часто ее вызывать наверное тоже не стоит как минимум потому что иначе это будет вынуждать пых чаще просить систему выделять память. Но периодически, скажем раз в 100 сообщений или в зависимости от потребления памяти — пожалуй стоит.