По роду своей деятельности мне достаточно часто приходится участвовать в проектах, в которых создаются высокодоступные, высокопроизводительные системы для различных рынков — реклама, финтех, сервисы классов SaaS, PaaS. В таких системах применяется вполне устоявшийся набор архитектур и компонентов, которые позволяют эффективно обеспечить соответствие продукта требованиям, например, lambda-архитектура для поточной обработки данных, масштабируемый микросервисный дизайн программного обеспечения, ориентированный на горизонтальное масштабирование, noSQL СУБД (Redis, Aerospike, Cassandra, MongoDB), брокеры сообщений (Kafka, RabbitMQ), распределенные серверы координации и обнаружения (Apache Zookeeper, Consul). Такие базовые инфраструктурные блоки чаще всего позволяют успешно решить большую часть задач и команда разработки не сталкивается с задачами разработки компонентов среднего уровня (middleware), которые, в свою очередь, будут использованы бизнес-ориентированной частью разрабатываемой системы.


В некоторых случаях возникают такие задачи, которые проектной команде не удается реализовать с помощью существующих решений, являющихся стандартными, тогда появляется потребность в поиске или разработке узкоспециализированных продуктов, предназначенных для решения специфических задач.


Задача


В одном из проектов у команды возникла достаточно специфичная задача, в рамках которой была необходимость организации строго-упорядоченной очереди данных, которая бы позволяла осуществлять "проигрывание" данных, поддерживала репликацию, горизонтальное масштабирование и была отказоустойчивой. Это классическая структура данных, которая называется журналом операций (Commit Log) и применяется практически во всех более-менее сложных СУБД.


Основное требования к журналу операций — это фиксация изменений в строгом порядке, таким образом структура гарантирует, что тот клиент, который инициировал запись первым гарантированно получит меньший порядковый номер записи в журнале, даже если запись заняла большее время чем у некоторого другого клиента. Широкое применение журналы операций имеют в системах, в которых имеется конкурентный доступ к объектам, который должен быть упорядочен. Обычная очередь, поддерживающая конкурентный доступ — это пример такого журнала (например, LinkedBlockingQueue в Java).

Ценность такой структуры в том, что она гарантирует неизменность конечного состояния при повторных проходах и часто используется именно для того, чтобы корректно отразить конечное состояние объекта при множественных изменениях.


Сложности начинаются в том случае, если множество агентов должны распространить упорядоченное изменение состояний на множестве узлов для множества объектов. Очевидный и самый низкопроизводительный подход основывается на использовании сервиса распределенных блокировок, например, Apache Zookeeper, с помощью которого осуществляется упорядоченный доступ к некоторому хранилищу. Вариант является рабочим, если изменения объектов происходят редко, однако, для реализации высокопроизводительной системы, в которой состояния объектов меняются часто, данный вариант не подходит. Частный случай с двумя агентами и двумя узлами изображен на рисунке.



Если внимательно посмотреть на вводное изображение с Риком, Морти и Саммер, то можно как раз это и заметить… Все вроде похоже, но уже есть небольшие рассогласования.

Нам требовалось разработать реплицируемую систему типа Leader/Followers, каждый узел которой бы поддерживал актуальное состояние объектов в RocksDB, а при потере узла его можно было бы легко восстановить из другого узла и существующего журнала операций, абстрактный вид представлен на диаграмме:


Основная инженерная проблема, которая существует в рамках данной задачи — это разработка распределенного реплицируемого журнала операций.


Распределенный реплицируемый журнал операций


Сначала, как обычно, мысль пошла по пути — "нам нужен свой бар... мы же крутые разработчики — давайте сами все сделаем!". У нас была реализация для локального журнала и мы подумали, что, наверное, мы сможем расширить ее на сетевую и реплицируемую реализацию. Честно говоря, объем работы вызывал притупленную зубную боль, а ведь это был не основной продукт, а компонент нижележащего слоя.


Постойте, но есть же Apache Kafka, скажет удивленный читатель! И будет почти прав. Apache Kafka — прекрасная штука, но в рамках данной задачи ей не хватает следующих функций:
  1. Подтверждения завершения операции
  2. Гарантии порядка и уникальности данных


