Возникла как то потребность использовать асинхронную отправку писем.

Передо мной открылись два основных направления:

  • Наколхозить на скорую руку свою поделку для работы с очередью сообщений.

  • Использовать мощные стабильные инструменты.

Выбор пал на symfony/messenger по нескольким причинам:

  • Во первых, первым путём я уже хаживал.

  • Во вторых, я давно смотрел в сторону Symfony ожидая подходящей идеи для личного проекта, чтобы неспешно войти в его мир.

  • В третьих, он поддерживает несколько различных транспортов.

  • В четвёртых, предоставляет возможность использовать практически готовые к работе воркеры, предоставляемыe компонентом symfony/console.

  • Ну и последнее, впереди были выходные, и я мог себе позволить разобраться как это сделать.

Надо отметить, что официальная документация пролила мало света на то, как использовать компонент symfony/messenger без использования самого фреймворка. Все приводимые примеры рассматривают как сконфигурировать зависимости для symfony контейнера в yaml файлах.

В документации к компоненту есть статья: https://symfony.com/doc/current/components/messenger.html про использование мессенджера в качестве независимого компонента, но про настройку транспорта к сожалению в ней ничего не сказано.

Так же на Хабре есть статья - Перевод PHP бэкенда на шину Redis streams и выбор независимой от фреймворков библиотеки. В ней автор провёл важные изыскания, показал способ настройки транспорта, но по какой то причине отказался использовать symfony/console.

Из комментариев к той статье, я понял, что, существует пробел в понимании этого мощного и важного инструмента.

В статье я опущу описание создания классов сообщения и его обработчика так как в документации этот момент подробно описан.

И так приступим.

Конфигурируем соединение в контейнере используя DSN, для RabbitMQ это может выглядеть так:

AmqpConnection::class => function (ContainerInterface $container) {
    return AmqpConnection::fromDsn('amqp://guest:guest@rabbitmq:5672/%2f/messages', []);
}

для Redis так:

RedisConnection::class => function (ContainerInterface $container) {
    return RedisConnection::fromDsn('redis://redis:6379/messages', [
        'stream' => 'messages',
        'group' => 'default',
        'consumer' => 'default',
    ]);
}

Далее настраиваем рессиверы для обеих соединений, они нам понадобятся в консоли. Обратите внимание, что делаем их именованными, имменно по имени ampq-async или redis-async команда symfony/console messenger:consume ampq-async сможет его обнаружить в контейнере.

'ampq-async' => function (ContainerInterface $container) {
    return new AmqpReceiver(
        $container->get(AmqpConnection::class)
    );
},

'redis-async' => function (ContainerInterface $container) {
    return new RedisReceiver(
        $container->get(RedisConnection::class)
    );
},

Далее описываем в контейнере шину сообщений, передавая создаваемые там же SendMessageMiddleware и HandleMessageMiddleware

'message-bus' => function (ContainerInterface $container) {
    $handler = new EmailMessageHandler($container);
    return new MessageBus([ 
        new SendMessageMiddleware(
            new SendersLocator([ 
                EmailMessage::class => [
                    AmqpTransport::class,
                /*    RedisTransport::class, */
                ]
            ], $container)
        ),
        new HandleMessageMiddleware(
            new HandlersLocator([
                EmailMessage::class => [$handler],
            ])
        )
    ]);
}    

Шину так же делаем именованной, имя шины опять таки потребуется нам в консольной команде messenger:consume, но к этому ещё вернёмся позже, когда будем рассматривать работу консольного приложения.

Таким образом, получаем полностью сконфигурированную шину сообщений. Очень просто, не правда ли?

Вот и всё, теперь в контроллере отправить сообщение на шину можно просто вызвав:

$busName = 'message-bus';
$bus = $this->container->get($busName);
$bus->dispatch(
    new EmailMessage($message), [
        new Symfony\Component\Messenger\Stamp\BusNameStamp($busName) 
    ]
);

