Прим. переводчика: автор статьи рассказывает о процессе обновления кластера Elasticsearch размером более 3 петабайт методом последовательного включения двух кластеров, а также о том, как решались проблемы согласованности индексирования и миграции данных.
Это вторая часть из серии статей об обновлении кластера Elasticsearch без простоев и с минимальным воздействием на пользователей.
Как упоминалось в первой части, необходимо было обеспечить плавный переход между двумя версиями системы, при этом сохраняя возможность отката.
Поэтому с самого начала было очевидно, что придется запускать параллельно пару кластеров Elasticsearch, а затем постепенно переходить со старого на новый. В этой части пойдет речь о том, как решались проблемы согласованности индексирования и миграции данных.
Всё меняется
Требования к нашей системе довольно необычны по сравнению с типичным кластером Elasticsearch, собирающим логи, поскольку в ней попросту отсутствуют неизменяемые данные. Любой документ в любом индексе в любой момент времени может быть обновлен. Поэтому приходится внимательно следить за обновлениями и всегда применять их в правильном порядке.
Каждую секунду кластер получает 5000 новых документов и более 10000 обновлений. Всего в нем более 400 млрд документов, которые, вместе с репликами, «весят» более 3,5 ПБ.
Управлять этой горой данных очень непросто, даже если речь идет всего об одном кластере. К счастью, в Elasticsearch есть оптимистическая блокировка — и, смею вас заверить, ее мы использовали по полной. До начала миграции на новый кластер особых проблем с согласованностью в старом кластере не было (по крайней мере, мы ничего о них не знали).
Сценарии работы с обновлениями документов
Документы в нашей системе обновляются по ряду причин. Три распространенных примера: новая активность (например, лайк или комментарий к сообщению в соцсети), аннотации к документу, сделанные клиентом (например, метка или комментарий, прикрепленный к документу) или нормативные требования (приходится скрывать контент от поставщиков или удалять его на основании определенных правил).
Справиться со всем этим нашей системе помогают четыре базовые операции над документами и полями. Все они применяются к заданному идентификатору документа, а идентификаторы гарантированно уникальны для всего кластера.
-
Upsert
Добавить новый документ или полностью заменить существующий.
Обратите внимание, что удаления также проводятся как Upsert'ы. То есть Upsert просто заменяет существующий документ «пустым».
-
AddIfNotExists
Добавить новый документ, если его еще нет. В противном случае сохранить существующий и отбросить новый.
-
SetField
Заменить значение одного поля в документе, сохранив остальные поля нетронутыми. Выдать ошибку, если документ не существует.
-
AppendToField / RemoveFromField
Для полей массива также поддерживается функция добавления/удаления на уровне полей. Она удобнее SetField при работе с наборами значений. Во всех остальных аспектах семантика ее операций аналогична SetField.
До обновления для всех этих операций брался существующий документ из Elasticsearch, в него вносились изменения, после чего он сохранялся в кластер. За синхронизацию и версионирование этих операций отвечала сама Elasticsearch. Следование правилам оптимистической блокировки более или менее защищало нас от потери данных и/или другого непредсказуемого поведения. Но, как и ожидалось, вся эта идиллия закончилась с добавлением второго кластера.
Добавление второго кластера
Добавление в архитектуру второго кластера повлекло за собой массу новых проблем. Любой пропуск обновления или, что хуже, его потеря незамедлительно ударили бы по клиентам.
Одна из задач проекта миграции состояла в том, чтобы получить возможность перебрасывать поисковые запросы между кластерами незаметно для клиентов. То есть все полученные операции с документами должны были применяться в обоих кластерах в одном и том же порядке. Потеря операции в любом из кластеров сильно бы осложнила жизнь пользователям: например, тег, добавленный к записи, попросту исчезал бы для всех запросов, направляемых в этот кластер. При наших объемах даже маловероятная ошибка (к примеру, один шанс на миллион) случалась бы с завидным постоянством.
Также предстояло решить распространенную проблему, когда две операции применяются к одному и тому же документу почти одновременно. К примеру, за Upsert'ом сразу следует SetField. В такой ситуации неосторожное обращение грозит полной потерей данных. Если эти операции применить в кластерах в разном порядке, получится совершенно разный результат.
Требовалось сделать так, чтобы в конечном итоге оба кластера сходились к одному конечному результату при любой последовательности операций, даже если те применялись в разном порядке.
Непростая задача.
Архитектура, «заточенная» под согласованность
Почти сразу пришло понимание, что необходимо ограничить количество компонентов с прямым доступом к Elasticsearch. В исходном состоянии миграция на новый кластер была бы слишком сложной и требовала бы абсолютной координации из-за одномоментности переключения. Было решено остановиться на двух таких компонентах: один — для пакетной потоковой передачи данных в больших объемах и один — с REST API.
Главное, что предстояло сделать — убрать версионирование из Elasticsearch, поскольку было бы очень сложно обеспечить применение каждого отдельного обновления в одном и том же порядке в обоих кластерах. Также было решено отказаться от поддержки всех описанных выше операций в компонентах, которые взаимодействуют с Elasticsearch, а вместо этого реализовать две операции «низкого уровня»:
-
GetDocument
Извлекает документ и номер его версии по идентификатору documentId. Возвращаемую здесь версию необходимо передать UpsertIfLatest.
-
UpsertIfLatest
Делает Upsert для документа, если версия определена как последняя.
Если версия не новее существующей, выдать ошибку и сообщить об этом вызывающей стороне.
Например, высокоуровневая операция SetField может быть реализована так:
fun SetField(String field, String newValue, String docId) {
var success = false
while (!success) {
val document = GetDocument(docId)
document[field] = newValue
document.version = GetNextVersion(document.version)
success = UpsertIfLatest(docId, document)
}
}
Это означает, что пришлось внедрить сервис документов (Document), который использовал компоненты хранилища для реализации API высокого уровня (AddIfNotExists / SetField / AppendToField / RemoveFromField). Сервис документов создает версионированные Upsert'ы, которые дублируются в компоненты хранилища, работающие с отдельными кластерами. Компоненты хранилища независимо для каждого кластера решают, последняя ли версия у входящей операции Upsert или нет.
Дополнительная сложность состояла в том, что разные версии Elasticsearch отличались друг от друга реализацией внутреннего версионирования и оптимистической блокировки. Поэтому пришлось изобрести третью, единую причинно-согласованную схему версионирования на основе порядкового номера, работающую поверх двух других. Порядковые номера, которые управлялись «вне» кластеров, хранились в нескольких полях версий всех документов.
Не будем вдаваться в подробности, но команде из четырех человек потребовалось более года, чтобы аккуратно и постепенно перестроить весь пайплайн индексирования и обновления. В итоге получилась такая схема:
Преимущества архитектуры, «заточенной» под согласованность
Вся работа по поддержке двух кластеров была не просто «неизбежным злом», связанным с обновлением. Она принесла пользу гораздо быстрее, чем предполагалось. Вот краткий список дополнительных преимуществ, которые удалось получить:
Улучшены автоматические наборы тестов и мониторинг согласованности.
Улучшено разделение ответственности между несколькими компонентами, которые до этого были слишком сильно переплетены между собой.
Обнаружен и исправлен ряд ошибок, связанных с пограничными кейсами и возникавших в старой кластерной архитектуре: в данных были обнаружены свидетельства некоторых известных ошибок согласованности, порожденных самой Elasticsearch, о которых ранее не было известно.
-
Архитектуру «двух кластеров» можно использовать и в будущем:
например, для новых обновлений кластера;
на ее основе можно организовать еще один кластер в другой географической точке, чтобы обеспечить отказоустойчивость и локальность данных или снизить задержки.
Миграция данных между кластерами
Параллельно с проектированием и реализацией заточенной под согласованность архитектуры стояла задача понять, как именно копировать данные из старого кластера в новый.
Мы рассмотрели множество различных вариантов, от внедрения новых компонентов, которые считывали бы снапшоты Elasticsearch и записывали их на новый кластер, до индексации с удаленного сервера. В итоге было решено экспортировать все данные (1 ПБ) с помощью scan & scroll API и сохранять сжатые пакеты JSON-файлов в хранилище S3. После этого достаточно было пропустить эти JSON-документы через основной пайплайн.
Так мы можем не только реплицировать данные, но и одновременно улучшать их. Например, удалить кастомный код, который обрабатывал старые данные, и добавить поля, введенные с момента индексации. В результате получилась более последовательная, тонкая и понятная информационная модель для нового кластера, что само по себе огромное улучшение.
По нашим оценкам, пайплайн экспорта, переработки и реиндексации смог бы обрабатывать около 50 тыс. документов в секунду. Таким образом, работа по заполнению нового кластера должна была занять не менее 3 месяцев при условии, что процесс не будет прерываться.
Поскольку копирование заняло бы так много времени, было решено перестроить архитектуру остальной части системы на использование «частичных» данных, которые постепенно появляются в новом кластере. Это позволило бы избежать глобального переключения в самом конце. Такое решение повлияло на дальнейшие планы и концепцию и в итоге пришлось весьма кстати (подробнее об этом в одной из следующих статей).
Пиковая пропускная способность индексирования во время миграции составила 130 тыс. документов в секунду. Но, как видно из рисунка 5, по разным причинам ее не удавалось поддерживать длительное время. Кроме того, индексирование пришлось дополнительно замедлить после того, как основная часть запросов конечных пользователей начала направляться в новый кластер (чтобы не рисковать ростом поисковых задержек).
Стоит также отметить, что оценка средней пропускной способности оказалась очень точной. Весь процесс развертывания занял ~ 110 дней с момента начала индексирования до момента, когда все данные оказались на новом кластере.
На этом заканчивается вторая часть серии материалов об обновлении кластера Elasticsearch. Следите за обновлениями: очередная статья будет опубликована на следующей неделе.
P.S.
Читайте также в нашем блоге: