Предисловие


Мой сайт, которым я занимаюсь в качестве хобби, предназначен для хранения интересных домашних страниц и персональных сайтов. Эта тема стала интересовать меня в самом начале моего пути в программировании, в тот момент меня восхищало нахождение больших профессионалов, которые пишут о себе, своих увлечениях и проектах. Привычка открывать их для себя осталась и сейчас: почти на каждом коммерческом и не очень сайте я продолжаю заглядывать в футер в поисках ссылок на авторов.

Реализация идеи


Первая версия была просто html-страницей на моём персональном сайте, где я складывал ссылки с подписями в ul-список. Набрав за какое-то время страниц 20, я начал думать, что это не очень эффективно и решил попробовать автоматизировать процесс. На stackoverflow я замечал, что многие указывают сайты в своих профилях, поэтому я написал парсер на php, который просто шел по профилям, начиная с первого (адреса на SO и по сей день такого вида: `/users/1`), извлекал ссылки из нужного тега и складывал в SQLite.

Это можно назвать второй версией: коллекция из десятка тысяч урлов в SQLite табличке, которая заменила статический список в html. По этому списку я сделал простой поиск. Т.к. были только урлы, то и поиск был просто по ним.

На этом этапе я забросил проект и вернулся к нему, спустя долгое время. На этом этапе опыт моей работы составлял уже больше трёх лет и я чувствовал, что могу сделать что-то посерьёзнее. К тому же было большое желание осваивать относительно новые для себя технологии.

Современная версия


Проект развернут в докере, база переведена на mongoDb, и с относительно недавних пор, добавлен редис, который сначала был просто для кэширования. В качестве основы используется один из микрофреймворков PHP.

Проблема


Новые сайты добавляются консольной командой, которая синхронно делает следующее:

  • Скачивает контент по URL
  • Выставляет флаг о том, доступен ли был HTTPS
  • Сохраняет сущность веб-сайта
  • Исходный HTML и заголовки сохраняет в историю «индексирования»
  • Парсит контент, извлекает Title и Description
  • Данные сохраняет в отдельную коллекцию

Этого было достаточно, чтобы просто хранить сайты и отображать их в списке:



Но идея всё автоматически индексировать, категоризировать и ранжировать, держа всё в актуальном состоянии в эту парадигму укладывалась слабо. Даже простое добавление web-метода для добавления страниц потребовало дублирования кода и блокировок для избежания потенциального DDoS.

Вообще, конечно, всё можно делать и синхронно, а в web-методе производить просто сохранение УРЛа для того, чтобы монструозный демон выполнял все задачи для УРЛов из списка. Но всё равно даже тут напрашивается слово «очередь». А если очередь внедрить, то можно все задачи разделить и выполнять по крайней мере асинхронно.

Решение


Внедрить очереди и сделать event-driven систему обработки всех задач. И как раз давно хотелось попробовать Redis Streams.

Использование Redis streams в PHP


Т.к. фреймворк у меня не из тройки гигантов Symfony, Laravel, Yii, то и библиотеку хотелось бы найти независимую. Но, как оказалось (при первом рассмотрении) — отдельных серьёзных библиотек найти невозможно. Всё, что связано с очередями, либо является проектиком из 3 коммитов пятилетней давности, либо привязано к фреймворку.

Я наслышан о Symfony, как о поставщике отдельных полезных компонентов, к тому же некоторые я уже использую. А также от Laravel кое-что тоже можно использовать, например их ORM, без присутствия самого фреймворка.

symfony/messenger


Первый же кандидат сразу же показался идеальным и безо всяких сомнений я его установил. Но нагуглить примеры использования вне Symfony оказалось сложнее. Как собрать из кучи классов с универсальными, ни о чём не говорящими названиями, шину для передачи сообщений, да еще и на Redis?



Документация на официальном сайте была достаточно подробной, но инициализация была описана только для Symfony с помощью их любимого YML и других магических методов для не симфониста. Интереса в самом процессе установки у меня не было, особенно в новогодние каникулы. Но пришлось заниматься этим и неожиданно долго.

Попытка разобраться с инстанциированием системы по исходникам Symfony задача тоже не самая тривиальная для сжатых сроков:



Поковырявшись в этом всём и попытавшись что-то сделать руками, я пришел к выводу, что занимаюсь какими-то костылями и решил попробовать что-нибудь еще.

illuminate/queue


Оказалось, что эта библиотека намертво привязана к инфраструктуре Laravel и куче других зависимостей, поэтому много времени я на нее не тратил: поставил, посмотрел, увидел зависимости и удалил.

yiisoft/yii2-queue


Ну тут сразу предполагалось из названия, опять же жёсткая привязка к Yii2. Этой библиотекой мне приходилось пользоваться и она была неплохой, но о том, что она полностью зависит от Yii2 я не думал.

Остальные


Всё другое, что я находил на гитхабе — ненадежные устаревшие и заброшенные проектики без звезд, форков и большого количества коммитов.

Возврат к symfony/messenger, технические подробности


Пришлось разобраться с этой библиотекой и, потратив еще какой-то время, я смог. Оказалось, что всё достаточно лаконично и просто. Для инстанциирования шины я сделал небольшую фабрику, т.к. шин у меня предполагалось несколько и с разными обработчиками.