В большинстве случаев Apache Kafka будет работать так, как нужно, но при потере пакетов TCP или падении мастера Вы не имеете никаких гарантий, что ваше сообщение не продублируется. Это связано с тем, что сообщения в Kafka отправляются по принципу "fire-and-forget", а клиент не управляет порядком записей на сервере, что логично, поскольку Apache Kafka оптимизирована на пропускную способность.

Однако, начав анализ и обдумывание деталей решения, я обнаружил, что решение уже есть, просто мы о нем не знали. И это — Apache BookKeeper. Более того, он реализован идеологически и технологически практически так, как бы мы это стали делать сами — Apache Zookeeper, Java, Netty (наш проект на Scala, но стек Java сильно порадовал). Как результат, была инициирована новая фаза, в ходе которой мы протестировали Apache BookKeeper на предмет соответствия нашим потребностям.


Apache BookKeeper


Далее, я постараюсь рассказать о принципах и подходе, который используется в системе Apache BookKeeper для решения задачи реплицируемых, масштабируемых журналов операций.


Итак, продукт не так чтобы сильно известный, однако пользователи у него имеются. К примеру:



Возможно, надо было просто сделать перевод первой статьи, но, чукча в душе — писатель.

Сначала посмотрим на концептуальную архитектуру Apache BookKeeper, которая отображена на следующем рисунке:


Ключевые элементы схемы:


  • Bookie — сервер, на котором хранятся данные журналов;
  • Ledger — "гроссбух", которые организует некоторое количество записей в единый том, в рамках которого определяются серверы Bookie, на которые происходит репликация данных;
  • Ledger Entry — запись в гроссбухе, полезная информация;
  • BK Client — клиентский код на Java, который реализует API;
  • Zookeeper — сервер Apache Zookeeper, который хранит в себе записи о гроссбухах (номер, серверы Bookie, где хранятся реплики).

Теперь рассмотрим ключевые элементы подробнее, чтобы понять как устроена модель взаимодействия с Apache BookKeeper и его архитектура управления данными.


Bookie


Сервер хранения данных. В рамках BookKeeper мы развертываем и запускаем множество серверов Bookie, на поле которых возникает реплицируемая среда хранения данных. Данные распределяются в рамках гроссбухов, способ распределения плоский, система не позволяет задавать какие-либо топологические метки (а жаль). Добавление серверов Bookie позволяет осуществлять горизонтальное масштабирование системы.


Ledger / Гроссбух


Организует набор строго упорядоченных записей, реплицируемых на одни и те же серверы. Гроссбухи упорядочены по возрастанию идентификаторов. Данные о серверах реплик хранятся в записи гроссбуха в Zookeeper.


Важное ограничение 1. В гроссбух может писать только один писатель. Таким образом, гроссбух — это единица масштабирования. Если ряд операций не позволяет разнести их по разным гроссбухам, то лимит производительности — это скорость записи в гроссбух, которая зависит от фактора репликации (хотя не линейно, поскольку BK Client пишет параллельно).


При записи в гроссбух каждой записи выдается уникальный возрастающий порядковый целочисленный номер, выдаваемый клиентом, что позволяет организовывать асинхронный подход к записи. При этом BookKeeper гарантирует, что операция записи с номером (n+1) завершится только тогда, когда все операции записи с младшими номерами завершились. Это очень полезное свойство, которое позволяет существенно улучшить производительность операций записи в некоторых кейсах, даже, если требуется упорядоченное уведомление клиентов о завершении операций.


Важное ограничение 2. Из гроссбуха может читать параллельно неограниченное количество читателей, при этом чтение разрешено тогда, когда гроссбух закрыт, то есть, в настоящий момент никем не пишется.


Вообще, вышеуказанные ограничения для гроссбухов — это существенные ограничения, которые в некоторых случаях могут значительно сузить сферу применения.


К примеру, вы открываете новый гроссбух каждую секунду, пишете в него и закрываете. Это означает, что обработка чтения будет отставать на 1 секунду. Поскольку гроссбух имеет тип идентификатора Long, то можно открывать каждую 1мс без особых проблем, но тогда вы столкнетесь с ситуацией, что возрастет нагрузка на Zookeeper, для которого это будет уже существенной нагрузкой. В решаемой нами задаче, это ограничение было допустимым.


