Привет! Меня зовут Павел и я Magento 2 бэкенд-разработчик. Когда-то давно, когда я только начинал знакомство с Magento 2 (для краткости буду называть ее M2), мне понадобилось автоматизировать обработку однотипных событий при разработке одного решения. Тогда я удивился, насколько мало информации на русском языке об интеграции очередей в M2. Время идет, а ситуация не меняется: информации об этом на просторах рунета все так же мало. Раскроем эту тему. Для начала кратко поговорим про очереди: что это такое и зачем они нужны, потом рассмотрим интеграцию M2 с популярным менеджером очередей Rabbit MQ (далее по тексту — RMQ), а также напишем простую реализацию работы с очередями в качестве примера. Погнали!

Менеджеры очередей

Менеджеры очередей обеспечивают асинхронную процедуру обмена данными между элементами одной системы или разными системами. Для хранения сообщений используются очереди. Менеджер через обменник (exchange) принимает сообщение (некие данные) от издателя (publisher) и помещает в именованную очередь (queue), откуда их может запросить потребитель (consumer) и определенным образом обработать.

Использование менеджеров очередей обладает рядом преимуществ, вот главные из них:

  • Слабая связанность издателя и потребителя — данные подсистемы работают асинхронно, используя менеджер очередей в качестве посредника, что обеспечивает возможность независимой модификации систем при условии неизменности интерфейсов;

  • Экономия и балансировка потребления ресурсов — использование очередей позволяет избежать дублирования сообщений, а также обеспечивает возможность управления потреблением вычислительных ресурсов, затрачиваемых на обработку сообщений;

  • Гарантия сохранения данных — менеджер очередей обеспечивает сохранность данных в случае непредвиденного отказа издателя или потребителя;

  • Асинхронная обработка данных позволяет публиковать и обрабатывать сообщения независимо, в разных потоках, системах и в разное время.

Это далеко не полный список преимуществ, однако достаточный для понимания удобства использования очередей, а также случаев, когда такое использование уместно.  

«Из коробки» М2 умеет интегрироваться с RMQ, чего достаточно для подавляющего большинства сценариев использования очередей. На хабре есть отличный сериал про RMQ от пользователя @artemmatveev, рекомендую ознакомиться для углубленного понимания того, как это работает (ну и, конечно же, официальную документацию никто не отменял). В рамках данной статьи нам будет достаточно базовых сведений. 

Интеграция RMQ в M2

Условимся, что RMQ у нас установлен, настроен и работает. Если нет, то нужно обратиться к первоисточнику, после чего вернуться к данной статье. Наиболее частый сценарий использования — RMQ установлен на том же сервере, что и M2. Но это не обязательно, поскольку RMQ является полностью независимым, и для подключения к нему нам нужно знать хост, порт и реквизиты доступа (об этом чуть ниже).

Итак, RMQ работает, теперь нужно подружить M2 с ним. Для этого идем в <magento_root>/app/etc/env.php и добавляем такой раздел:

'queue' => [
    'amqp' => [
        'host' => 'localhost',
        'port' => '5672', //Порт по умолчанию
        'user' => 'guest', //Логин по умолчанию
        'password' => 'guest', //Пароль по умолчанию
        'virtualhost' => '/',
        //Если мы хотим использовать SSL:
        'ssl' => 'true',
        'ssl_options' => [
            'cafile' => '/etc/pki/tls/certs/DigiCertCA.crt',
            'certfile' => '/path/to/magento/app/etc/ssl/test-rabbit.crt',
            'keyfile' => '/path/to/magento/app/etc/ssl/test-rabbit.key'
        ],
    ],
]

Выполняем bin/magento setup:upgrade для достижения эффекта. Если все компоненты работают корректно, то работа завершена: теперь M2 умеет общаться с RMQ.

Процедуру настройки также можно выполнить сразу при установке M2. Для этого в команду bin/magento setup:install нужно добавить необходимые параметры, например:

--amqp-host="<hostname>" --amqp-port="5672" --amqp-user="<user_name>" --amqp-password="<password>" --amqp-virtualhost="/"

Настоятельно рекомендую установить плагин для RMQ, который называется Management, это позволит через браузер настраивать конфигурацию RMQ, просматривать очереди и делать много всего полезного (в том числе убедиться, что все работает как надо).

В очередь!

Обожаю совмещать теорию с практикой, поэтому сейчас мы реализуем процесс с поддержкой очередей в М2.

Создадим стандартный модуль М2. Условимся, что у нас уже есть реализация объекта (модель, ресурс-модель и т.д.), которую назовем по традиции WhiteRabbit.

Теперь нам нужно объявить несколько сущностей, для этого создадим несколько xml файлов в папке etc нашего модуля. 

communication.xml — задает общую конфигурацию для взаимодействия модуля с очередью сообщений.

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework:Communication/etc/communication.xsd">
    <topic name="white.rabbit" request="RSHB\WhiteRabbit\Api\Data\WhiteRabbitInterface" />
    <!-- поле request указывает на интерфейс, который реализован 
    моделью WhiteRabbit. Для целей данной статьи конфигурация модели 
    не важна, просто условимся, что запрос содержит некий несложный 
    объект -->
</config>

Создаем топик с именем white.rabbit и задаем интерфейс, реализация которого описывает объект, содержащийся в запросе. 

Полное описание XML-схемы этого файла можно найти на странице документации.

