Lowkiq — это новый сервер упорядоченной обработки фоновых задач для ruby и redis. Он был создан в компании BIA-Technologies, разрабатывающей логистические решения.
В этой статье я расскажу о проблемах обработки фоновых заданий, с которыми мы столкнулись, и о их решении.
Нам нужно индексировать в ElasticSearch документы по перевозке грузов. У нас есть 2 системы: система A асинхронно обрабатывает сообщения от системы B. Информация об одной и той же перевозке содержится в сообщениях различных типов, поэтому данные сначала собираются в БД, а уже потом индексируются. Таким образом, система A ставит в очередь задачу на индексацию заказа. Задача содержит только идентификатор перевозки, обработчик выбирает актуальные данные перевозки из БД, форматирует и отправляет в поисковый движок.
Данные о перевозках обновляются настолько часто, что в очереди содержится много повторяющихся идентификаторов. Повторная обработка дубликатов приводит к увеличенной нагрузке на БД и поисковый движок. Кроме того при параллельной обработке задач новые данные могут перезаписываться старыми (состояние гонки):
- обработчик 1 прочитал первую версию объекта из бд
- объект изменился в бд
- обработчик 2 прочитал вторую версию объекта из бд
- обработчик 2 записал вторую версию объекта в поисковый движок
- обработчик 1 записал первую версию объекта в поисковый движок
Мы использовали Sidekiq для этого случая. Он использует списки в redis для хранения очередей
и запускает параллельно работающие воркеры, выбирающие задачи из очереди, что и вызывает наши проблемы.
Мы решили написать свой демон обработки задач под названием индексер для этого случая. Вместо очереди использовалось неупорядоченное множество в redis, где хранились идентификаторы перевозок, ведь нам не важен порядок и срок их индексации. Таким образом, мы сократили нагрузку, исключив обработку дубликатов. В случае падения задача просто ставилась обратно в очередь, ведь у нас нет порядка или запланированного времени исполнения.
Чтобы исключить состояние гонки, "очередь" была разбита на шарды. Каждому шарду назначался единственный тред-обработчик. Ключом шардирования, очевидно, стала сама задача — идентификатор перевозки.
Разбиение на шарды напоминает партицирование топиков в Kafka. Использование Kafka для хранения задач устранит состояние гонки, но не решит проблему с дубликатами.
Спустя время мы решили расширить этот подход для более сложных случаев. Так появился Lowkiq.
Как говорилось выше, система A обрабатывает сообщения от сторонней системы и сохраняет их в собственную базу данных. При плотном потоке сообщений возникало состояние гонки при обработке других типов сообщений и дэдлоки в бд. Исторически они не очень успешно решались собственным механизмом блокировок в redis.
Теперь задача кроме идентификатора содержит данные о перевозке груза. Нам стали важны порядок обработки задач и предсказуемость обработки ошибок, поэтому обычное множество не подходит для хранения задач.
Спустя несколько итераций появилась схема хранения задач в redis. Каждая задача при постановке в очередь содержит атрибуты:
- идентификатор
- полезную нагрузку
- score полезной нагрузки
- perform_in
Задача в очереди имеет немного другие атрибуты:
- идентификатор
- сортированное множество полезных нагрузок, упорядоченное по их score
- perform_in
- retry_count
score
и perform_in
— числа и по умолчанию равны текущему unix timestamp.
Нагладно посмотреть процессы, протекающие в очереди, можно в презентации.
Задачи с одинаковыми идентификаторами объединяются в одну задачу в очереди, при этом payloads объединяются в сортированное множество и попадают в обработку как одно целое. Это удобно, когда нужно обработать все версии объекта или взять только последнюю.
Идентификаторы, подобно индексеру, хранятся в множестве, правда уже в сортированном по perform_in
. Если не задавать perform_in
, то Lowkiq будет похож на индексер, правда с увеличенными накладными расходами на хранение задач из-за более сложной структуры.
Lowkiq унаследовал от индексера разбиение на шарды, но получил более гибкую систему распределения шардов между тредами. Теперь каждый тред может обрабатывать несколько шардов, что позволяет добавлять новые очереди без увеличения количества тредов.
Lowkiq так же как и индексер гарантирует, что никакой другой обработчик не сможет параллельно обрабатывать задачу с тем же идентификатором.
Несколько наших проектов используют Lowkiq в production в течение года. При этом мы по-прежнему используем Sidekiq и прочие системы очередей в обычных случаях. Если вы сталкиваетесь с теми же проблемами что и мы, попробуйте Lowkiq.