Другой пример, интенсивность записи такова, что производительности серверов в рамках одного гроссбуха недостаточно, а логика приложения такова, что партиционировать запись по нескольким гроссбухам не представляется возможным. Все, как говорится, приехали. Данное ограничение в рамках нашей задачи тоже обходилось, мы смогли партиционировать все объекты таким образом, и у нас остался только один поток данных, для которого мы не смогли обойти данное ограничение.


Хранение идентификаторов гроссбухов


Идентификаторы гроссбухов хранятся в Zookeeper, что позволяет осуществить по ним итерирование, однако, не все так просто. Если мы хотим иметь несколько параллельных цепочек гроссбухов, то необходимо хранить идентификаторы гроссбухов, которые относятся к конкретной цепочке где-то отдельно, BookKeeper нам с этим не поможет. Другой вариант — осуществлять развертывание изолированных серверов Bookie под каждую задачу отдельно, чтобы у них был отдельный путь в Zookeeper (что нам не подходило).


Надо отметить, что сам Apache Zookeeper накладывает дополнительные ограничения, которые тоже необходимо учесть:


  1. Вся база данных должна помещаться в памяти сервера (если открывать много гроссбухов и не удалять их), вполне возможно, что когда-нибудь на сервере не хватит RAM (маловероятно, конечно, но списывать со счетов данный факт не стоит).
  2. Если в родительском каталоге Zookeeper находится много элементов, то при получении листинга каталога может произойти ошибка, если он не поместится в максимальный размер пакета Zookeeper равный 1MB.

Создатели Apache BookKeeper предоставляют решение для проблемы 2, вводя иерархический менеджер гроссбухов (Hierarchical Ledger Manager), который организует плоское пространство идентификаторов в иерархическое дерево путем дробления Long по уровням. По умолчанию используется Flat Ledger Manager, который явно неприменим при частом порождении новых гроссбухов.


Ledger Entry / Запись


Запись представляет собой элемент гроссбуха. Каждая запись в рамках гроссбуха имеет последовательный, возрастающий идентификатор — от 0… Запись содержит произвольные данные в виде байтового массива. Добавление записи может производиться в синхронном и асинхронном режиме. В асинхронном режиме, как обычно, можно добиться более высокой производительности для многопользовательских приложений. Как уже ранее упоминалось, вызов асинхронной части происходит тогда, когда запись добавляется в гроссбух, то есть упорядоченно.


Пожалуй, эти три концепта — Bookie, Ledger (гроссбух) и Ledger Entry (запись) являются важнейшими элементами для понимания работы Apache BookKeeper.


Вместо заключения


Надо сказать, что Apache BookKeeper не выглядит как серебряная пуля или как волшебная таблетка, это вполне специфическое решение, которое никак не противоречит CAP теореме и накладывает существенное количество ограничений на решаемую задачу, которые часто невозможно обойти. К счастью для нас, мы смогли обеспечить с помощью данного компонента горизонтальное масштабирование системы, но для поддержки наших требований пришлось решить попутно несколько инженерных задач, например, как корректно читать данные из двух гроссбухов и как хранить в Zookeeper "просеянные" списки для непересекающихся гроссбухов.


Эта статья носит вводный, ознакомительный характер. Сделать на Apache BookKeeper Hello World достаточно легко, авторы предоставляют подробное стартовое руководство, пока мы разбирались, переписали его реализацию на Scala.


О RocksDB


RocksDB была выбрана в связи с тем, что она обладает высокой производительностью и поддерживает атомарную пакетную запись (все или ничего), что позволяет обеспечить корректное поведение при возникновении нештатных ситуаций. В рамках нашей задачи мы загружали предыдущее состояние из RocksDB, вычитывали все действия из гроссбуха (ов), формировали итоговое состояние в RAM и записывали атомарно в RocksDB, при этом в запись так же шли номера гроссбухов, которые мы обработали.

Поделиться с друзьями
-->

Комментарии (0)