По долгу работы мне приходится сталкиваться с проектированием и разработкой распределенных приложений. Такие приложения часто используют различные средства межпроцессного взаимодействия для организации взаимодействия компонентов. Особые сложности возникают в процессе реализации алгоритмов, обрабатывающих связанные данные распределенно. Для поддержки таких задач используются специализированные системы распределенной координации. Самым популярным и широко используемым продуктом является Apache Zookeeper.
Zookeeper — продукт сложный. Несмотря на солидный возраст, периодически в нем обнаруживаются те или иные ошибки. Однако, это лишь следствие его возможностей, которые помогают сделать жизнь легче многим разработчикам распределенных систем. Далее, я рассмотрю некоторые особенности Zookeeper, которые помогут понять лучше его возможности, а затем перейдем к библиотеке Apache Curator (Netflix), которая делает жизнь разработчиков распределенного ПО приятной и предлагает множество готовых рецептов для реализации распределенных объектов координации.
Apache Zookeeper
Как уже ранее было отмечено, Zookeeper — жизненно важный компонент распределенных систем. Базу данных Zookeeper проще всего представить в виде дерева, похожего на файловую систему, при этом каждый элемент дерева идентифицируется путем (/a/path/to/node) и хранит в себе произвольные данные. Таким образом, с помощью Zookeper вполне можно организовать иерархическое распределенное хранилище данных, а также другие интересные конструкции. Полезность и широкая распространенность Zookeeper-а обеспечивается рядом важнейших свойств, которые перечислены далее.
Распределенный консенсус
Консенсус обеспечивается с помощью алгоритма ZAB, данный алгоритм обеспечивает свойства C(consistency) и P(partition tolerance) CAP-теоремы, что означает целостность и устойчивость к разделению, жертвуя доступностью. На практике это приводит к следующим эффектам:
- Все клиенты видят одно и то же состояние, неважно на каком сервере они запрашивают это состояние.
- Изменение состояния происходит упорядоченно, "гонка" невозможна (для операций set, операции get-set не атомарные).
- Кластер Zookeepr может "развалиться" и стать полностью недоступным, но при этом он станет недоступным для всех.
Консенсус — способность распределенной системы каким-то образом прийти к соглашению о ее текущем состоянии. Zookeeper использует алгоритм ZAB, часто применяются и другие алгоритмы — Raft,
Raft.
Эфемерные узлы
Клиент, устанавливая соединение с кластером Zookeeper, создает сессию. В рамках сессии существует возможность создавать узлы, которые будут видны другим клиентам, но, время существования которых равно времени жизни сессии. При завершении сессии данные узлы будут удалены. Такие узлы имеют ограничения — они могут быть только терминальными и не могут иметь потомков, то есть, нельзя иметь эфемерные поддеревья. Эфемерные узлы часто применяются с целью
реализации систем обнаружения сервисов.
Представим, что у нас есть несколько экземпляров сервиса, между которыми производится балансировка нагрузки. Если какой-то из экземпляров появляется, то для него создается эфемерный узел, в котором находится адрес сервиса, а при аварии сервиса этот узел удаляется и более не может использоваться для балансировки. Эфемерные узлы применяются очень часто.
Подписка на события узла
Клиент может подписаться (watch) на события узлов и получать обновления при возникновении каких-либо событий, связанных с данными узлами. Однако, тут тоже есть ограничение — после возникновения события на узле, подписка снимается и ее необходимо восстанавливать заново, при этом, очевидно, существует возможность пропуска других событий, которые возникают на данном узле. В связи с данным фактом, возможность использования данной функции достаточно ограничена.
Например, в рамках сервисов обнаружения ее применять можно, для реакции на изменение конфигурации, но необходимо помнить, что после установки подписки необходимо выполнить операцию "вручную", чтобы убедиться, что пропуска изменения состояния не произошло.
Последовательные узлы
Zookeeper позволяет создавать узлы, имена которых формируются с добавлением последовательно возрастающих чисел, при этом данные узлы могут быть эфемерными. Эта возможность широко применяется как для решения прикладных задач (например, все однотипные сервисы, регистрируют себя как эфемерные узлы), так и для реализации "рецептов" Zookeeper, к примеру, справедливой распределенной блокировки.
Версии узлов
Версии узлов позволяют определить было ли изменение узла между чтением и записью, то есть при операции set можно указать ожидаемую версию узла, в том случае, если она не совпадет, значит, что изменение узла было произведено другим клиентом и требуется заново вычитать состояние. Данный механизм позволяет реализовать упорядоченное изменение состояния данных, например, при реализации "рецепта" распределенный счетчик.
ACL на узлы
Существует возможность задавать для узлов ограничения доступа, определяемые ACL, что предназначено для защиты данных от недоверенных приложений. Стоит отметить, что, конечно, ACL не защищают от перегрузок, которые может создать вредоносный клиент, предоставляя только механизм ограничения доступа к содержимому.
TTL на узлы
Zookeeper позволяет устанавливать узлам TTL, по истечении которого (если нет обновлений) узел будет удален. Данная функциональность появилась сравнительно недавно.
Серверы-наблюдатели
Существует возможность подключения к кластеру серверов в режиме наблюдатель (observer), которые могут использоваться для выполнения операций чтения, что очень полезно в тех случаях, когда нагрузка на кластер, генерируемая операциями записи является высокой. С использованием серверов-наблюдателей проблема может быть решена. Может возникнуть вопрос, почему бы просто в кластер не добавлять обычные узлы? Ответ кроется в алгоритме консенсуса — чем больше узлов, позволяющих писать данные, тем дольше будет тратиться времени на достижение консенсуса и тем меньше будет производительность кластера на запись. Серверы-наблюдатели не участвуют в консенсусе, а поэтому не влияют на производительность операций записи.
Синхронизация времени на узлах
Zookeeper не использует внешнее время для синхронизации узлов. Это достаточно полезное свойство, системы, которые ориентируются на точное время более подвержены ошибкам, связанным с его рассогласованием.
Конечно, в бочке меда должен быть деготь и он действительно есть — Zookeeper имеет свойства, которые могут ограничивать его применение. Есть даже выражение, которое достаточно иронично описывает сложности работы с Zookeeper — Single Cluster of Failure © Pinterest, что саркастически демонстрирует тот факт, что, стремясь избавиться от единой точки отказа с помощью распределенной системы, используя Zookeeper, можно столкнуться с ситуацией, когда он станет той самой точкой отказа.
База данных Zookeeper должна помещаться в RAM
Zookeeper загружает базу в память и держит ее там. Если база данных не помещается в RAM, то она будет помещена в Swap, что приведет к существенной деградации производительности. Если БД большая, требуется сервер с достаточно большим объемом RAM (что, впрочем, не является проблемой в настоящее время, когда 1TB RAM на сервере — далеко не предел).
Время таймаута сессии
Если при настройке клиента выбрать неверно время таймаута сессии, то это может вести к непредсказуемым последствиям, которые будут обостряться при увеличении нагрузки на кластер и выходе из строя части узлов кластера. Пользователи стремятся уменьшить время сессии (по умолчанию 30 секунд), чтобы увеличить сходимость системы, поскольку эфемерные узлы будут удаляться быстрее, но это ведет к меньшей стабильности системы под нагрузкой.
Деградация производительности от количества узлов в кластере
Обычно, в кластере используют 3 узла, которые участвуют в достижении консенсуса, желание добавить дополнительные узлы существенно снизит производительность операций записи. Количество узлов должно быть нечетным (требование алгоритма ZAB), соответственно, расширение кластера до 5, 7, 9 узлов будет негативно влиять на производительность. Если проблема именно в операциях чтения — используйте узлы-наблюдатели.
Максимальный размер данных в узле
Максимальный размер данных в узле ограничен 1MB. В случае, если требуется хранить большие объемы данных, Zookeeper не подойдет.
Максимальное количество узлов в листинге потомков
Zookepeer не накладывает на то, сколько у узла может быть потомков, однако, максимальный размер пакета данных, который сервер может отправить клиенту составляет 4МБ (jute.maxbuffer). Если у узла такое количество потомков, что их перечень не помещается в один пакет, то, к сожалению, не существует способа получить сведения о них. Данное ограничение обходится с помощью организации иерархических "псевдоплоских" списков таким же образом, каким строятся кэши в файловой системе, имена или дайджесты объектов разбиваются на части и организуются в иерархическую структуру.
Несмотря на недостатки, достоинства их перевешивают, что делает Zookeeper важнейшим компонентом многих распределенных экосистем, например, Cloudera CDH5, или DC/OS, Apache Kafka и других.
Zookeeper для разработчика
Поскольку Zookeeper реализован с использованием языка Java, то в средах JVM его использование является органичным, к примеру, достаточно легко запустить сервер или даже кластер серверов из Java и использовать его для реализации интеграционных или smoke-тестов приложения без необходимости развертывания стороннего сервера. Однако, API клиента Zookeeper достаточно низкоуровневый, что, хотя и позволяет выполнять операции, но напоминает заплыв против течения реки. Кроме того, требуется глубокое понимание основ Zookeeper, чтобы правильно реализовать обработку исключительных ситуаций. К примеру, когда я использовал для работы с Zookeeper базовый интерфейс, отладка и поиск ошибок в коде распределенной координации и обнаружения доставляли достаточно большие проблемы и требовали существенное время.
Однако, решение существует и оно было подарено сообществу разработчиком Netflix Джорданом Циммерманом. Знакомьтесь, Apache Curator.
Apache Curator
На главной странице проекта расположена цитата:
Это утверждение на 100% отражает суть Curator. Начав использовать данную библиотеку, я обнаружил, что код работы с Zookeeper стал простым и понятным, а количество ошибок и время на их устранение снизилось кратно. Если, как ранее было сказано — стандартный клиент напоминает заплыв против течения, то с куратором ситуация меняется на 180 градусов. Кроме того, в рамках Curator-а реализовано большое количество готовых рецептов, которые я обзорно рассмотрю далее.
Базовый API
API выполнен в форме исключительно удобного текучего интерфейса, что позволяет просто и лаконично определять требуемые действия. К примеру (далее, примеры приводятся на языке Scala):
client
.create()
.orSetData()
.forPath("/object/path", byteArray)
что может быть переведено как "создай узел или, если существует, просто установи данные для пути "/object/path" и запиши в него byteArray".
Или, к примеру:
client
.create()
.withMode(CreateMode.EPHEMERAL_SEQUENTIAL)
.forPath("/head/child", byteArray)
"создай узел типа последовательный и эфемерный для пути "/head/child000000XXXX" и запиши в него byteArray". Еще несколько примеров могут быть найдены на этой странице руководства.
Асинхронные операции
Curator поддерживает как синхронный, так и асинхронный режим выполнения операций. В случае асинхронного использования клиент имеет тип AsyncCuratorFramework
, в отличие от синхронного CuratorFramework
. А каждая цепочка вызовов принимает метод thenAccept
, в котором указывается Callback, который вызывается при завершении операции. Более подробно об асинхронном интерфейсе можно узнать на посвященной ему странице руководства.
val async = AsyncCuratorFramework.wrap(client);
async.checkExists().forPath(somePath).thenAccept(stat -> mySuccessOperation(stat))
При использовании Scala использование асинхронного интерфейса не кажется оправданным, поскольку функциональность может быть легко реализована с использованием Scala Future, что позволяет коду сохранить особенности scala-way разработки. Однако, в случае Java и других JVM языков, данный интерфейс может быть полезным.
Поддержка схем данных
Zookeeper не поддерживает семантику хранимых данных. Это означает, что разработчики самостоятельно несут ответственность за то, в каких форматах хранятся данные и по каким путям они расположены. Это может стать неудобным во многих случаях, например, когда в проект приходят новые разработчики. Для решения данных проблем Curator поддерживает схемы данных, которые позволяют задавать ограничения на пути и типы узлов, в рамках данных путей. Схема, создаваемая из конфигурации, может быть представлена в формате Json:
[
{
"name": "test",
"path": "/a/b/c",
"ephemeral": "must",
"sequential": "cannot",
"metadata": {
"origin": "outside",
"type": "large"
}
}
]
Поддержка миграций
Миграции Curator чем-то напоминают Liquibase, только для Zookeeper. С их помощью возможно отражать эволюцию базы данных в новых версиях продукта. Миграция состоит из набора последовательно выполняемых операций. Каждая операция представлена некоторыми преобразованиями над БД Zookeeper. Curator самостоятельно отслеживает примененность миграций с помощью Zookeeper. Данная функция может быть использована в процессе развертывания новой версии приложения. Подробно миграции описаны на соответствующей странице руководства.
Тестовый сервер и тестовый кластер
Для упрощения тестирования, Curator позволяет встроить сервер или даже кластер серверов Zookeeper в приложение. Данную задачу можно достаточно просто решить и без использования Curator, только с Zookeeper, но Curator предоставляет более лаконичный интерфейс. К примеру, в случае Zookeeper без Curator:
class ZookeeperTestServer(zookeperPort: Int, tmp: String) {
val properties = new Properties()
properties.setProperty("tickTime", "2000")
properties.setProperty("initLimit", "10")
properties.setProperty("syncLimit", "5")
properties.setProperty("dataDir", s"$tmp")
properties.setProperty("clientPort", s"$zookeperPort")
val zooKeeperServer = new ZooKeeperServerMain
val quorumConfiguration = new QuorumPeerConfig()
quorumConfiguration.parseProperties(properties)
val configuration = new ServerConfig()
configuration.readFrom(quorumConfiguration)
private val thread = new Thread() {
override def run() = {
zooKeeperServer.runFromConfig(configuration)
}
}
def start = {
thread.start()
}
def stop = {
thread.interrupt()
}
}
...
val s = new ZookeeperTestServer(port, tmp)
s.start
...
s.stop
В случае Curator:
val s = new TestingServer(port)
s.start()
...
s.stop()
Рецепты Curator
Рецепты Curator — основной мотив использования данной библиотеки для реализации распределенных механизмов взаимодействия процессов. Далее, перечислим основные рецепты, которые поддерживаются Curator и как они могут применяться. Некоторые рецепты я не применял на практике, поэтому для них дан максимально приближенный к руководству перевод.
Выбор лидера
Данные рецепты предназначены для реализации отказоустойчивой модели выполнения процессов, в рамках которой существует текущий лидер и несколько процессов находится в горячем резерве. Как только лидер перестает выполнять свои функции, другой процесс становится лидером. Существует два подходящих рецепта:
- Leader Latch, который представляет собой аналог CountDownLatch, который заблокирован до тех пор, пока процесс не стал лидером;
- Leader Election, которые реализует выбор лидера через вызов метода. В момент, когда процесс становится лидером, вызывается метод, выход из которого свидетельствует об утрате лидерства.
Блокировки
Блокировки — один из важнейших механизмов распределенной межпроцессной синхронизации. Curator предоставляет широкий набор объектов блокировок:
- Shared Reentrant Lock — распределенная блокировка, в которую может повторно входить клиент, который имеет к ней доступ;
- Shared Lock — распределенная блокировка;
- Shared Reentrant Read Write Lock — объект, который позволяет осуществлять раздельную блокировку на чтение и на запись, при этом заблокировать объект на чтение могут несколько клиентов одновременно, блокировка на запись является эксклюзивной;
- Shared Semaphore — считающий семафор, с помощью которого легко осуществить работу с ограниченным количеством ресурсов, которое задается 32-битным целым числом;
- Multi Shared Lock — высокоуровневый объект, который позволяет выполнять операции над несколькими распределенными блокировками атомарно.
Барьеры
- Barrier — объект, который позволяет некоторому клиенту заблокировать доступ к участку кода для остальных участников до выполнения определенных условий, а при их наступлении — разблокировать доступ, что приводит к тому, что все участники могут продолжить свое исполнение;
- Double Barrier — объект позволяет синхронизовать вход некоторого количества клиентов в сегмент кода и их выход из него.
Счетчики
- Shared Counter — обычный целочисленный счетчик (32 bit) с защитой от гонки;
- Distributed Atomic Long — счетчик типа Long (64 bit).
Кэши
- Path Cache — объект, который наблюдает за узлом и обновляет локальный кэш о его дочерних узлах и опционально об их данных при его изменении;
- Node Cache — объект, который наблюдает за узлом и обновляет локальный кэш о нем и его данных;
- Tree Cache — объект, который наблюдает за всем деревом потомков узла и обновляет локальный кэш при изменении в дереве;
Узлы
- Persistent Node — данный рецепт позволяет создать узел с данными, для которого Curator будет стремиться обеспечить его присутствие и неизменность, даже при внешних воздействиях;
- Persistent TTL Node — рецепт для создания узла, время жизни которого определяется TTL, который поддерживает те же свойства, что и Persistent Node;
- Group Member — позволяет организовать группу участников.
Очереди
Хочу заметить, что Zookeeper — не лучший кандидат для организации интенсивных распределенных очередей, если требуется обеспечить пропуск большого количества сообщений, то рекомендую воспользоваться специально предназначенным решением, например, Apache Kafka, RabbitMQ или другими. Тем не менее, Curator предоставляет набор рецептов для поддержки очередей:
- Distributed Queue — обычная распределенная очередь, позволяет класть и извлекать сообщения в порядке очередности;
- Distributed Id Queue — распределенная очередь, которая с каждым сообщением сохраняет идентификатор и позволяет извлечь сообщение из очереди по идентификатору с его немедленным удалением;
- Distributed Priority Queue — очередь с приоритетами;
- Distributed Delay Queue — очередь позволяет задать для каждого добавляемого элемента время, в формате Unixtime, когда он станет доступен для чтения из очереди;
- Simple Distributed Queue — аналог очереди, которая предоставляется стандартным API Zookeeper.
Заключение
Библиотека Apache Curator безусловно стоит того, чтобы рассмотреть ее к применению, она является выдающимся образцом инженерного труда и позволяет значительно упростить взаимодействие с Apache Zookeeper. К недостаткам библиотеки можно отнести малый объем документации, что повышает входной барьер для начинающих разработчиков. В своей практике мне не раз требовалось изучать исходные коды библиотеки, чтобы понять как именно работает тот или иной рецепт. Однако, это дает и положительный эффект — глубокое понимание реализации позволяет совершать меньше логических ошибок, основанных на предположениях.
Необходимо отметить, что разработчики Curator рекомендуют изучить документацию Zookeeper до того, как начать использовать библиотеку. Это очень разумный совет, поскольку Zookeeper является продуктом, для эффективного использования которого необходимо понимать как именно он функционирует, а не только знать его API. Эти затраты безусловно окупятся, а в руках опытного инженера возможности Zookeeper позволяет создавать надежные и производительные распределенные системы.
Комментарии (14)
ivankudryavtsev
02.08.2017 10:29+1К сожалению, я не знаком детально с etcd. Мне кажется, что etcd — это больше про конфигурацию и discovery, в этом смысле его разумнее сравнивать с consul, а zookeeper, в контексте данной статьи, — про координацию.
Кроме того, согласно статье он только в последних версиях начал поддерживать DLM, что требует проведения бенчмарков, как минимум для того, чтобы объективно выбрать решение. Многие вещи можно и на in-memory grid делать — на том же Hazelcast, Ignite, да хоть на MySQL, но здесь речь про Apache Zookeeper.
Честно говоря, не думаю, что все бросятся переходить на etcd. Причин несколько, одна из существенных — если у вас развернута инфраструктура, в которой уже есть Zookeeper, а таких инфраструктур много, то проще продолжать использовать его, нежели переходить на другую систему.g0dlike
02.08.2017 10:49+1Собственно, как заказывали :)
https://coreos.com/blog/performance-of-etcd.html
Вообще, моек мнение было — выбор сервиса координации с точки зрения operations.
мейнтейнить ZK не очень удобно, exhibitor мертв, да и вообще, судя по темпу развития 3.5 ветки, проект в стагнации.ivankudryavtsev
02.08.2017 10:54+1Все верно, но это не производительность для целей координации, а для конфигурации. Видел этот обзор. Я же говорю, меня Zookeeper для целей конфигурации меньше интересует, чем возможности для координации. Об этом статья и написана.
Еще раз, я не говорю, что Etcd плохой, а Zookeeper хороший.
AstarothAst
02.08.2017 11:32Я не совсем понял, есть ли у ZooKeeper'а возможность разворачиваться не отдельным сервером, а в embedded виде? Ну, то есть пишу я приложение, и хочу иметь возможность обновляться без остановки оного. Поднимаю два экземпляра, у каждого внутри стартован экземпляр zookeeper'а, эти экземпляры здороваются, и ноды могут друг про друга все узнавать — умерла одна, вторая начала процессить запросы, первая обновилась, сказала «я главная», поменялись местами. Или я хочу странного?
ivankudryavtsev
02.08.2017 11:49Лучший ответ на ваш вопрос вот здесь — серверы должны знать о друг друге заранее.
То, что Вы хотите, можно реализовать на Hazelcast, к примеру, там есть динамическое присоединение к кластеру.
IvanPonomarev
02.08.2017 23:48Спасибо за статью. Скажите, а в какую сторону Вы бы смотрели, решая вот такую задачу, подойдёт ли тут ZooKeeper?
В сети на разных серверах есть N совершенно одинаковых исполнителей (сейчас N около 150). Во всякий момент времени есть список задач (коих всегда разное количество, но меньше, чем N). Надо, чтобы исполнители «разобрали на себя» задачи из списка, т. е. каждой из задач соответствовал бы свой исполнитель. Исполнитель не может взять больше одной задачи. Одну задачу потенциально могут взять два исполнителя, но это нежелательно. Исполнители могут как «отваливаться» по той или иной причине, так и приходить новые в пул, и надо чтобы как можно быстрее задачу у «отвалившегося» исполнителя перехватывал другой исполнитель, т. е. ни одной задачи без исполнителя оставлять нельзя.
Вот сейчас на ключах с TTL в Redis-е у меня реализован самодельный алгоритм. Но (на то он и самодельный) по нему случается, что за задачу «хватается» больше одного исполнителя.
Насколько подобная задача стандартна, чем её лучше всего решать?grossws
03.08.2017 00:12Как вариант, посмотреть на Apache Kafka, задачи распределяются внутри consumer group, т. е. одно сообщение в рамках одной consumer group получит только один consumer.
IvanPonomarev
03.08.2017 00:30Неет, очередь в данном случае не годится, потому что наши задачи имеют другую природу: они протяжённы во времени. Мы имеем список задач, этот список со временем меняется, каждая задача в нём висит, скажем, от получаса до двух часов. И нам надо обеспечить, чтобы пока задача в списке, на неё был назначен исполнитель из пула. Если задача из списка удаляется — исполнитель освобождается. Если исполнитель отваливается — надо назначать другого как можно быстрее.
grossws
03.08.2017 01:09Неправильно вас понял.
Вполне можно реализовать что-то типа выбора мастера, который снимает свою кандидатуру с других задач как только берется за одну. А когда освобождается — вешает кандидатуру на все имеющиеся задачи.
ivankudryavtsev
03.08.2017 06:45+1Добрый день. Это можно сделать на Zookeeper вполне. В данном случае, задачи будут последовательными узлами, а исполнители — эфемерными дочерними узлами (к примеру). Назначение на задачу производится с защитой Distributed Shared Lock. Наличие новых задач проверяется подпиской на родительский узел задач и(или) поллингом.
lock tasts/ t00000001 executorX t00000002 executorY
IvanPonomarev
03.08.2017 10:15Вот да, спасибо! Похоже на то, что надо.
Но непонятно вот что: по какому критерию Zookeeper понимает, что эфемерный узел пора рубить? В доках пишут: «пока существует пользовательская сессия». Так сессия как таковая может существовать вполне себе долго после того, как процедура обработки, её создавшая, вылетела (если не успела закрыть, например). Zookeeper выбивает сессию по таймауту обращений? А если мы долго заняты обработкой и не обращаемся к Zookeeper? Или открытая сессия шлёт какой-то heartbeat? Опять же если мы её не закроем и потеряем, тогда она его будет слать, пока garbage collector не доберётся? Объясните))IvanPonomarev
03.08.2017 10:20Наверное вместо эфемерной ноды лучше создать ноду с TTL и «освежать» TTL в цикле обработки? Но я просто не понимаю механику работы эфемерных нод, а это же одна из основных «фишек» Zookeper.
ivankudryavtsev
03.08.2017 10:44Можно и так, это зависит от природы обработчиков, если, к примеру, они стартуют в Mesos как одноразовые docker-контейнеры, то при крэше сессия завершится и узел удалится.
g0dlike
В новых проектах не вижу смысла в использовании ZK, когда есть etcd.
Когда допилят zetcd, ZK станет не нужным и в старых проектах.