Предисловие
Мой сайт, которым я занимаюсь в качестве хобби, предназначен для хранения интересных домашних страниц и персональных сайтов. Эта тема стала интересовать меня в самом начале моего пути в программировании, в тот момент меня восхищало нахождение больших профессионалов, которые пишут о себе, своих увлечениях и проектах. Привычка открывать их для себя осталась и сейчас: почти на каждом коммерческом и не очень сайте я продолжаю заглядывать в футер в поисках ссылок на авторов.
Реализация идеи
Первая версия была просто html-страницей на моём персональном сайте, где я складывал ссылки с подписями в ul-список. Набрав за какое-то время страниц 20, я начал думать, что это не очень эффективно и решил попробовать автоматизировать процесс. На stackoverflow я замечал, что многие указывают сайты в своих профилях, поэтому я написал парсер на php, который просто шел по профилям, начиная с первого (адреса на SO и по сей день такого вида: `/users/1`), извлекал ссылки из нужного тега и складывал в SQLite.
Это можно назвать второй версией: коллекция из десятка тысяч урлов в SQLite табличке, которая заменила статический список в html. По этому списку я сделал простой поиск. Т.к. были только урлы, то и поиск был просто по ним.
На этом этапе я забросил проект и вернулся к нему, спустя долгое время. На этом этапе опыт моей работы составлял уже больше трёх лет и я чувствовал, что могу сделать что-то посерьёзнее. К тому же было большое желание осваивать относительно новые для себя технологии.
Современная версия
Проект развернут в докере, база переведена на mongoDb, и с относительно недавних пор, добавлен редис, который сначала был просто для кэширования. В качестве основы используется один из микрофреймворков PHP.
Проблема
Новые сайты добавляются консольной командой, которая синхронно делает следующее:
- Скачивает контент по URL
- Выставляет флаг о том, доступен ли был HTTPS
- Сохраняет сущность веб-сайта
- Исходный HTML и заголовки сохраняет в историю «индексирования»
- Парсит контент, извлекает Title и Description
- Данные сохраняет в отдельную коллекцию
Этого было достаточно, чтобы просто хранить сайты и отображать их в списке:
Но идея всё автоматически индексировать, категоризировать и ранжировать, держа всё в актуальном состоянии в эту парадигму укладывалась слабо. Даже простое добавление web-метода для добавления страниц потребовало дублирования кода и блокировок для избежания потенциального DDoS.
Вообще, конечно, всё можно делать и синхронно, а в web-методе производить просто сохранение УРЛа для того, чтобы монструозный демон выполнял все задачи для УРЛов из списка. Но всё равно даже тут напрашивается слово «очередь». А если очередь внедрить, то можно все задачи разделить и выполнять по крайней мере асинхронно.
Решение
Внедрить очереди и сделать event-driven систему обработки всех задач. И как раз давно хотелось попробовать Redis Streams.
Использование Redis streams в PHP
Т.к. фреймворк у меня не из тройки гигантов Symfony, Laravel, Yii, то и библиотеку хотелось бы найти независимую. Но, как оказалось (при первом рассмотрении) — отдельных серьёзных библиотек найти невозможно. Всё, что связано с очередями, либо является проектиком из 3 коммитов пятилетней давности, либо привязано к фреймворку.
Я наслышан о Symfony, как о поставщике отдельных полезных компонентов, к тому же некоторые я уже использую. А также от Laravel кое-что тоже можно использовать, например их ORM, без присутствия самого фреймворка.
symfony/messenger
Первый же кандидат сразу же показался идеальным и безо всяких сомнений я его установил. Но нагуглить примеры использования вне Symfony оказалось сложнее. Как собрать из кучи классов с универсальными, ни о чём не говорящими названиями, шину для передачи сообщений, да еще и на Redis?
Документация на официальном сайте была достаточно подробной, но инициализация была описана только для Symfony с помощью их любимого YML и других магических методов для не симфониста. Интереса в самом процессе установки у меня не было, особенно в новогодние каникулы. Но пришлось заниматься этим и неожиданно долго.
Попытка разобраться с инстанциированием системы по исходникам Symfony задача тоже не самая тривиальная для сжатых сроков:
Поковырявшись в этом всём и попытавшись что-то сделать руками, я пришел к выводу, что занимаюсь какими-то костылями и решил попробовать что-нибудь еще.
illuminate/queue
Оказалось, что эта библиотека намертво привязана к инфраструктуре Laravel и куче других зависимостей, поэтому много времени я на нее не тратил: поставил, посмотрел, увидел зависимости и удалил.
yiisoft/yii2-queue
Ну тут сразу предполагалось из названия, опять же жёсткая привязка к Yii2. Этой библиотекой мне приходилось пользоваться и она была неплохой, но о том, что она полностью зависит от Yii2 я не думал.
Остальные
Всё другое, что я находил на гитхабе — ненадежные устаревшие и заброшенные проектики без звезд, форков и большого количества коммитов.
Возврат к symfony/messenger, технические подробности
Пришлось разобраться с этой библиотекой и, потратив еще какой-то время, я смог. Оказалось, что всё достаточно лаконично и просто. Для инстанциирования шины я сделал небольшую фабрику, т.к. шин у меня предполагалось несколько и с разными обработчиками.
Всего несколько шагов:
- Создаем обработчики сообщений, которые должны быть просто callable
- Заворачиваем их в HandlerDescriptor (класс из библиотеки)
- Эти «Дескрипторы» заворачиваем в инстанс HandlersLocator
- Добавляем HandlersLocator в инстанс MessageBus
- Передаем в SendersLocator набор `SenderInterface`, в моём случае инстансы классов `RedisTransport`, которые конфигурируются очевидным образом
- Добавляем SendersLocator в инстанс MessageBus
MessageBus имеет метод `->dispatch()`, который ищет соответствующие обработчики в HandlersLocator и передает сообщение им, пользуясь соответствующими `SenderInterface` для отправки через шину (Redis streams).
В конфигурации контейнера (в данном случае php-di) вся эта связка может быть законфигурирована так:
CONTAINER_REDIS_TRANSPORT_SECRET => function (ContainerInterface $c) {
return new RedisTransport(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET),
$c->get(CONTAINER_SERIALIZER))
;
},
CONTAINER_REDIS_TRANSPORT_LOG => function (ContainerInterface $c) {
return new RedisTransport(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG),
$c->get(CONTAINER_SERIALIZER))
;
},
CONTAINER_REDIS_STREAM_RECEIVER_SECRET => function (ContainerInterface $c) {
return new RedisReceiver(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_SECRET),
$c->get(CONTAINER_SERIALIZER)
);
},
CONTAINER_REDIS_STREAM_RECEIVER_LOG => function (ContainerInterface $c) {
return new RedisReceiver(
$c->get(CONTAINER_REDIS_STREAM_CONNECTION_LOG),
$c->get(CONTAINER_SERIALIZER)
);
},
CONTAINER_REDIS_STREAM_BUS => function (ContainerInterface $c) {
$sendersLocator = new SendersLocator([
\App\Messages\SecretJsonMessages::class => [CONTAINER_REDIS_TRANSPORT_SECRET],
\App\Messages\DaemonLogMessage::class => [CONTAINER_REDIS_TRANSPORT_LOG],
], $c);
$middleware[] = new SendMessageMiddleware($sendersLocator);
return new MessageBus($middleware);
},
CONTAINER_REDIS_STREAM_CONNECTION_SECRET => function (ContainerInterface $c) {
$host = 'bu-02-redis';
$port = 6379;
$dsn = "redis://$host:$port";
$options = [
'stream' => 'secret',
'group' => 'default',
'consumer' => 'default',
];
return Connection::fromDsn($dsn, $options);
},
CONTAINER_REDIS_STREAM_CONNECTION_LOG => function (ContainerInterface $c) {
$host = 'bu-02-redis';
$port = 6379;
$dsn = "redis://$host:$port";
$options = [
'stream' => 'log',
'group' => 'default',
'consumer' => 'default',
];
return Connection::fromDsn($dsn, $options);
},
Тут видно, что в SendersLocator для двух разных сообщений мы присвоили разный «транспорт», каждый из которых имеет свой коннект на соответствующие стримы.
Я сделал отдельный демо-проект, демонстрирущий приложение из трёх демонов, общающихся между собой с помощью такой шины: https://github.com/backend-university/products/tree/master/products/02-redis-streams-bus.
Но покажу как может быть устроен консьюмер:
use App\Messages\DaemonLogMessage;
use Symfony\Component\Messenger\Handler\HandlerDescriptor;
use Symfony\Component\Messenger\Handler\HandlersLocator;
use Symfony\Component\Messenger\MessageBus;
use Symfony\Component\Messenger\Middleware\HandleMessageMiddleware;
use Symfony\Component\Messenger\Middleware\SendMessageMiddleware;
use Symfony\Component\Messenger\Transport\Sender\SendersLocator;
require_once __DIR__ . '/../vendor/autoload.php';
/** @var \Psr\Container\ContainerInterface $container */
$container = require_once('config/container.php');
$handlers = [
DaemonLogMessage::class => [
new HandlerDescriptor(
function (DaemonLogMessage $m) {
\error_log('DaemonLogHandler: message handled: / ' . $m->getMessage());
},
['from_transport' => CONTAINER_REDIS_TRANSPORT_LOG]
)
],
];
$middleware = [];
$middleware[] = new HandleMessageMiddleware(new HandlersLocator($handlers));
$sendersLocator = new SendersLocator(['*' => [CONTAINER_REDIS_TRANSPORT_LOG]], $container);
$middleware[] = new SendMessageMiddleware($sendersLocator);
$bus = new MessageBus($middleware);
$receivers = [
CONTAINER_REDIS_TRANSPORT_LOG => $container->get(CONTAINER_REDIS_STREAM_RECEIVER_LOG),
];
$w = new \Symfony\Component\Messenger\Worker($receivers, $bus, $container->get(CONTAINER_EVENT_DISPATCHER));
$w->run();
Использование этой инфраструктуры в приложении
Реализовав шину в своём бэкенде, я выделил отдельные ступени из старой синхронной команды и сделал отдельные хэндлеры, каждый из которых занимается своим делом.
Пайплайн добавления нового сайта в базу данных получился таким:
И сразу после этого мне стало гораздо проще добавлять новый функционал, например, извлечение и парсинг Rss. Т.к. этот процесс также требует исходный контент, то хэндлер-извлекатель ссылки на rss также как и WebsiteIndexHistoryPersistor подписывается на сообщение «Content/HtmlContent», обрабатывает его и передает нужное сообщение по своему пайплайну дальше.
В конечном итоге получилось несколько демонов, каждый из которых держит подключения только к нужным ресурсам. Например демон crawlers содержит в себе все обработчики, которые требуют похода в интернет за контентом, а демон persister держит подключение к базе данных.
Теперь вместо селектов из базы данных, нужные id после вставки persister’ом просто передаются через шину всем заинтересованным обработчикам.
OleksiyT
Если задача уйти от фреймворка, то для простого многопоточного парсера стандартных библиотек php разве недостаточно?
Использование лишних и слабо документированных вещей — так себе решение. Уж лучше фреймворк тогда, я считаю )
borisd_ru Автор
Ну нет, от фреймворка уйти нет задачи, а иногда есть задача искать компоненты, не привязанные к определенным фреймворкам. В моём проекте в качестве «фреймворка» используется, грубо говоря, контейнер и request-router, а остальное — компоненты.