Kafka позволит вам грамотно организовать работу с большим массивом данных, но в процессе может подкинуть проблем. Иногда придется устроить танцы с бубном, чтобы программа продолжила работать, а не рухнула в момент запуска.

О не очень стандартном использовании Apache Kafka и лайфхаках по созданию Data Lake на его основе нам рассказал Михаил Кобик, директор департамента инфраструктурных решений в SMART Technologies SOFT. В 2017 году перед командой Михаила встала непростая задачка - создать хранилище данных на 80 Tb. В распоряжении был спек, примерные нагрузки и абсолютное непонимание, что с этим делать со стороны заказчика. Передаем слово Михаилу. 

Что мы делали 

Решать вопрос мы начали с написания тестов и проверки прототипа архитектуры, постепенно дополняя его различными элементами свободного ПО. Вот примерно то, что мы знали на этапе бизнес-анализа.

Чтобы все заработало, нам требовалось связать огромное количество внешних устройств, которые исчислялись десятками тысяч. Через собственный бинарный протокол они передавали бы данные в нашу инфраструктуру, а мы должны были эти данные получить, расшифровать с помощью криптосервиса, разобрать и валидировать. Основная задача – понять, может ли данное устройство передавать информацию и сохранять ее у себя. После сохранения мы должны были отправить отчёт о получении этих данных.  Сама по себе схема – это полностью синхронные протоколы взаимодействия. Единственное дополнение здесь – криптосервис, который занимался шифрованием и являлся для нас черным ящиком.

На старте проекта мы понимали, что придётся обрабатывать нагрузку примерно в 15 тысяч RPS (количество запросов, получаемых сервером за секунду). Проблема в том, что это синхронный бинарный протокол. Получается, что каждый реквест от клиента делился бы на 6 операций вызова различных смежных сервисов. Умножаем 15 тысяч на 6 и получаем сумму запросов, которую нам предстояло обрабатывать. 

Упрощало процесс, то, что нагрузка хорошо прогнозировалась. Мы прекрасно знали, в какие дни месяца она будет сумасшедшая, а в какие дни – нет. Также мы представляли, как нагрузка распределена по времени суток: глубокой ночью было около 100 RPS, а днём, в 13:00-14:00, получалось заявленное в задании RPS.

Зная все вводные, мы начали искать шину данных, какой-то сервис, внутрь которого очень быстро можно было бы загружать информацию, получать подтверждение о ее сохранении и сообщать об этом нашему клиенту. 

Первые шаги с Apache Kafka

Мы протестировали практически все доступные варианты и даже пробовали написать свое ПО, но все разбивалось о какие-либо ограничения. Мы пробовали сделать это с помощью реляционных баз данных, но они себя тоже не оправдали. Еще использовали Hadoop, но он медленный, как и RabbitMQ. Так мы пришли к работе с Apache Kafka, у которой имелись свои преимущества:

  • Скорость

  • Инструмент легко масштабируется 

  • Высокая отказоустойчивость

  • Проста в эксплуатации

  • С Kafka мы уже работали, поэтому знали, какие подводные камни ожидать

  • Удобство доступа к данным

Поначалу всем всё нравилось: данные поступали и адекватно вычитывались. Во время проведения экспериментов мы настроили Retention (настройки, которые говорят, сколько нужно хранить сообщения в топике) на 60 дней: в течение этого времени Kafka должна была хранить данные. Изначально мы сделали это просто для подстраховки. Система новая, если кто-то где-то ошибется при создании микросервисов, хорошо иметь возможность пересчитать.

Правда вскоре случился небольшой казус – ужесточились требования от государственного регулятора. Нас обязали хранить данные 5 лет. Причем не только хранить, но и обеспечивать быструю выгрузку.


Непростая задачка: как это сделать? Мы можем гарантировать, что 60 дней данные у нас останутся, но регулятор запрашивает данные по определенным временным рамкам. У Kafka же не было удобного инструмента доступа к данным с такого рода фильтрами. Получилось так, что все вроде хорошо работало, но на инструмент наложились дополнительные ограничения. В этих условиях нам нужно было как-то всё бекапить.

Организация бекапа