Обратите внимание, что помимо сообщения мы передаём в шину штамп с названием шины, без этого штампа нельзя использовать стандартную команду ConsumeMessagesCommand т.к. по имени в этом штампе RoutableMessageBus находит нужную шину в контейнере.

Теперь о потреблении наших сообщений из очереди.

Компонент symfony/console это очень мощный и гибкий инструмент. При использовании его в рамках фреймворка приложение использует Symfony\Bundle\FrameworkBundle\Console\Application, передавая ему конфигурацию фреймворка, таким образом консоль получает возможность использовать все команды всех доступных компонентов фреймворка.

Но нам такой радости не вкусить. В случае использования symfony/console в качестве независимого компонента, приложению придётся в качестве ядра использовать Symfony\Component\Console\Application, а все команды конфигурировать в ручную.

Установим компонент консоли.

composer require symfony/console

После установки документация предлагает создать файл console в папке bin с вот таким содержимым.

#!/usr/bin/env php
<?php
// application.php
require __DIR__.'/vendor/autoload.php';
use Symfony\Component\Console\Application;
$application = new Application();
// ... register commands
$application->run();

Теперь обустроем это файл в соответствии с нашими потребностями.

В первую очередь добавим определение нашего контейнера.

$containerBuilder = new ContainerBuilder();
$containerBuilder->addDefinitions(__DIR__ . '/../config/dependencies.php');
$container = $containerBuilder->build();

Далее нам потребуется объект консольного вывода

$output = new ConsoleOutput();

ну и конечно же определения необходимых команд

$commands = [
    new ConsumeMessagesCommand(
        new RoutableMessageBus($container),
        $container,
        new EventDispatcher(),
        new ConsoleLogger($output, [])
    ),
    new StopWorkersCommand(
        new FilesystemAdapter('', 10, __DIR__ . '/../var/cache')
    )
];    

Тут требуются некоторые пояснения.

Конструктор класса ConsumeMessagesCommand требует RoutableMessageBus, в который нужно передать сконфигурированный контейнер.

В этом контейнере он сможет найти шину по имени 'message-bus', которое мы указали в определении шины ранее и передали в штампе сообщения.

Так же нужно передать сам контейнер, EventDispatcher и ConsoleLogger с созданным ранее $output для того,

что бы иметь возможность пользоваться всеми преимуществами стандартного консольного вывода от symfony, такими как:

  • расцветка вывода в зависимости от важности события

  • возможность регулировать детализацию вывода стандартными ключами при запуске -v -vv -vvv.

Обратите внимание, что $output с которым мы создавали ConsoleLogger впоследствии необходимо передать вызову $application->run(null, $output);

Для команды stopWorkersCommand нужно передать адаптер кэширования, для возможности мягкой остановки воркеров, чтобы избежать ситуации когда воркер уже взял сообщение из очереди, но ещё не успел его обработать.

Полный код файла console (используется контейнер php-di, но можно любой psr-11 совместимый)
#!/usr/bin/env php
<?php

require __DIR__.'/../vendor/autoload.php';

use DI\ContainerBuilder;
use Symfony\Component\Messenger\Command\ConsumeMessagesCommand;
use Symfony\Component\Messenger\Command\StopWorkersCommand;
use Symfony\Component\EventDispatcher\EventDispatcher;
use Symfony\Component\Cache\Adapter\FilesystemAdapter;
use Symfony\Component\Messenger\RoutableMessageBus;
use Symfony\Component\Console\Logger\ConsoleLogger;
use Symfony\Component\Console\Output\ConsoleOutput;
use Symfony\Component\Console\Application;

$containerBuilder = new ContainerBuilder();
$containerBuilder->addDefinitions(__DIR__ . '/../config/dependencies.php');
$container = $containerBuilder->build();

$output = new ConsoleOutput();

$commands = [
    new ConsumeMessagesCommand(
        new RoutableMessageBus($container),
        $container,
        new EventDispatcher(),
        new ConsoleLogger($output, [])
    ),
    new StopWorkersCommand(
        new FilesystemAdapter('', 10, __DIR__ . '/../var/cache')
    )
];    

