image
(Изображение взято с официального сайта RabbitMQ)

В первой статье мы написали две программы на PHP, которые используют RabbitMQ: одна посылала сообщения, вторая принимала. В этой статье мы разберём как создать очередь, которая будет распределять задачи, затрачивающие значительное количество времени, среди многих воркеров (обработчиков сообщений).

Основная идея состоит в том чтобы не выполнять, затрачивающие значительное количество времени задачи сразу, а добавлять их в очередь, в виде сообщения. Позже работающий воркер получает сообщение из очереди, которую он потребляет и выполняет задачу как бы «в фоне».
Эта концепция очень актуальна для web-приложений, где, например, может требоваться выполнение каких-нибудь затратных по времени задач, результат которых не требуется сразу, такими задачами могут быть например отправка email или выполнение HTTP-запроса к стороннему приложению (например через libcurl на PHP).

Приготовление


В этой части мы будем отправлять сообщения, которые будут представлять задачи для воркеров. Поскольку у нас нет реальных задач, вроде обработки изображений или генерирования pdf-файлов мы притворимся что мы заняты использую функцию sleep(). Мы будем принимать число точек в сообщении за сложность задачи. Каждая точка будет означать одну секунду работы, например сообщение Hello... будет занимать три секунды работы.

Мы слегка изменим наш скрипт send.php из прошлого урока, чтобы он мог отправлять произвольные сообщения.

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->queue('test-queue')->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Наш скрипт receive.php также требует изменений. Он должен имитировать работу в течении одной секунды для каждой точки в принятом сообщении. Давайте переименуем файл в worker.php и напишем в нём следующий код:

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('test-queue')->declare();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Далее запустим оба скрипта в разных терминалах:

# Shell 1
php worker.php 

# Shell 2
php send.php "A very hard task which takes two seconds.."

Одним из преимуществ использования очередей задач является возможность распределения работы среди многих воркеров. Давайте попробуем запустить сразу два скрипта worker.php в двух разных терминалах. А в третьем будем посылать сообщения скриптом send.php

# Shell 1
php worker.php 

# Shell 2
php worker.php 

# Shell 3
php new_task.php First message.
php new_task.php Second message..
php new_task.php Third message...
php new_task.php Fourth message....
php new_task.php Fifth message.....

Далее посмотрим что вывели наши воркеры:
# shell 1
php worker.php
# Received: First message.
# Done
# Received: Third message...
# Done
# Received: Fifth message.....
# Done

# shell 2
php worker.php
# Received: Second message..
# Done
# Received: Fourth message....
# Done

По-умолчанию RabbitMQ будет отправлять каждое следующее сообщение следующему потребляющему получателю по очереди. В среднем каждый получатель получит одинаковое число сообщений, этот способ распределения сообщений называется round-robin (по-кругу). Попробуйте это с тремя или более воркерами.

Подтверждение сообщений


Выполнение задач может занимать определённое количество времени. Возможно вас интересует что будет с сообщением, если завершить работу получателя, который не успел обработать это сообщение до конца. С нашим текущим кодом сообщение будет возвращено в очередь, так как в MonsterMQ по-умолчанию включены подтверждения сообщений (message acknowledgements).

Когда мы используем подтверждение сообщений — мы говорим RabbitMQ, что сообщение было обработано и он в праве удалить его из очереди. Если получатель завершил свою работу, не отослав подтверждения (например в результате непредвиденного завершения TCP-соединения), RabbitMQ поймёт что сообщение не было обработано и вернёт его обратно в очередь, пытаясь доставить его другим доступным воркерам. Таким образом вы можете быть уверены в том, что сообщения не потеряются, даже в случае непредвиденного завершения работы какого-нибудь из воркеров.

Чтобы отключить подтверждения сообщений в MonsterMQ вы можете передать значение true как второй аргумент методу consume()

$consumer->consume('test-queue', true);

Забывание подтверждения полученных сообщений, довольно распространённая, легко-допустимая ошибка, которая может привести к серьёзным последствиям. Если это произойдёт, сообщения будут доставляться снова и снова и также будут накапливаться в очередях занимая всё больше и больше памяти. Для отладки этой ошибки используйте rabbitmqctl, чтобы вывести поле messages_unacknowledged

sudo rabbitmqctl list_queues name messages_ready messages_unacknowledged

На windows отбросьте sudo

rabbitmqctl.bat list_queues name messages_ready messages_unacknowledged

Сохранность очередей


Несмотря на то, что наши сообщения не будут потеряны, если какой-нибудь из воркеров неожиданно завершит свою работу, тем не менее мы всё ещё можем потерять созданные очереди если завершить работу RabbitMQ.

Чтобы обезопасить себя от потери очередей, нужно объявить очередь как durable (стойкая, долговечная). Поскольку очереди идемпотентны, то есть мы не можем изменить или создать её заново, вызывая метод объявления с тем же именем, нам придётся объявить новую очередь. Сделаем это следующим образом

$consumer->queue('new-queue')->setDurable()->declare();

Не забудьте изменить код объявления очереди и в коде отправителя.

Честное распределение сообщений


Вы могли заметить, что сообщения между двумя нашими воркерами, до сих пор распределяются не совсем честно. Если например каждое чётное сообщение является трудоёмким, а каждое нечётное сообщение обрабатывается быстро, то один воркер получающий трудоёмкие сообщения будет всегда занят, в то время когда второй будет простаивать. Чтобы избежать этого можно использовать quality of service. Давайте добавим следующую строку в наш код

$consumer->qos()->prefetchCount(1)->perConsumer()->apply();

Эта строка говорит RabbitMQ не отправлять сообщения получателю, пока он не обработает и не подтвердит текущее сообщение. perConsumer() применяет quality of service, ко всем каналам получателя, используйте метод perChannel() если хотите применить quality of service только к текущему каналу.

image
(Изображение взято с официального сайта RabbitMQ)

Соберём весь код вместе


Так будет выглядеть наш send.php(отправитель)

try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $message = implode(' ', array_slice($argv, 1));
   $message = empty($message) ? 'Hello world!' : $message;

   $producer->publish($message, 'test-queue');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

А так получатель worker.php

try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $consumer->queue('new-queue')->setDurable()->declare();

   $consumer->qos()->prefetchCount(1)->perConsumer()->apply();

   $consumer->consume('test-queue');

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n Received: {$message}";
      sleep(substr_count($message, '.'));
      echo "\n Done";
   });
} catch(\Exception $e) {
   var_dump($e);
}

На этом всё, в следующем уроке мы научимся отправлять сообщения многим получателям из одной очереди.