Изначально мы делали снимки файловой системы во время работы, но потом оказалось, что восстанавливаться, когда у тебя кластерная Kafka, довольно трудно. Бекапы стали первой проблемой, с которой мы начали разбираться. Все свелось к очень простому требованию – нужно бекапить информацию так, что, если в наш центр обработки данных (ЦОД) прилетит метеорит, мы сможем восстановить работоспособность системы в короткие сроки. 

Мы потратили некоторое количество времени на изыскание вариантов и единственным интересным для нас оказался Kafka Mirror Maker и просто бекап в соседний ЦОД. Там мы арендовали стойку и организовали канал передачи данных. Таким образом стали отливаться все данные, которые у нас имелись. 

Правда Kafka Mirror Maker оказалась не очень стабильной штукой. Она могла залипнуть или вообще упасть. Поначалу мы связывали это с тем, что выдали мало памяти или другие настройки не докрутили. В конечном итоге оказалось, что она может зависнуть при потере нескольких TCP пакетов.

В 2017 году вариантов решения подобной проблемы особо не было. Один из разработчиков предложил сделать простой, но действенный тест. Мы считывали данные с одной партиции (их количество нам известно). Когда консьюмер считал всю партицию, он умножал полученное число на количество партиций, и мы получали приблизительное количество данных в топике. Если цифра росла, то всё хорошо, а если нет, необходимо было разбираться и искать ошибку.

Это решение оказалось очень эффективным. Оно позволило нам несколько раз решить проблемы с бекапами. Мы проводили учения по аварийному восстановлению и ни одного байта информации не потеряли. На основе этих метрик можно принимать решение о статусе репликации, и о том, что в контуре всё хорошо. Данные принимаются – значит мы работоспособны.

Эксперименты и оптимизация

Когда мы тестировали этот инструмент, у нас накопились лайфхаки, которыми хотелось бы поделиться.  Они помогут, если вам понадобится организовать хранилище данных на большой объем информации. Что-то советую делать, а от некоторых решений рекомендую отказаться — они привели к новым сложностям, а не к решению проблем.

1. Batch.Size

Первое, что вам нужно понять – есть параметр Batch.Size. Он отвечает за размер пачки вставляемых данных. Для нас оптимальным значением было 100 000 байт, для вашего проекта цифры могут отличаться. Как подобрать под себя? Идем маленькими шагами, берем вашего продюсера, пишем тест, тюним этот параметр и смотрим, насколько хорошо продюсер вставляет данные.

2. Consumer group

Вторая по частоте возникновения проблем – жизненный цикл в Consumer group. Есть определённое количество запущенных консумеров, они определяют, с каких партиций они читают данные. Как только один из консумеров отлетает, внутри группы необходимо сделать ребаланс. В момент ребаланса между консумерами перераспределяются партиции, соответственно, данные не читаются.

3.    Acks

Следующий параметр Acks=1. Когда у вас кластер разрастается, увеличивается время ответа вашему продюсеру и уменьшается скорость вставки данных в Kafka. Мы использовали кластер с параметром Replika. Set 3. Для нас было достаточным дождаться, когда мастер нам скажет, что данные записаны, а все остальные члены кластера разберутся как-нибудь сами. Забегая вперёд, скажу, что это была очень проигрышная ситуация в эксперименте, когда у нас входящий поток информации превысил 80 000 запросов в секунду. Мы столкнулись с тем, что Kafka могла потерять данные. Acks=1 в таком случае играла уже не на нашей стороне.

4. Использование компрессии

В интернете есть много рекомендаций по использованию компрессии при вставке в Kafka. Иногда эта история имеет драматические последствия для скорости вставки в Kafka, так как много времени уходит на внутренние механизмы инструмента. Если у вас нет проблем с хранением данных и не заканчиваются диски, то от компрессии можно отказаться. Оптимизировать этот параметр не стоит, наверное, вообще никогда.

5. Топики

Не используйте больше одной тысячи топиков, так как это вызывает серьезные тормоза при работе кластера. Это связано с тем, что Kafka нуждается в выделенном хранилище для собственной конфигурации и мета-информации. Таким хранилищем выступает ZooKeeper: его производительность является боттлнеком (ограничением системы) для Kafka. Так бывает, когда топиков становится больше тысячи. Выражается это, например, в аномально долгих операциях запуска членов кластера.

6. Инсталляция Kafka