queue_consumer.xml — описываем потребителя, назначаем ему очередь, которую он будет слушать, а также класс и метод, которые будут обрабатывать сообщения, полученные из очереди.

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/consumer.xsd">
    <consumer name="WhiteRabbitConsumer" queue="white_rabbit" connection="amqp"
              handler="RSHB\WhiteRabbit\Model\WhiteRabbit\WhiteRabbitConsumer::processMessage"/>
</config>

Полное описание XML-схемы этого файла можно найти на странице документации.

queue_topology.xml — объявляем обменники, роутинг и очереди.

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/topology.xsd">
    <exchange name="rshb.white.rabbit" type="topic" connection="amqp">
        <binding id="RshbWhiteRabbit" topic="white.rabbit" destinationType="queue" destination="white_rabbit"/>
    </exchange>
</config>

Здесь мы создаем обменник с именем rshb.white.rabbit, а также привязываем к нему топик white.rabbit и очередь white_rabbit в качестве пункта назначения.

Полное описание XML-схемы этого файла можно найти на странице документации.

queue_publisher.xml — объявляем издателя, привязываем его к топику и указываем обменник, который будет им использоваться.

<config xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
        xsi:noNamespaceSchemaLocation="urn:magento:framework-message-queue:etc/publisher.xsd">
    <publisher topic="white.rabbit">
        <connection name="amqp" exchange="rshb.white.rabbit" disabled="false"/>
    </publisher>
</config>

Полное описание XML-схемы этого файла можно найти на странице документации.

Пришла очередь написать объявленные классы. Описываем потребителя:

<?php

namespace RSHB\WhiteRabbit\Model\WhiteRabbit;

use RSHB\WhiteRabbit\Api\Data\WhiteRabbitInterface;

/**
 * Class WhiteRabbitConsumer
 *
 * @package RSHB\WhiteRabbit\Model\WhiteRabbit
 */
class WhiteRabbitConsumer
{
    /**
     * Process message from queue
     *
     * @param WhiteRabbitInterface $whiteRabbit
     */
    public function processMessage(WhiteRabbitInterface $whiteRabbit)
    {
        //Получаем из очереди объект WhiteRabbit и используем его 
    }
}

Класс издателя:

<?php

namespace RSHB\WhiteRabbit\Model\WhiteRabbit;

use Magento\Framework\MessageQueue\PublisherInterface;
use RSHB\WhiteRabbit\Api\Data\WhiteRabbitInterface;

/**
 * Class WhiteRabbitPublisher
 *
 * @package RSHB\WhiteRabbit\Model\WhiteRabbit
 */
class WhiteRabbitPublisher
{
    /**
     * RabbitMQ Topic name
     */
    const TOPIC_NAME = 'white.rabbit';

    /**
     * @var PublisherInterface
     */
    private $publisher;

    /**
     * WhiteRabbitPublisher constructor
     *
     * @param PublisherInterface $publisher
     */
    public function __construct(
        PublisherInterface $publisher
    ) {
        $this->publisher = $publisher;
    }

    /**
     * Add message to queue
     *
     * @param WhiteRabbitInterface $whiteRabbit
     */
    public function execute(WhiteRabbitInterface $whiteRabbit)
    {
        $this->publisher->publish(self::TOPIC_NAME, $whiteRabbit);
    }
}

На этом наша реализация готова. Попробуем что нибудь положить в очередь:

<?php

namespace RSHB\WhiteRabbit\Model;

use RSHB\WhiteRabbit\Api\Data\WhiteRabbitInterface;
use RSHB\WhiteRabbit\Model\WhiteRabbit\WhiteRabbitPublisher;

/**
 * Class WhiteRabbitManagement
 * @package RSHB\WhiteRabbit\Model
 */
class WhiteRabbitManagement
{
    /**
     * @var WhiteRabbitPublisher
     */
    private $publisher;

    /**
     * WhiteRabbitManagement constructor
     *
     * @param WhiteRabbitPublisher $publisher
     */
    public function __construct(
        WhiteRabbitPublisher $publisher
    ) {
        $this->publisher = $publisher;
    }

    /**
     * Send White Rabbit to Queue
     *
     * @param WhiteRabbitInterface $whiteRabbit
     */
    public function sendWhiteRabbit(WhiteRabbitInterface $whiteRabbit)
    {
        //Вызываем издателя и передаем ему объект White Rabbit
        $this->publisher->execute($whiteRabbit);
    }
}

Посмотрим на нашу очередь через плагин Management. Для этого зайдем на страницу http://<yourhost>:15672 и введем логин и пароль (по умолчанию - guest:guest) для авторизации. На вкладке Queues выбираем нашу очередь white_rabbit и видим в ней объект, который мы положили туда на прошлом шаге:

На данном этапе потребитель должен получить сообщение из очереди и каким-то образом его обработать (метод processMessage в классе потребителя). По умолчанию cron-задача запускает потребителя 1 раз в секунду (потребителю можно задать произвольный период опроса очереди путем указания атрибута sleep элемента consumer в файле queue_consumer.xml).

На этом наша демонстрационная реализация очередей в M2 закончена.

Выводы

Интеграция в RMQ и M2 легко настраивается, но, вместе с тем, обладает отличной гибкостью и позволяет использовать все штатные возможности RMQ, причем как с размещением RMQ на том же сервере, так и на отдельном. Использование RMQ в связке с M2 заметно повышает и так незаурядные способности M2  к расширению.

На этом все. Пишите код с удовольствием и используйте правильные решения. До встречи!