Черновик статьи был написан еще год назад, когда я работал на крупном международном проекте, но из-за разных событий прошлого года он остался неопубликованным.

На проекте в моем ведении находилось несколько on-premise кластеров в нескольких европейских датацентрах. «Мы» в этой статье — небольшая команда DataOps из 5 человек.

Было дело я читал на Хабре статью про «Кластер Elasticsearch на 200 ТБ+» и примерял написанное к нам, у нас такой кластер считался средним, самый маленький кластер под 0,1Ptb, а большой тогда был под 0,5Ptb. Потом была поставлена задача подготовить кластер к увеличению объемов входящих данных в 2-3 раза, а срок хранения в 2 раза, т. е. объем хранимых данных, если грубо экстраполировать, должен был стать в районе 2-3Ptb.

Хочу поделиться нашим опытом, может кому пригодится.

Требования к кластеру

  • Возможность проводить анализ инцидентов, влияния инфраструктурных изменений на предоставляемые услуги, мониторинг качества услуг;

  • Максимальная допустимая задержка в поступающих данных 15 минут;

  • При задержке в данных более 30 минут следует oncall инцидент;

  • Интерфейсы для пользователей должны остаться прежними или схожими, а именно:

    • Grafana для мониторинга;

    • Kibana (Discovery режим) для более детальных расследований.

  • Срок хранения исходных журналов — изначально 1 неделя и довести до 2 недель;

  • Срок хранения метрик — постоянно.

Характеристики входного потока

  • Данные с нескольких сотен серверов собираются и передаются на один входной сервер по SFTP. При этом интервал сбора и отправки данных 5 минут. Т.е. уже заложена задержка в 5 минут в поступающих данных и на обработку данных, обеспечение доступности их, остается 10 минут. Так же данные могут приходить с большой задержкой (например, в сутки) из-за отсутствия связи с удаленным сервером.

  • 4 млн файлов общим объемом в 3Tb (запакованные в GZIP CSV-подобные файлы) в день

  • Размер файлов от нескольких килобайт до 2Gb

  • Общий объем данных после обработки ~63Tb в день

  • Суммарная скорость индексирования сообщений в ElasticSearch на Hot датанодах не выше 800K/s

Поток обработки

У нас есть несколько потоков обработки (pipelines), которые обрабатывают входные данные и заливают их в ElasticSearch. Так же есть другие обработки, которые рассчитывают метрики, но они не связаны с ElasticSearch и поэтому я их не будут указывать.

Parsing → Kafka → Filter / Enrich → Kafka → ElasticSearch

Часть архитектуры
Часть архитектуры

