Иногда требуется сделать так, чтобы сообщения в Symfony Messenger отправлялись потребителю пакетами, а не поодиночке. Недавно нам потребовалось отправлять через Messenger обновленные строки текста из наших программ поставщику переводческих услуг.

Но из-за жесткого ограничения на интенсивность передачи данных со стороны переводческой фирмы мы не можем отправлять сообщения по одному. Следовательно, необходимо реализовать следующий алгоритм отправки: сперва сохранять все полученные сообщения, предназначенные данному потребителю, а затем отправлять все сообщения, если время ожидания новых сообщений превысило десять секунд или если сохранено более 100 сообщений.

Покажем, как мы это сделали:

// Symfony Messenger Message:
class TranslationUpdate
{
    public function __construct(
        public string $locale,
        public string $key,
        public string $value,
    ) {
    }
}
class TranslationUpdateHandler implements MessageHandlerInterface
{
    private const BUFFER_TIMER = 10; // in seconds
    private const BUFFER_LIMIT = 100;
    private array $buffer = [];

    public function __construct(
        private MessageBusInterface $messageBus,
    ) {
        pcntl_async_signals(true);
        pcntl_signal(SIGALRM, \Closure::fromCallable([$this, 'batchBuffer']));
    }

    public function __invoke(TranslationUpdate $message): void
    {
        $this->buffer[] = $message;

        if (\count($this->buffer) >= self::BUFFER_LIMIT) {
            $this->batchBuffer();
        } else {
            pcntl_alarm(self::BUFFER_TIMER);
        }
    }

    private function batchBuffer(): void
    {
        if (0 === \count($this->buffer)) {
            return;
        }

        $translationBatch = new TranslationBatch($this->buffer);
        $this->messageBus->dispatch($translationBatch);
        $this->buffer = [];

    }
}

Здесь мы имеем дело с сообщением Messenger, которое отправляется каждый раз, когда у нас появляется обновленный текст на перевод (тот же принцип можно применить и к любым другим сообщениям).

Наш обработчик сообщений будет получать все сообщения и помещать их в буфер, представляющий собой массив. Если количество элементов в буфере достигает 100 или если не появляется новых элементов в течение десяти секунд, срабатывает метод batchBuffer.

Для реализации десятисекундного таймера мы используем функцию pcntl_alarm, которая позволяет асинхронно вызывать метод batchBuffer по мере необходимости.

Для обработки системных сигналов в нашем PHP-коде мы используем функции PCNTL (прочитать о них подробнее можно в документации PHP, а также в нашем блоге, если владеете французским). Мы установили таймер, который будет посылать процессу сигнал SIGALRM через заданное количество секунд. Затем, когда сигнал будет принят процессом, запустится функция обратного вызова, которую мы указали в качестве второго аргумента pcntl_signal. Обратный вызов установлен для всего приложения, поэтому мы можем использовать этот трюк с объединением сообщений в пакеты только один раз.

Затем в методе batchBuffer мы используем новую передачу в Messenger (см. вызов dispatch), чтобы отслеживать сообщения на случай возникновения проблем, а поскольку метод реализован через PCNTL, компонент Messenger не будет повторять попытку обработки при исключении.

class TranslationBatch
{
    /**
     * @param TranslationUpdate[] $notifications
     */
    public function __construct(
        private array $notifications,
    ) {
    }
}
class TranslationBatchHandler implements MessageHandlerInterface
{
    public function __invoke(TranslationBatch $message): void
    {
      // handle all our messages
    }
}

Итак, теперь у нас есть обработчик пакетов, который всегда будет получать список сообщений для отправки. С его помощью мы можем легко объединять наши сообщения Messenger в пакеты, не прибегая к использованию cron.

Дополнение. Этот подход — всего лишь доказательство концепции. Если вы хотите применить его в рабочей среде, рекомендую использовать более устойчивое хранилище для реализации буфера, такое как Redis.


Перевод материала подготовлен в рамках курса "Symfony Framework". Всех желающих приглашаем на двухдневный онлайн-интенсив «Создание системы статистики для онлайн-магазина». На интенсиве мы:
— начнем знакомство с Symfony и ClickHouse (если точнее, то построим систему сбора статистики в ClickHouse). На базе подобной системы в будущем вы сможете строить и развивать решения Business Intelligence-систем и операционной статистики,
— затем развернем API,
— и с его помощью посмотрим на инструменты самой статистики.
Регистрация на первый день здесь.