Часто в проектах возникает необходимость выполнения отложенных задач, таких как отправка email, push и других специфических задач, свойственных доменной области вашего приложения. Сложности начинаются, когда обычного crontab уже недостаточно, когда пакетная обработка не подходит и когда у каждой единицы задачи свое время выполнения или оно назначается динамически.
Для решения такой задачи было создано очередное решение под названием Trigger Hook. Принципиальная схема работы показана на рисунке 1. На схеме показано, что происходит с заданиями в течения всего их жизненного цикла. Смена цвета означает смену статуса задачи.
задача, время запуска которой наступит не скоро | |
задача, время запуска которой скоро наступит | |
задача, время запуска которой наступило | |
обработанное задание | |
неподтвержденный статус задания в базе данных | |
команда на удаление |
Жизненный цикл задачи:
При создании задачи она попадает в базу данных (квадратный блок) (красные и желтые).
В оперативную память подгружаются задачи (треугольный блок), если их время запуска скоро наступит (переход красный->желтый). Данная структура реализована в виде приоритезированной очереди (кучи).
При наступлении времени выполнения задачи, она посылается на выполнение (переход желтый->зеленый). Используется промежуточный буфер перед обработкой для компенсации пиковых нагрузок.
В случае успешной отправки задачи, она удаляется из базы данных (переход зеленый->голубой->удаление). Используется промежуточный буфер перед удалением, также для компенсации пиковых нагрузок.
Дальше постараюсь подробнее описать некоторые особенности и привести аргументы в пользу выбора данного решения.
Простота API
Id принимается в формате UUIDv4. Если не передать, то будет сгенерирован самостоятельно. Возможность передачи id задачи со стороны внешнего сервиса будет полезна при использовании асинхронного канала. Время запуска указывается в формате UNIX.
Создание:
task := &domain.Task{
Id: id,
ExecTime: time,
}
triggerHook.Create(task)
Удаление:
triggerHook.Delete(task.Id)
Получение событий наступления времени запуска:
for {
result := triggerHook.Consume()
if err != send(result.Task()) {
result.Rollback()
}
result.Confirm()
}
Стойкость к сбоям
При обработке задач может произойти сбой, например, если потеряно соединение с брокером сообщений. В таком случае выполнение задачи подтверждено не будет, а будет проведена повторная попытка отправки. Задача отметится как обработанная только при вызове метода подтверждения. Внезапная остановка приложения не приведет к несогласованности в базе данных.
Кроме того, с учетом общей тенденции рынка к переносу приложений в облако в виде микро-сервисов формируются новые требования к приложению. По крайней мере то, что выходило на задний план ранее, сейчас становится более важным. При этом подходе контейнеризированные приложения имеют временную природу. Механизм Trigger Hook делает возможным сворачивание микро-сервиса на одном сервере и разворачивание на другом в производственной среде без мягкой остановки.
В случае аварийного завершения приложения, есть вероятность, что выполнение некоторых задач может быть не подтверждено в базе данных. При повторном запуске приложения эти задачи будут отправлены на выполнение повторно. Такое поведение является компромиссом в пользу обеспечения стойкости к сбоям. Получив сообщение от Trigger Hook, Ваше приложение должно выполнять задачу только один раз, а при повторном получении - игнорировать. Такие ситуации обычное явление в событийно ориентированных архитектурах и они не должны нарушать внутреннее состояние и генерировать большое количество ошибок.
Точность и производительность
Для избежания высокой частоты запросов в базу данных предусмотрен механизм периодической предзагрузки наборов задач, время выполнения которых находится в заданном диапазоне. Другими словами, делаются редкие запросы наборов задач вместо частых запросов задач по одной. Такая схема хорошо подходит, если например, на одно время назначено выполнение нескольких сотен тысяч задач. После загрузки задач они сортируются в порядке приоритета. Когда время таймера для самой приоритетной задачи истекает, то она сразу же поступает на обработку клиентскому коду. Это позволяет добиться большой пиковой производительности и обработки задач с секундной точностью.
Также, большая производительность отправки задач на выполнение достигается за счет простой схемы хранения задач, индексирования и много-поточного доступа к базе данных.
Были измерены основные показатели скорости обработки задач.
Сервер приложения:
AWS EC2 Ubuntu 20
t2.micro
1 vCPUs 2.5 GHz
1 GiB RAM
Сервер базы данных:
AWS RDS MySQL 8.0
db.t3.micro
2 vCPUs
1 GiB RAM
Network: 2085 Mbps
Тест | Длительность теста | Средняя скорость (задач/сек) | Количество задач |
Создание задач | 1 минута 11 сек | 1396 | 100000 |
Удаление задач | 52 сек | 1920 | 100000 |
Отправка задач (состояние задачи от красной до голубой) | 498 милисекунд | 200668 | 100000 |
Подтверждение задач (состояние задачи от голубой до удаления) | 2 сек | 49905 | 100000 |
Мониторинг
Для быстрой проверки корректной работы Trigger Hook предоставляет возможность подключить time-series базу данных. На этапе инициализации есть возможность определить периодичность измерений и выбрать интересующие метрики. Полный список доступных метрик есть тут.
Также есть возможность подключить систему логирования через адаптер. Доступны:
фатальные ошибки - приводящие к полной остановке приложения
ошибки на которые стоит обратить внимание, но которые не приводят к остановке
дебаг сообщения
Далее в примере Вы можете увидеть пример подключения к InfluxDB+Grafana
Trigger Hook в составе микро-сервисной архитектуры
Асинхронное взаимодействие
При использовании микро-сервисной архитектуры, обычно, предпочтение отдают асинхронному взаимодействию. Trigger Hook хорошо вписывается в микро-сервисную, событийно ориентированную архитектуру. Но в любом случае, входящие (создание, удаление) и исходящее (событие наступления времени запуска задачи) каналы могут быть как асинхронными, так и синхронными в зависимости от требований.
Ниже, на рисунке 2 приведен один из возможных вариантов схемы коммуникации через асинхронный канал. В качестве брокера сообщений может выступать какая-нибудь очередь, например, RabbitMQ. Эта схема исключает блокировку вызываемого микро-сервиса вызывающим, как при синхронном запросе посредством, например HTTP. Брокер принимает неограниченное количество задач (условно неограниченное), а обработчик этих задач берется за них по мере освобождения. Как только команда на создание будет обработана, отправляется событие об успешном создании задачи. Так же через брокер, клиентский сервис получает это событие и реагирует на него соответствующим образом - меняет статус сущности, использующей отложенное задание. В качестве сущности может выступать, например Push уведомление на мобильные устройства с рекламой.
Существенным недостатком данной схемы является усложнение инфраструктуры обслуживающей подобное взаимодействие. По сути, введение статусов “ожидания” ответа от других микро-сервисов это есть распределенные транзакции.
На рисунке 3 показаны процессы создания сущности имеющий отложенное выполнение и на рисунке 4 выполнение при наступлении времени.
Совместный доступ
Отсутствие возможности передачи некоторой полезной нагрузки при создании задачи может некоторых разочаровать. Но уверяю, в этом нет необходимости. Trigger Hook содержит достаточный функционал для построения менеджера задач. Относитесь к Trigger Hook как к слою абстракции, находящегося на инфраструктурном уровне. Полная информация о задаче, например, тип, статус, время исполнения, количество попыток выполнения, полезная нагрузка и тп, будут содержаться в слое абстракции выше Trigger Hook.
Верхний слой будет обладать доменным знанием. Другими словами, менеджер задач будет иметь определенный набор типов задач, определенный набор событий, относящихся к тем или иным типам задач. Например, обращение к интерфейсу будет звучать как “создай отложенную задачу на отправку email сообщения” или “создай отложенную задачу на списание платы по подписке на YouTube”, а уже сам менеджер задач будет обращаться к Trigger Hook с запросом “создай отложенную задачу”. Когда придет время запустить задачу, Trigger Hook создаст событие “время выполнения задания наступило”. Это событие перехватит менеджер задач, обработает его, выдав, например, событие “время списания платы по подписке наступило”. На рисунках 5 и 6 показан этот процесс.
Связь между компонентами приложения должна быть очень слабой. Это касается и микро-сервисов в целом. На практике, одной из причин усиления связи, является перенос части ответственности одного сервиса в другой. Поэтому, одной из самых сложных задач, является поиск границы раздела (монолитного, например) приложения на микро-сервисы. Что бы это сделать удачно, нужно учитывать специфику доменной области знаний и текущей реализации приложения. Теперь вопрос - в какой микро-сервис поместить слой “менеджер задач”?
На рисунке 7 показана схема, где менеджер задач является отдельным, микро-сервисом, содержащий доменное знание о типах задач, событиях относящихся к этим задачам. Как видно из схемы, предполагается совместное использование одного микро-сервиса менеджера заданий для разных клиентских микро-сервисов. У каждого микро-сервиса свой канал для получения событий. В RabbitMq такой канал событий легко реализовать в виде схемы direct.
На рисунке 8 показана иная схема, где менеджер задач является частью клиентского микро-сервиса и используется только для своих внутренних нужд. Такая схема подойдет если нет других микро-сервисов использующих отложенные задания или же каждый микро-сервис имеет свой менеджер задач с Trigger Hook микро-сервисом.
Масштабирование
Некоторые приложения сложнее масштабировать, чем другие. Все намного проще, если состояние приложения хранится только во внешнем хранилище с поддержкой конкурентного доступа, например, классическая связка PHP + MySQL. В этом случае несколько экземпляров приложения PHP разворачиваются на разных серверах, а Nginx балансирует нагрузку между ними, при этом, MySQL ресурс остается один на все экземпляры PHP приложений. Если MySQL не справляется, то уже независимо от PHP приложения, могут быть добавлены реплики.
Все несколько сложнее, когда приложение хранит собственное состояние. Его сложнее масштабировать горизонтально. Trigger Hook хранит свое состояние в оперативной памяти. Оно подгружает задачи, время запуска которых скоро наступит. Допустим, Вы создали задачу, время выполнения которой наступит примерно через 5 секунд. Это означает, что Trigger Hook уже погрузил ее для выполнения. Но Вы захотели отменить эту задачу. Для этого нужно вызвать метод API delete. Важно вызвать этот метод у того экземпляра приложения, который взял задачу на обработку. Это первая сложность.
Вторая сложность заключается в том, что каждый экземпляр Trigger Hook должен иметь собственную схему в БД. Это связано с обеспечением согласованности базы данных при сбоях. В общем, с точки зрения производительности нет смысла использовать экземпляры Trigger Hook для одной базы данных, во первых, потому что Trigger Hook работает в много-поточном режиме, а во вторых, при прочих равных БД является узким горлышком.
На рисунке 9 показан пример масштабирования нагрузки. У каждого экземпляра Trigger Hook своя БД, на разных серверах (иначе особого смысла нет). Перед экземплярами Trigger Hook имеется балансировщик нагрузки. Кроме балансировки, он пишет в какую-нибудь hash map базу данных, например, Redis, пару ключ-значение:
task_id:instance_host
Это нужно для обеспечения функции удаления задачи. Если в Вашем приложении не предусмотрено удаление, то достаточно балансера без базы данных. События, генерируемые экземплярами Trigger Hook можно пересылать по одному каналу через брокер. Генерирование id будет происходить на стороне клиентского сервиса (при асинхронном взаимодействии) или на стороне trigger hook (при асинхронном или синхронном взаимодействии). Для клиентских сервисов интерфейс не изменится.
Приложение для демонстрации Trigger Hook
Приложение состоит из пяти микро-сервисов. Каждый использует Docker контейнер. Все работает на Kubernetes. Приложение легко можно развернуть в minikube. Тут описана подробная инструкция.
Message service - сервис (рисунок 11), который предоставляет API для создания email сообщений и назначения отправки на определенное время или отмены. Также позволяет просмотреть полный список сообщений и их статусы.
Некоторые особенности:
Находится на уровне домена.
Состоит из менеджера сообщений и менеджера заданий.
Написан на PHP, фреймворк Symfony 5.
Работает в двух экземплярах. Первый обслуживает API запросы при помощи Nginx. Второй - запускает демон через supervisor для прослушивания события из очереди RabbitMQ. Имеет вспомогательные экземпляры для запуска миграций.
Использует схему с рисунка 8 для управления заданиями.
Message Dashboard - интерфейс для Message service (рисунок 12).
Сервис Mailer находится на уровне инфраструктуры. Должен непосредственно делать рассылку. Не реализован, так как не важен в рамках демо.
Trigger service - сервис уровня инфраструктуры. Использует GRPC канал для получения команд на создание и удаление заданий, AMQP для рассылки события наступления времени выполнения задания (триггер).
Monitoring - также находится на инфраструктурном уровне, так как показывает технические метрики без привязки к бизнес событиям. На рисунке 14 показано как выглядит панель. Используется Grafana и InfluxDB. Полное описание метрик есть тут.
Надеюсь, приложение и статья будут Вам полезны! Следите за моим github, следите за проектом, ставьте звездочки). Спасибо!
theRavel
Рассматривали ли вы уже существующие способы реализовать scheduled delivery? Например:
VladislavPav Автор
RabbitMQ Delayed Message Plugin — имеет сложности с обслуживанием сотен и миллионов сообщений. Нет удаления сообщений. Указывается задержка перед отправкой, что может привести к смещению времени выполнения, например, если возникнет какая-нибудь задержка в цепочке вызовов от клиентского сервиса.
ActiveMQ Delay and Schedule Message Delivery — Указывается задержка перед отправкой, не UTC метка. Похоже, так же нет удаления.
Amazon SQS message timers — как Вы сами сказали, есть ограничения, 15 мин максимум.
Среди указанных тут, Kafka Message Scheduler и Azure Service Bus… имеют очень похожий функционал на Trigger Hook. Создание с меткой UTC, возможность удаления.