Прим. переводчика: автор статьи рассказывает о процессе обновления кластера Elasticsearch размером более 3 петабайт методом последовательного включения двух кластеров, а также о том, как решались проблемы согласованности индексирования и миграции данных.

Это вторая часть из серии статей об обновлении кластера Elasticsearch без простоев и с минимальным воздействием на пользователей.

Изображение из оригинальной статьи
Изображение из оригинальной статьи

Как упоминалось в первой части, необходимо было обеспечить плавный переход между двумя версиями системы, при этом сохраняя возможность отката.

Поэтому с самого начала было очевидно, что придется запускать параллельно пару кластеров Elasticsearch, а затем постепенно переходить со старого на новый. В этой части пойдет речь о том, как решались проблемы согласованности индексирования и миграции данных.

Всё меняется

Требования к нашей системе довольно необычны по сравнению с типичным кластером Elasticsearch, собирающим логи, поскольку в ней попросту отсутствуют неизменяемые данные. Любой документ в любом индексе в любой момент времени может быть обновлен. Поэтому приходится внимательно следить за обновлениями и всегда применять их в правильном порядке.

Каждую секунду кластер получает 5000 новых документов и более 10000 обновлений. Всего в нем более 400 млрд документов, которые, вместе с репликами, «весят» более 3,5 ПБ.

Управлять этой горой данных очень непросто, даже если речь идет всего об одном кластере. К счастью, в Elasticsearch есть оптимистическая блокировка — и, смею вас заверить, ее мы использовали по полной. До начала миграции на новый кластер особых проблем с согласованностью в старом кластере не было (по крайней мере, мы ничего о них не знали).

Сценарии работы с обновлениями документов

Документы в нашей системе обновляются по ряду причин. Три распространенных примера: новая активность (например, лайк или комментарий к сообщению в соцсети), аннотации к документу, сделанные клиентом (например, метка или комментарий, прикрепленный к документу) или нормативные требования (приходится скрывать контент от поставщиков или удалять его на основании определенных правил).

Рисунок 1. Статистика по правкам индекса за ~3 недели (число событий в секунду)
Рисунок 1. Статистика по правкам индекса за ~3 недели (число событий в секунду)

Справиться со всем этим нашей системе помогают четыре базовые операции над документами и полями. Все они применяются к заданному идентификатору документа, а идентификаторы гарантированно уникальны для всего кластера.

  • Upsert

    • Добавить новый документ или полностью заменить существующий.

    • Обратите внимание, что удаления также проводятся как Upsert'ы. То есть Upsert просто заменяет существующий документ «пустым».

  • AddIfNotExists

    • Добавить новый документ, если его еще нет. В противном случае сохранить существующий и отбросить новый.

  • SetField

    • Заменить значение одного поля в документе, сохранив остальные поля нетронутыми. Выдать ошибку, если документ не существует.

  • AppendToField / RemoveFromField

    • Для полей массива также поддерживается функция добавления/удаления на уровне полей. Она удобнее SetField при работе с наборами значений. Во всех остальных аспектах семантика ее операций аналогична SetField.

До обновления для всех этих операций брался существующий документ из Elasticsearch, в него вносились изменения, после чего он сохранялся в кластер. За синхронизацию и версионирование этих операций отвечала сама Elasticsearch. Следование правилам оптимистической блокировки более или менее защищало нас от потери данных и/или другого непредсказуемого поведения. Но, как и ожидалось, вся эта идиллия закончилась с добавлением второго кластера.

Рисунок 2. Сервисы, которые реализуют четыре базовых операции
Рисунок 2. Сервисы, которые реализуют четыре базовых операции

Добавление второго кластера

Добавление в архитектуру второго кластера повлекло за собой массу новых проблем. Любой пропуск обновления или, что хуже, его потеря незамедлительно ударили бы по клиентам.

Одна из задач проекта миграции состояла в том, чтобы получить возможность перебрасывать поисковые запросы между кластерами незаметно для клиентов. То есть все полученные операции с документами должны были применяться в обоих кластерах в одном и том же порядке. Потеря операции в любом из кластеров сильно бы осложнила жизнь пользователям: например, тег, добавленный к записи, попросту исчезал бы для всех запросов, направляемых в этот кластер. При наших объемах даже маловероятная ошибка (к примеру, один шанс на миллион) случалась бы с завидным постоянством.

Также предстояло решить распространенную проблему, когда две операции применяются к одному и тому же документу почти одновременно. К примеру, за Upsert'ом сразу следует SetField. В такой ситуации неосторожное обращение грозит полной потерей данных. Если эти операции применить в кластерах в разном порядке, получится совершенно разный результат.

Рисунок 3. Обновления проходят в неправильном порядке, из-за чего обновление с версией 1 
теряется в итоговом результате
Рисунок 3. Обновления проходят в неправильном порядке, из-за чего обновление с версией 1 теряется в итоговом результате

Требовалось сделать так, чтобы в конечном итоге оба кластера сходились к одному конечному результату при любой последовательности операций, даже если те применялись в разном порядке.

Непростая задача.

Архитектура, «заточенная» под согласованность

Почти сразу пришло понимание, что необходимо ограничить количество компонентов с прямым доступом к Elasticsearch. В исходном состоянии миграция на новый кластер была бы слишком сложной и требовала бы абсолютной координации из-за одномоментности переключения. Было решено остановиться на двух таких компонентах: один — для пакетной потоковой передачи данных в больших объемах и один — с REST API.