Всего несколько шагов:

  • Создаем обработчики сообщений, которые должны быть просто callable
  • Заворачиваем их в HandlerDescriptor (класс из библиотеки)
  • Эти «Дескрипторы» заворачиваем в инстанс HandlersLocator
  • Добавляем HandlersLocator в инстанс MessageBus
  • Передаем в SendersLocator набор `SenderInterface`, в моём случае инстансы классов `RedisTransport`, которые конфигурируются очевидным образом
  • Добавляем SendersLocator в инстанс MessageBus

MessageBus имеет метод `->dispatch()`, который ищет соответствующие обработчики в HandlersLocator и передает сообщение им, пользуясь соответствующими `SenderInterface` для отправки через шину (Redis streams).

В конфигурации контейнера (в данном случае php-di) вся эта связка может быть законфигурирована так:

        CONTAINER_REDIS_TRANSPORT_SECRET => function (ContainerInterface $c) {
            return new RedisTransport(
                $c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET),
                $c->get(CONTAINER_SERIALIZER))
            ;
        },
        CONTAINER_REDIS_TRANSPORT_LOG => function (ContainerInterface $c) {
            return new RedisTransport(
                $c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG),
                $c->get(CONTAINER_SERIALIZER))
            ;
        },
        CONTAINER_REDIS_STREAM_RECEIVER_SECRET => function (ContainerInterface $c) {
            return new RedisReceiver(
                $c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET),
                $c->get(CONTAINER_SERIALIZER)
            );
        },
        CONTAINER_REDIS_STREAM_RECEIVER_LOG => function (ContainerInterface $c) {
            return new RedisReceiver(
                $c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG),
                $c->get(CONTAINER_SERIALIZER)
            );
        },
        CONTAINER_REDIS_STREAM_BUS => function (ContainerInterface $c) {
            $sendersLocator = new SendersLocator([
                \App\Messages\SecretJsonMessages::class => [CONTAINER_REDIS_TRANSPORT_SECRET],
                \App\Messages\DaemonLogMessage::class => [CONTAINER_REDIS_TRANSPORT_LOG],
            ], $c);
            $middleware[] = new SendMessageMiddleware($sendersLocator);

            return new MessageBus($middleware);
        },
        CONTAINER_REDIS_STREAM_CONNECTION_SECRET => function (ContainerInterface $c) {
            $host = 'bu-02-redis';
            $port = 6379;
            $dsn = "redis://$host:$port";
            $options = [
                'stream' => 'secret',
                'group' => 'default',
                'consumer' => 'default',
            ];

            return Connection::fromDsn($dsn, $options);
        },
        CONTAINER_REDIS_STREAM_CONNECTION_LOG => function (ContainerInterface $c) {
            $host = 'bu-02-redis';
            $port = 6379;
            $dsn = "redis://$host:$port";
            $options = [
                'stream' => 'log',
                'group' => 'default',
                'consumer' => 'default',
            ];

            return Connection::fromDsn($dsn, $options);
        },

Тут видно, что в SendersLocator для двух разных сообщений мы присвоили разный «транспорт», каждый из которых имеет свой коннект на соответствующие стримы.

Я сделал отдельный демо-проект, демонстрирущий приложение из трёх демонов, общающихся между собой с помощью такой шины: https://github.com/backend-university/products/tree/master/products/02-redis-streams-bus.

Но покажу как может быть устроен консьюмер:

use App\Messages\DaemonLogMessage;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;

require_once __DIR__ . '/../vendor/autoload.php';
/** @var \Psr\Container\ContainerInterface $container */
$container = require_once('config/container.php');

$handlers = [
    DaemonLogMessage::class => [
        new HandlerDescriptor(
            function (DaemonLogMessage $m) {
                \error_log('DaemonLogHandler: message handled: / ' . $m->getMessage());
            },
            ['from_transport' => CONTAINER_REDIS_TRANSPORT_LOG]
        )
    ],
];
$middleware = [];
$middleware[] = new HandleMessageMiddleware(new HandlersLocator($handlers));
$sendersLocator = new SendersLocator(['*' => [CONTAINER_REDIS_TRANSPORT_LOG]], $container);
$middleware[] = new SendMessageMiddleware($sendersLocator);

$bus = new MessageBus($middleware);
$receivers = [
    CONTAINER_REDIS_TRANSPORT_LOG => $container->get(CONTAINER_REDIS_STREAM_RECEIVER_LOG),
];
$w = new \Symfony\Component\Messenger\Worker($receivers, $bus, $container->get(CONTAINER_EVENT_DISPATCHER));
$w->run();

Использование этой инфраструктуры в приложении


Реализовав шину в своём бэкенде, я выделил отдельные ступени из старой синхронной команды и сделал отдельные хэндлеры, каждый из которых занимается своим делом.

Пайплайн добавления нового сайта в базу данных получился таким:



И сразу после этого мне стало гораздо проще добавлять новый функционал, например, извлечение и парсинг Rss. Т.к. этот процесс также требует исходный контент, то хэндлер-извлекатель ссылки на rss также как и WebsiteIndexHistoryPersistor подписывается на сообщение «Content/HtmlContent», обрабатывает его и передает нужное сообщение по своему пайплайну дальше.



В конечном итоге получилось несколько демонов, каждый из которых держит подключения только к нужным ресурсам. Например демон crawlers содержит в себе все обработчики, которые требуют похода в интернет за контентом, а демон persister держит подключение к базе данных.

Теперь вместо селектов из базы данных, нужные id после вставки persister’ом просто передаются через шину всем заинтересованным обработчикам.