В предыдущей статье мы написали простую систему логгирования. В которой мы отсылали сообщения сразу нескольким получателям. В этой статье мы добавим в неё новую функцию, которая позволит получателям получать только определённое подмножество сообщений. Благодаря этому мы сможем, например, отправлять только критические сообщения в лог-файл, в то же время выводя все отосланные сообщения в окно терминала.

Связи между очередью и обменником


В предыдущей статье мы научились связывать очередь с обменником. Примерно таким кодом:

$producer->queue('queue-1')->bind('logs');

Кроме этого, метод связывания может принимать вторым аргументом ключ маршрутизации (routing key). Чтобы не путать его с ключом маршрутизации, который принимает метод Producer::publish() в качестве второго параметра, давайте будем называть его ключом связывания (binding key). Вот как мы можем связать обменник с очередью использую ключ связывания:

$producer->queue('queue-2')->bind('logs', 'binding-key');

Значение ключа связывания зависит от типа обменника. Fanout обменник из предыдущего урока просто игнорирует это значение.

Direct обменник


Наша система логгирования из прошлой статьи отсылает все сообщения всем получателям.

Давайте слегка изменим её, чтобы она фильтровала сообщения в зависимости от уровня строгости. К примеру, чтобы она писала в лог-файл только critical сообщения, и не тратила место на диске на warning и info сообщения.

Fanout обменник не даёт нам такой возможности.

Но мы можем использовать direct обменник, который отсылает сообщения только тем очередям, чей ключ связывания совпадает с ключом маршрутизации отправленного сообщения (второй аргумент метода publish()). Этот механизм действия иллюстрирует следующая картинка:

image
(изображение взято с официального сайта RabbitMQ)

На этой картинке изображён обменник X типа direct и две очереди — одна с ключом связывания orange и вторая с двумя ключами связывания black и green.

В этой установке сообщения с ключом маршрутизации orange будут отправлены в первую очередь, а сообщения с ключами маршрутизации black или green будут отправлены во вторую. Любые другие сообщения будут отбрасываться.

Множественное связывание


image
(изображение взято с официального сайта RabbitMQ)

Вполне допустимо связывать обменник с несколькими очередями одним ключом связывания. В нашем примере мы можем добавить связь между обменником X и очередью Q1 использую ключ связывания black. В этом случаем наш обменник типа direct будет вести себя как обменник типа fanout, он будет отправлять сообщения во все очереди с совпавшим ключом связывания. Сообщение с ключом маршрутизации black будет доставлено и в очередь Q1 и в очередь Q2.

Выпуск сообщений


Мы будем использовать выше описанную модель для нашей системы логгирования. Вместо отправки сообщений в fanout обменник, мы будем отсылать их в direct обменник. Мы будем указывать уровень строгости сообщения как ключ маршрутизации. Таким образом скрипт-получатель будет способен выбирать сообщения нужного ему уровня строгости. Но сперва давайте сосредоточимся на выпуске сообщений. Как всегда для этого нам сначала понадобится создать обменник.


$producer->newDirectExchange('my-logs');

Теперь мы можем отправлять сообщения:


$producer->newDirectExchange('my-logs');
$producer->publish($message, $severity, 'my-logs');

Чтобы не усложнять вещи давайте предполагать что уровень строгости ($severity) может быть только 'info', 'warning', 'error'.

Подписка


Как и в предыдущей части мы создадим двух воркеров, которые будут объявлять очереди и связывать их с обменником следующим образом:


// Worker 1
foreach ($severities as $severity) {
   $consumer->queue('queue-1')->bind('my-logs', $severity);
}

Соединяем всё вместе


image
(изображение взято с официального сайта RabbitMQ)

Код send.php:


try {
   $producer = \MonsterMQ\Client\Producer();

   $producer->connect('127.0.0.1', 5672);
   $producer->logIn('guest', 'guest');

   $producer->newDirectExchange('my-logs');

   $severity = isset($argv[1]) && !empty($argv[1]) ? $argv[1] : 'info';

   $message = implode(' ', array_slice($argv, 2));
   $message = empty($message) ? "Hello World!" : $message;

   $producer->publish($message, $severity, 'my-logs');

   echo "\n Sent {$message} \n";
} catch(\Exception $e) {
   var_dump($e);
}

Код worker-1.php:


try {
   $consumer = \MonsterMQ\Client\Consumer();

   $consumer->connect('127.0.0.1', 5672);
   $consumer->logIn('guest', 'guest');

   $producer->queue('queue-1')->setExclusive()->declare();

   $severities = array_slice($argv, 1);
   if (empty($severities)) {
      file_put_contents('php://stderr', "Usage: $argv[0] [info] [warning] [error]\n");
      exit(1);
   }

   foreach ($severities as $severity) {
      $producer->queue('queue-1')->bind('my-logs', $severity);
   }

   $consumer->consume('queue-1');

  echo " \n Waiting for logs. To exit press CTRL+C\n";

   $consumer->wait(function ($message, $channelNumber) use ($consumer){
      echo "\n $message \n";
   });
} catch(\Exception $e) {
   var_dump($e);
}

Код worker-2.php будет отличаться от worker-1.php только названием очереди, связываемой с обменником и передаваемой методу consume(). Вместо 'queue-1' укажите 'queue-2'.

Теперь, если вы хотите чтобы первый воркер сохранил сообщения уровня 'error' и 'warning' в файл, выполните следующую команду в терминале:

php worker-1.php warning error > logs_from_rabbit.log

А если хотите чтобы второй воркер выводил все сообщения в окно терминала выполните следующее:

php worker-2.php info warning error
# => Waiting for logs. To exit press CTRL+C

И для того чтобы выпустить error сообщение напишите следующее:

php send.php error "Run. Run. Or it will explode."

На этом всё. В следующей части мы научимся получать сообщения основываясь на определённом паттерне.