Главное, что предстояло сделать — убрать версионирование из Elasticsearch, поскольку было бы очень сложно обеспечить применение каждого отдельного обновления в одном и том же порядке в обоих кластерах. Также было решено отказаться от поддержки всех описанных выше операций в компонентах, которые взаимодействуют с Elasticsearch, а вместо этого реализовать две операции «низкого уровня»:

  • GetDocument

    • Извлекает документ и номер его версии по идентификатору documentId. Возвращаемую здесь версию необходимо передать UpsertIfLatest.

  • UpsertIfLatest

    • Делает Upsert для документа, если версия определена как последняя.

    • Если версия не новее существующей, выдать ошибку и сообщить об этом вызывающей стороне.

Например, высокоуровневая операция SetField может быть реализована так:

fun SetField(String field, String newValue, String docId) {
  var success = false
  while (!success) {
    val document = GetDocument(docId)
    document[field] = newValue
    document.version = GetNextVersion(document.version)
    success = UpsertIfLatest(docId, document)
  }
}

Это означает, что пришлось внедрить сервис документов (Document), который использовал компоненты хранилища для реализации API высокого уровня (AddIfNotExists / SetField / AppendToField / RemoveFromField). Сервис документов создает версионированные Upsert'ы, которые дублируются в компоненты хранилища, работающие с отдельными кластерами. Компоненты хранилища независимо для каждого кластера решают, последняя ли версия у входящей операции Upsert или нет.

Дополнительная сложность состояла в том, что разные версии Elasticsearch отличались друг от друга реализацией внутреннего версионирования и оптимистической блокировки. Поэтому пришлось изобрести третью, единую причинно-согласованную схему версионирования на основе порядкового номера, работающую поверх двух других. Порядковые номера, которые управлялись «вне» кластеров, хранились в нескольких полях версий всех документов.

Не будем вдаваться в подробности, но команде из четырех человек потребовалось более года, чтобы аккуратно и постепенно перестроить весь пайплайн индексирования и обновления. В итоге получилась такая схема:

Рисунок 4. Сервисы, которые реализуют четыре базовых операции в новой архитектуре
Рисунок 4. Сервисы, которые реализуют четыре базовых операции в новой архитектуре

Преимущества архитектуры, «заточенной» под согласованность

Вся работа по поддержке двух кластеров была не просто «неизбежным злом», связанным с обновлением. Она принесла пользу гораздо быстрее, чем предполагалось. Вот краткий список дополнительных преимуществ, которые удалось получить:

  • Улучшены автоматические наборы тестов и мониторинг согласованности.

  • Улучшено разделение ответственности между несколькими компонентами, которые до этого были слишком сильно переплетены между собой.

  • Обнаружен и исправлен ряд ошибок, связанных с пограничными кейсами и возникавших в старой кластерной архитектуре: в данных были обнаружены свидетельства некоторых известных ошибок согласованности, порожденных самой Elasticsearch, о которых ранее не было известно.

  • Архитектуру «двух кластеров» можно использовать и в будущем:

    • например, для новых обновлений кластера;

    • на ее основе можно организовать еще один кластер в другой географической точке, чтобы обеспечить отказоустойчивость и локальность данных или снизить задержки.

Миграция данных между кластерами

Параллельно с проектированием и реализацией заточенной под согласованность архитектуры стояла задача понять, как именно копировать данные из старого кластера в новый.

Мы рассмотрели множество различных вариантов, от внедрения новых компонентов, которые считывали бы снапшоты Elasticsearch и записывали их на новый кластер, до индексации с удаленного сервера. В итоге было решено экспортировать все данные (1 ПБ) с помощью scan & scroll API и сохранять сжатые пакеты JSON-файлов в хранилище S3. После этого достаточно было пропустить эти JSON-документы через основной пайплайн.

Так мы можем не только реплицировать данные, но и одновременно улучшать их. Например, удалить кастомный код, который обрабатывал старые данные, и добавить поля, введенные с момента индексации. В результате получилась более последовательная, тонкая и понятная информационная модель для нового кластера, что само по себе огромное улучшение.

По нашим оценкам, пайплайн экспорта, переработки и реиндексации смог бы обрабатывать около 50 тыс. документов в секунду. Таким образом, работа по заполнению нового кластера должна была занять не менее 3 месяцев при условии, что процесс не будет прерываться.

Поскольку копирование заняло бы так много времени, было решено перестроить архитектуру остальной части системы на использование «частичных» данных, которые постепенно появляются в новом кластере. Это позволило бы избежать глобального переключения в самом конце. Такое решение повлияло на дальнейшие планы и концепцию и в итоге пришлось весьма кстати (подробнее об этом в одной из следующих статей).

Рисунок 5. Статистика репликации за двухмесячный период (число документов, переносимых из старого кластера в новый за секунду)
Рисунок 5. Статистика репликации за двухмесячный период (число документов, переносимых из старого кластера в новый за секунду)

Пиковая пропускная способность индексирования во время миграции составила 130 тыс. документов в секунду. Но, как видно из рисунка 5, по разным причинам ее не удавалось поддерживать длительное время. Кроме того, индексирование пришлось дополнительно замедлить после того, как основная часть запросов конечных пользователей начала направляться в новый кластер (чтобы не рисковать ростом поисковых задержек).

Стоит также отметить, что оценка средней пропускной способности оказалась очень точной. Весь процесс развертывания занял ~ 110 дней с момента начала индексирования до момента, когда все данные оказались на новом кластере.

На этом заканчивается вторая часть серии материалов об обновлении кластера Elasticsearch. Следите за обновлениями: очередная статья будет опубликована на следующей неделе.

P.S.

Читайте также в нашем блоге:

Комментарии (0)