Этот пункт был выведен эмпирическим путем, когда мы поняли, что инсталляция Kafka больше, чем на 12 Тб на единицу кластера драматически рушит производительность. Мы очень долго пытались разобраться, с чем это связано. Предположений было много, но мы неоднократно ловили наши кластера на том, что как только инсталляции разрастались, consumer group просто без объявления войны переставала вычитывать. В итоге мы выяснили, что такое поведение консьюмеров связано с тем, что в какой-то момент кластер не успевает отвечать на запросы по внутреннему протоколу, что приводит к залипанию этого самого консьюмера. Мы поняли, что выше 12 Тб уходить не стоит и использовали чудесную способность Kafka к ребалансу, когда приближались к такому объему. Отметим, что это не проблема скорости работы жестких дисков либо аппаратных средств.

7. Offset

Самый большой лайфхак, который позволит вам значительно быстрее ориентироваться внутри гигантских топиков – знание Offset. Это серийный номер сообщения внутри топика. С ним вы будете знать, где у вас внутри Kafka лежат данные, поэтому вам не придётся заново вычитывать всю информацию. Если у вас хранится метаинформация с офсетами, вы можете реализовывать поиск по временным рамкам.

Итог

Главный поинт данной инсталляции был в том, чтобы минимизировать какие-либо ее недоступности. Именно поэтому для бекапа мы использовали отдельный ЦОД с отдельным кластером. Конечно, мы сталкивались с аварийными ситуациями, 100 % uptime не было. Все те хитрости, которые рассказывались выше, набиты горьким опытом.

В конечном же итоге, когда мы уже пришли к нашему идеальному рецепту готовки Kafka, uptime близился к 100 %. Вам не нужно останавливать членов кластера, вам не нужно делать ничего с этой инсталляцией, она просто работает и это прекрасно. 

Обучение по Apache Kafka

Мы записали два формата курсов — для тех, кто только хочет разобраться с Kafka и для прошаренных разработчиков. Переходите по ссылке, чтобы узнать о курсах больше :
Apache Kafka База, cтарт потока в сентябре: https://slurm.club/3Qo2t52
Apache Kafka для разработчиков:
https://slurm.club/3BFdV8d

Комментарии (7)


  1. maxim_ge
    05.08.2022 19:13

    Этот пункт был выведен эмпирическим путем, когда мы поняли, что инсталляция Kafka больше, чем на 12 Тб на единицу кластера драматически рушит производительность.

    Можно пояснить, что такое "единица кластера"?


    1. hellamps
      05.08.2022 22:54

      видимо, нода одна.

      интересно что же будет, если в кластере пара нод приляжет и кафка переразмажет по существующим нодам, выехав за этот предел.


  1. maxim_ge
    05.08.2022 19:27
    +1

    Мы пробовали сделать это с помощью реляционных баз данных, но они себя тоже не оправдали. Еще использовали Hadoop, но он медленный, как и RabbitMQ.

    А какое "железо" используется, на котором "реляционные БД" и Rabbit не способны обеспечить 15 тысяч RPS?


  1. md_backend_binance
    05.08.2022 21:46

    Маловато чтото , у нас около 70к\с как у насдака. Но мы используем ksqldb очень во многих местах, вместо самописных экстернал приложений


    1. aleks_raiden
      06.08.2022 00:23

      Ммм, а можно поподробнее? Как раз проектирую подобную систему в той же области...


  1. kohus
    06.08.2022 12:51

    В Кафке есть индекс по timestamp, поэтому можно быстро спозиционировать консумента на самую первую запись с заданным временем. А для того, чтобы примерно посчитать количество данных, можно использовать разницу оффсетов(инструмент GetOffsetShell)


  1. akakoychenko
    06.08.2022 20:52

    Стремный подход, как по мне
    Если держать в кафке данные за 5 лет, то менеджить ее будет адом. Даже банальный ребаланс при увеличении количества партиций на таком объеме это жесть. А если одна нода на 10ТБ упадет физически, и надо восстановить, и при этом это все под постоянной записью, тоже, чувствую, задача не то, чтобы тривиальная.
    Не понимаю, почему не пойти по бест практисам, и не перекладывать данные раз в день с кафки в архивы по 1-10ГБ каждый на s3-совместимое хранилище/в HDFS/тупо в файловую систему на машинах по 20+ механических дисков в рейде?