$application = new Application('Console');
$application->addCommands($commands);
$application->run(null, $output);

Теперь запустить консольное приложение можно так: php bin/console messenger:consume ampq-async или для redis-транспорта: php bin/console messenger:consume redis-async, и при помощи ключей -v, -vv, или -vvv управлять детализацией вывода сообщений в консоль, также остановить воркеры командой php bin/console messenger:stop-workers.

Заключение

Исследовав компоненты messenger и console, я высоко оценил удобство этих инструментов.

Для меня это был не только отличный повод разобраться в устройстве Symfony, но и статья (в хорошем смысле этого слова), ну ладно не статья, рецепт или заметочка.

Комментарии (8)


  1. anonymous
    00.00.0000 00:00


  1. pilot114
    19.12.2021 20:45
    +4

    Таким образом, получаем полностью сконфигурированную шину сообщений. Очень просто, не правда ли?

    Как раз этот блок конфигураций не сказать что очевидный, особенно тому, кто только начал разбираться.
    Разжевали бы, что именно там происходит — полезность статьи сильно бы повысилась.

    Вопрос по поводу именования шины ('message-bus').
    Имеет ли смысл именовать класс, у которого по определению уже есть имя, причем уникальное?

    Иначе говоря, зачем писать
    $bus = $this->container->get('message-bus');

    если можно
    $bus = $this->container->get(MessageBus::class);

    плюсы
    1) в любой IDE переходим в определение класса по ctrl+ПКМ
    2) опять же, при случае, переименовать класс в IDE гораздо удобнее, чем реплейсить строку 'message-bus'


    1. Russell-s-Teapot Автор
      19.12.2021 21:02

      В приложении может быть более одной шины. Распространенная практика когда используются разные шины для команд, запросов и событий. Кроме того имя шины далее используется в штампе. Без этого RoutableMessageBus не сможет найти Consumer.


      1. pilot114
        19.12.2021 21:50
        +1

        В приложении может быть более одной шины

        Ну да, например CommandBus, RequestBus, EventBus, отдельные классы выглядят логично
        Кроме того имя шины далее используется в штампе

        Так или иначе, это строка, и он по этой строке вытаскивает зависимость из контейнера, так что разницы никакой


        1. Russell-s-Teapot Автор
          19.12.2021 21:53
          +1

          В самом деле, спасибо, принимаю, как ценное замечание.


        1. Torrion
          20.12.2021 14:14

          Если говорить с точки зрения symfony framework(а скорее даже SOLID), то ваш код вобще не обязан знать что используется и как оно работает. Есть MessageBusInterface, который запрашивает ваш класс и это все что он должен знать. Остальное - не его компетенция. Ваш класс запрашивает интерфейс как аргумент своего конструктора, а уже что туда передадут и откуда оно возмется - вопрос конейнера, который будет создавать и конфигурировать ваш класс. И это работает не только с шиной сообщений: логи, кеш - туда же. Да хоть репозитории. Такой подход позволяет гибко управлять потоками данных с одной стороны, и менять реализации интерфейсов в зависимости от окружения с другой. При этом в самом коде не меняется ни строчки, только конфиурации.


  1. oxidmod
    21.12.2021 12:33

    Мне вот интересно, а есть ли действительно смысл таким заниматься? Неужели полноценное symfony-приложение будет намного медленней? Тем более, что нам не нужно будет создавать HttpKernel, мы исключим роутинг и шаблонизацию. По сути останется контейнер, парсер конфигов (они будут закешены на проде), консольный кернел и вроде ивент диспатчер. Зато из коробки все поднимается, полноценная документация фреймворка, готовые рецепты для популярных бандлов


    1. Russell-s-Teapot Автор
      21.12.2021 12:43

      Конечно полноценное symfony-приложение кошернее, но если нужно прикрутить чуть чуть асинхронности к действующему проекту, почему бы не воспользоваться хорошими компонентами.