Kafkabeat — это внутренее решение, написаное на Go с использованием родных библиотек от ElasticSearch и Sarama для Apache Kafka ( https://github.com/Shopify/sarama ).

Железо (34 сервера (CPU:80, RAM:384Gb) , 1 датацентр) и роли

  • 1 — Ingest

  • 8 — Hot ES (8x8 = 64 датаноды)

  • 12 — Warm ES (12x8 = 96 датанод)

  • 8 — Kafka

  • 5 — Processing (KafkaStreams)

  • 10Gb сетевые интерфейсы

Первые шаги

Архитектура кластера досталась мне в наследство as is и требовалось вносить изменения без остановки всего кластера. На момент начала планирования работ по расширению кластера он уже был перегружен и работал на максимуме возможностей этого архитектурного решения.

Поэтому первое что сделали:

  • Провели ревизию имеющегося железа

  • Измерили характеристики входного потока

  • Спрогнозировали будущие характеристики входного потока

и сели думать что делать дальше.

Варианты:

  1. Линейное увеличение железа

  2. Пересмотр процессинга

  3. Уход в облака

  4. Замена системы хранения

Вариант 1. Увеличение количества серверов

Был проведен стоимостной анализ с учетом стоимости серверов, жестких дисков, дисковых полок, стоимости аренды и обслуживания стоек в датацентрах, вендорное обслуживани и т. п. в горизонте на 10 лет.

Результат: дорого.

Но дополнительные 14 серверов мы получили для того, чтобы решить как текущие проблемы с работой кластера, так и с расчетом на изменения, которые требовалось придумать и провести

Вариант 2. Пересмотр процессинга

Была проведена большая работа по переработке процессинга, но с точки зрения ElasticSearch кластера важна только часть, касающаяся индексирования документов в ElasticSearch.

В начале нам надо было решить проблему с индексированием имеющихся объемов при имеющемся оборудовании. Как решение мы использование разделение потока на две части с учетом возраста входных данных (но можно было использовать и другие критерии). Критерий был выбран таким образом, чтобы существующий процессинг и скорость работы ElasticSearch кластера обеспечивали гарантированное индексирование данных в течение получаса после момента события, а оставшиеся данные обрабатываются в другом потоке и заносятся в ElasticSearch в течение дня.

Этот подход обеспечил наличие порядка 20-40% входных данных в праймтайм и получение всех данных с задержкой в 2-3 часа. И если раньше весь поток работал по принципу FIFO, что приводило к единому лагу для всех данных, то новое разделение на два FIFO потока позволило «приоритизировать» данные и обеспечить их оперативное наличие в конечном хранилище (т. е. принцип «хороша ложка к обеду» был соблюден — лучше иметь какие-либо данные сейчас, чем все данные, но через несколько часов)

Возраст входящих файлов
Возраст входящих файлов

Параллельно решали вопросы с тюнингом ElasticSearch и индексированием даных.

И в каких-то случаях рассматривали самые различные варианты подготовки данных и использование различных существующих инструментов для их занесения в ElasticSearch. В нашем случае Apache Kafka осталась непревзойденным механизмом для хранения данных и расспараллеливания загрузки этих данных в ElasticSearch.

Взяли Kafkabeat (внутренняя разработка), взяли расхваливаемый Vector (https://vector.dev/) и я накидал небольшую утилиту на C++ с использованием cppkafka (https://github.com/mfontanini/cppkafka) и elasticlient (https://github.com/seznam/elasticlient), rapidjson (https://github.com/Tencent/rapidjson/). Провели тестирование и получили что Kafkabeat самый медленный, Vector быстрее Kafkabeat на ~10% , а мое решение, получившее название Baikalbeat, перспективно, но требует доделки.

Замеры производительности решений Kafka -> ElasticSearch
Замеры производительности решений Kafka -> ElasticSearch

Пришлось глубже залезть в нутро этих механизмов и в следующих версиях, отказаться от: парсинга JSON, библиотек для ElasticSearch. И добавить несколько потоков. Парсинг JSON нужен был для того, чтобы из kafka-сообщений извлекать название ElasticSearch-индекса для вставки данных, но решил заменить это на поиск по подстроке. Библиотеку работы с kafka не стал трогать, так как к ней никаких претензий не было — тестовый миллион записей выкачивался за 9 секунд. А библиотеку для ElasticSearch заменил на небольшую имплементацию ElasticSearch Bulk API с POST запросами.

В результате тестов выяснили: Baikalbeat при пачках в 50K сообщений может легко прибить тестовые инстансы ElasticSearch, при пачках в 25-35K наибольшая производительность.

Когда мы заменили все наши инстансы Kafkabeat-ов, на Baikalbeat, то сразу получили требуемую скорость индексирования в 1500-1600K/s. Т.е. мы добились скорости индексирования данные в режиме реального времени и практически полное отсутствие лага на индексировании.

Лаг (разница между временем события и временем попаданием в ES) данных в ElasticSearch
Лаг (разница между временем события и временем попаданием в ES) данных в ElasticSearch

(Данные обычно к нам приходят с минимальной задержкой в 5 и более минут от момента события)

Скорость индексирования ElasticSearch (документов в секунду)
Скорость индексирования ElasticSearch (документов в секунду)

Вариант 3. Уход в облака

В ходе анализа облаков изучили разные варианты, провели консультирование с разными специалистами и вывод получился один: на текущий момент для нас альтернативы нет.

Что изучили более-менее детально:

  • Google Cloud

  • Microsoft Azure

  • Amazon AWS (включая managed ElasticSearch)

  • ElasticSearch Cloud

  • Yandex Cloud (включая Clickhouse, см. вариант 4)

Какие минусы:

  • Альтернативы Kibana Discovery режиму пока нет

  • Облака не подходят для хранения и обработки требуемых нам объемов данных в ElasticSearch

  • Все облачные ElasticSearch расчитаны на «средние» кластера и они хуже в управляемости, чем наши bare metal.

  • Стоимость решения эквивалентна или выше (sic!), чем просто покупка железа (вариант 1).

Вариант 4. Замена системы хранения

Мы изучили несколько вариантов:

  • Другие системы хранения и обработки данных и остановились на Clickhouse;

  • Другие системы хранения данных (NetApp).

Hidden text

Offtopic

Clickhouse — на мой взгляд чудесная система, создаваемая гениальными троешниками, которые создали СУБД мало зная про СУБД. (Ребята, если вы будете читать мою статью, то вы создали действительно хорошую систему, но за некоторые ее выкрутасы и архитектурные решения, особенности обновления и т.п. хотелось ее сразу выкинуть и забыть, так как детские проблемы из-за отсутствия опыта работы и знания крупных СУБД, а вот уши MySQL там торчали везде)

Clickhouse мы опробовали на части наших данных и он показал хорошую производительность, быстрые ответы на запросы, изумительный объем хранения данных (в ~6 меньше,чем ElasticSearch). Но затем начались сложности:

  • Как спланировать размер кластера с учетом нашей нагрузки. Пришлось делать замеры и свою методику расчета.

  • Как решать вопросы стабильности работы с kafka (решили)

  • Как решать вопросы стабильной работы TTL (решили)

  • Как восстанавливать после сбоев — тут всего не решили, опять же порой восстановление кластера через Zookeeper напоминало танцы с бубном.

  • Документации по сопровождению и решению проблем — практически нет.

  • Нет нормального визуального интерфейса, разве только Grafana. Всякие Tabix, Metabase, Redash — сыро и малопригодно для инженеров из других специализаций. Думал что хотя бы Yandex DataLens спасет — нет, там тоже многое печально и другая цель в визуализации информации.

Большое спасибо тем ребятам из Яндекс, кто общался со мной по вопросам создания Clickhouse кластера на 0,5-1Ptb, консультировал по DataLens.

В результате Clickhouse мы используем, но только как backend для graphite.

Про другие системы хранения. Посмотрели на SAS, SSD, NVME носители, NetApp. Нам предоставили тестовые хранилища на разных носителях — провели замеры для оценки производительности, а затем стали думать как вписаться в бюджет.

В результате остановились на NetApp хранилище с гибридной реализацией в объеме на >1Ptb для начала.

Реорганизация кластера

Получили мы дополнительные 14 серверов (CPU: 96, RAM: 1,5Tb) и NetApp для хранения данных. NetApp подключен ко всем 48 серверам.

Шаги:

  1. Подготовка Ansible ролей и плейбуков

  2. Первоначальная настройка серверов

  3. Настройка сервисов и создание нового кластера (Kafka, processing, ElasticSearch)

  4. Перенастройка процессинга для перенаправления части пайплайнов в новый кластер

  5. Мониторинг

Запустили новый кластер на части серверов (3-5) и стали изучать его работу. В начале в новом кластере подняли весь процессинг, а потом запустили туда копирование данных. В начале это сделали через процессинг на входном сервера, а потом улучшили схему: подключили NetApp через NFS на входной сервер, стали туда складывать файлы, информацию о файла в Kafka topic, затем с помощью kafkamirror копируем данные в новый кластер новую Kafka. В результате новый кластер начал получать данные без серьезной нагрузки на старый кластер.

Дальнейшие шаги должны были быть такими:

  1. Мониторинг в течение 1 недели;

  2. Отключение процессинга данных в старом кластере;

  3. Подготовка нового пайплайна, включение его и мониторинга в новом кластере;

  4. Перераспределение ролей серверов для добавления дополнительных Kafka / ElasticSearch нод, если потребуется.

Но пришлось внести коррективы, так как после переноса еще пары пайплайнов мы столкнулись с деградацией скорости работы всего нового кластера:

  • Kafka стала медленнее работать;

  • ElasticSearch стал медленнее проводить индексацию и остальные операции;

  • Включение дополнительных серверов в Kafka / ElasticSearch кластеры лишь усугубили ситуацию.

Стали изучать ситуацию и выяснили что мы вышли на пороговые значения производительности NetApp хранилища. Родного мониторинга NetApp у нас нет, поэтому сделали запрос к вендору и от него получили отчет по почти 100% утилизации NetApp. Тем самым мы должны были пересмотреть архитектуру кластеров.

Провели анализ использования дисковых операций и выяснили что наиболее прожорливым оказался ElasticSearch (я вот предполагал что это будет Kafka). В качестве решения проблемы мы перенастроили 2-3 старых сервера на SAS винтах в качестве Hot датанод для ElasticSearch, а ElasticSearch датаноды на новых серверах с NetApp в качестве хранилища стали Warm. И проблема с утилизацией NetApp сразу исчезла — утилизация снизилась до 30-40%. На удивление производительность датанод на SAS оказалась не сильно меньше производительности датанод на SSD.

Рекомендации

  • Внимательно читать документацию на EleastiSearch, документация них настолько хороша (https://www.elastic.co/guide/en/elasticsearch/reference/master/tune-for-indexing-speed.html), что толку от платной поддержки особо нет.

  • Разделять физически или ограничивать лимитами датаноды на использование ресурсов, оптимальнее всего использовать docker контейнеры (в чистом виде или в k8s), к сожалению, ElasticSearch не может использовать эффективно большие объемы памяти.

  • Для максимальной скорости индексации желательно:

    • Использовать роли Hot, Warm в рамках ILM

    • Под Hot выделять наиболее быстрые хранилища, причем следует учитывать что размер этих хранилищ следует рассчитывать исходя из требуемой скорости индексирования (у нас это 30-35K/s на датаноду), количества датанод и времени хранения данных в стадии Hot (в нашем случае это 1 день), т. е.:

      • Общий объем хранилища для N часов = Размер записи * Количество записей за период N часов * K1

      • Количество датанод = Требуемая производительность / Производительность одной датаноды

      • Объем хранилища для одной датаноды = Общий объем хранилища для N часов / Количество датанод

      • K1 — коэффициент для учета watermark-ов, времени миграции данных с Hot датанод на Warm датаноды. Рекомендую использовать равным 1.2, но если у вас большой поток и пока первый индекс переливается, то успеваете получить новые данные в схожем объеме, то используйте коэффициент 2-2.2

    • Индексы разделять на шарды по количеству Hot-датанод, следует так же не забывать про параметры total_shards_per_node (> 1), чтобы при выходе из строя 1-2 датанод не положить весь процессинг.

    • Исключить реплицирование в Hot индексах

    • Использовать ILM для мигрирования индексов на Warm / Cold датаноды без переиндексирования, но с созданием реплик при необходимости (в нашем случае на Netapp наличие реплик не требуется)

    • Рекомендации для индексов:

      • использовать mappings

      • установить refresh_interval в достаточное для работы значение (у нас это сейчас 300s)

      • уменьшить количество скидываний translog ( translog.sync_interval в минуты)

    • При больших объемах данных не следует забыть о параметре max_shards_per_node в cluster/settings

    • Так же будет проблема в ElasticSearch с обработкой большого количества входящих запросов, следовательно надо внести правки для pool_threads

  • Для оперативного мониторинга достаточно использовать несколько характеристик:

    • статус кластера и время пребывания в этих статусах

    • количество датанод

    • количество мастернод

    • количество ILM ошибок

    • индексы с размером более 50Гб

    • датаноды с наибольшим заполнением хранилища

  • Для расследований и оптимизации желательно мониторить почти все характеристики как ElasticSearch, так и хостов: от скорости индексирования, состояния каждой датаноды, использования памяти, процессора и т. д. Встроенный мониторинг в ElasticSearch порой не дает достаточной информации.

Заключение

Большое спасибо всем ребятам, с которым я сопровождал этот и другие кластеры:

Дмитрий (за настройку всего и решение всевозможных проблем), Стас (за разработку и модернизацию процессинга), Николай (за разработку и модернизацию мониторинга, процессинга), Михаил (за тестирование, отладку Baikalbeat и нового кластера в целом).

Комментарии (11)


  1. gonchik
    00.00.0000 00:00

    Спасибо большое за статью - прочёл одном дыхании.

    Подскажите, правильно ли я понял, что в итоге ушли от kafkabeat на Baikalbeat, подскажите в чем их разница реализации. Ведь оба были внутренние реализации? Что не дало покрутить kafkabeat ?

    Есть ли у вас в планах раскрытие побольше деталей по тюнингу elastic search, ситуации с высокой утилизацией дисковых операции и перевод на sas в hot, а также реализации hot/warm. Заранее благодарен.


    1. rozmysl Автор
      00.00.0000 00:00

      1) Baikalbeat (https://github.com/elabpro/baikalbeat/) не совсем корректное название, так как он не beat, а узко специализированная утилита, созданная с определенными допущениями. Kafkabeat - полноценный beat. Экономить приходилось на всем - от конкатенации строк до выкидывания JSON парсинга. За счет этого и получили нужные скорости с меньшим потреблением ресурсов

      2) SAS ноды у нас легко были переведены в HOT - там ничего не тюнилось специально под это. Изначально при создании нод делался уже максимальнный тюнинг: RAID0 для создания быстрых объемных разделов, оптимизация файловой системы (xfs - журнализация), ограничение каждой ноды объемом в 32Gb RAM, тюнинг jvm и остального всего, до чего можно добраться на своем сервере.


  1. sergrok
    00.00.0000 00:00

    Почему решили использовать shared SAN NetApp? Это же единая точка отказа, хоть и имеет резервирование на уровне контроллеров. И оно особо не масштабируется горизонтально в дальнейшем. Почему не использовать локальное диски разных видов (warm,hot) в каждом сервере?

    Не рассматривали вариант с поместить ElasticSearch в k8s кластер на этом же железе?


    1. rozmysl Автор
      00.00.0000 00:00

      1) у меня был хороший Netapp с хорошей гарантией, и нужный объем в несколько петабайт мне могли обеспечить. Но мне и не надо было много петабайт для дальнейшего роста, там уже лучше было тратить деньги на адаптацию инструментов и переход на Clickhouse подобные решения.

      2) По ES на k8s - 2-3 раза обдумывали это решение и не нашли ему применения. Мало того, что сам k8s съедает часть ресурсов, так и его использование усложняет процесс troubleshooting-а. Его лучше использовать там, где требуется запускать множество разнородных процессов с динамическим распределением серверных ресурсов. В таких местах там как раз мы использовали кубер.


    1. rozmysl Автор
      00.00.0000 00:00

      3) Локальные диски мы использовали, но для наших объемов стоимость локального хранения выходила дороже, чем Netapp. В силу еще разных ценовых политик.

      А если эти объемы считать в облаках, то там и ценники уходят в облака :)


  1. ihard
    00.00.0000 00:00

    В списке приложений для вставки данных из Kafka в Elasticsearch почему то отсутствует Rsyslog, почему решили его не тестировать? Это готовое решение и по скорости скорее всего было бы на 1-м месте.


    1. rozmysl Автор
      00.00.0000 00:00

      Насколько я помню его вспомнили при анализе и его производительность была на уровне Kafkabeat. Основная проблема в обработке данных была в том, что надо было вытащить имя индекса, а для этого нужен был JSON парсинг, который съедал много ресурсов при любом вариант реализации. Возьмем к примеру их публикацию ( https://www.rsyslog.com/performance-tuning-elasticsearch/ ) - там они радуются 30K/s. Для 3M/s мне бы потребовалось 100 инстансев с их конфигурацией. Это достаточно ресурсоемко и затратно. Я обходился гораздо меньшим количеством своих байкалбитов


  1. dph
    00.00.0000 00:00
    +1

    А почему не рассматривались какие-то другие инструменты для хранения, кроме ElasticSearch? Тот же CH или хотя бы Influx или иные инструменты?
    Кажется, что использование полнотекстового движка для мониторинга по метрикам - не самое лучшее решение.


    1. rozmysl Автор
      00.00.0000 00:00

      А где я говорил что в ES хранятся метрики? Для метрик у там использовались специализированные хранилища на базе graphite / prometheus. Graphite перевели на backend на базе Clickhouse, а тугой prometheus на VictoriaMetrics.

      ES нужен именно для полнотекстового поиска по разным исходным данным.


      1. dph
        00.00.0000 00:00

        Тогда не понятно, а что именно хранили в ES. По каким данным требовался полнотекстовый поиск? И какой именно полнотекстовый поиск (по подстроке или с учетом грамматики или что-то еще)?


  1. khharut
    00.00.0000 00:00

    Спасибо за статью. Читал с большим интересом. Не могу не заметить один недостаток в этой архитектуре. На практика это будет его поддержка будет очень трудоемкой, иногда невозможной. При очень больших данных и очень большой нагузке плюс огромном кластере из 20+ машин контролирование (eng. monitoring) всего хозайства становиться почти нерешаемой задачей. Придеться нанимать новых людей чтобы следили за уведомлениями (eng. alerts), но и это не поможет. Если бы это решение работала на практике на больших данных, то его не приподносили бы как учебный в этой статье https://www.cloudskillsboost.google/focuses/12964?parent=catalog. Заранее отвечу, решение проблемы уведомлений (eng. alerts) это либо использование решений с Hadoop от Cloudera либо переход в облоко.