Привет, Хабр!

Меня зовут Дмитрий, вот уже два с половиной года я работаю DevOps инженером в крупной фин.тех компании. Специализируюсь, в основном, на брокерах сообщений. Большая часть сервисов у нас написана на java, нам нужна высокая отказоустойчивость, максимальная гарантия доставки и, поэтому, основной брокер — kafka. Собственно, о ней и хочу сегодня поговорить.

У нас развернуто в общей сложности 18 кластеров. В основном они небольшие, по 3 ноды, и используют их пара - тройка сервисов, но есть один большой, который используют практически все сервисы от мала до велика. На этом кластере тысячи топиков, почти 30 тысяч партиций в общей сложности, десятки миллионов(в пиках бывает до 100 млн.) сообщений в час и, если этот кластер развалится, у нас будут проблемы.

Хьюстон, kafka упала)
Хьюстон, kafka упала)

При этом кластер надо обновлять, иногда перезапускать отдельные ноды, а изредка случается так, что топики оказываются распределены неравномерно по кластеру и все это нужно фиксить незаметно для потребителей.

Но, сперва нужно немного познакомиться с работой kafka. 

Данные в kafka хранятся в т. н. топиках. Топик служит для разделения потоков данных.  Для распределения нагрузки каждый топик можно поделить на партиции (partitions) — с их помощью из одного топика можно читать в несколько потоков, при этом kafka гарантирует что каждый поток(консюмер) будет читать свою партицию. Каждая партиция в свою очередь может иметь реплики. При создании топика можно указать replication-factor (у нас он обычно равен 3-ём) и в таком случае одна реплика будет лидером, остальные слэйвами. Все продюсеры (producer — приложение, которое пишет в топик) и консюмеры (consumer — приложение, которое читает из топика) работают именно с лидером партиции. Если случается так, что на одной из нод оказывается слишком много лидеров, она оказывается перегруженной, что в свою очередь может привести к ее падению, а остальные, наоборот простаивают. Это первая проблема — нужен инструмент, для выполнения перебалансировки всего кластера.

Далее, у продюсера, как правило, имеется параметр acks, который может иметь значения 0, 1, all. Этот параметр отвечает за то, чтобы продюсер дожидался ответа от кластера о принятии его сообщения. 0 — не ждать ответа, 1 — ждать ответа от лидера партиции, all – ждать ответа от лидера и определенного количества реплик(количество определяется параметром min_isr - минимально допустимое количество синхронизированных репли). Для вывода ноды из кластера на работы, разумеется, можно ее просто выключить и все клиенты переключатся на одну из реплик (опустим этап с выборами для простоты повествования), но мы не знаем какой разработчик какие именно параметры использует в своем приложении. А если вспомнить, что разных сервисов у нас сотни, становится понятно, что уследить за этим невозможно. Но о чем это я… Если какой-то разработчик, который очень беспокоится о консистентности данных, поставит acks=all, то когда мы выключим одну ноду, и так совпадает, что на ней был топик, с которым работал этот разработчик, его приложение перестанет функционировать. Выход есть: перенести все топики с той ноды, которую мы хотим вывести из кластера. Это вторая проблема.

Так как мы работаем в финтехе, у нас имеются высокие требования к безопасности. В том числе, в кластер кафки изначально никто не имеет доступ. Если вы хотите работать с кластером, вам необходимо заказать сетевой доступ до всех нод кластера. Если мы добавляем ноду, мы настраиваем и вводим ее заранее, но потом нужно ждать какое-то время, пока все, кто работает с этим кластером, дозакажут себе сетевые доступы, следовательно, на время, пока нода в строю, но еще не у всех есть доступ, необходимо, чтобы топиков на этой ноде не было. Вы можете заметить, что при создании топика ему можно указать желаемые ноды для размещения, но об этом надо помнить, а, т. к. администраторов кластера несколько и все мы люди, это не всегда работает. Это третья проблема, нужен инструмент при  помощи которого можно переместить точечно один или несколько топиков.

Те, кто работал с kafka, вероятно знают, что есть несколько Web GUI (вот и вот например), в которых есть почти весь необходимый функционал, но это не всегда удобно, к тому же, его нужно где-то разворачивать, выделять ресурсы, обеспечивать безопасный доступ, авторизацию и т. п.

            Напротив, CLI инструменту это не нужно, можно просто запустить его у себя на ПК или на одной из нод кластера и воспользоваться логином/паролем администратора кластера. У kafka из коробки есть инструмент, который позволяет перемещать партиции, но он максимально неудобный. Чтобы переместить партиции с его помощью, необходимо подготовить особым образом отформатированный json – файл, запустить утилиту, она сгенерирует другой файл, потом запустить ее еще раз с другими параметрами. В свою очередь топики, которые хочется переместить, нужно поискать самому, т. е. это все крайне неудобно и точно не быстро. Если кому-то интересно, тут есть подробный гайд, как это делается встроенными средствами kafka. Статья уже довольно старая, но в целом актуальная.

Поискав денек в интернете, почитав статьи (их к слову не так уж много) я понял, что консольного интерфейса для решения всех вышеописанных проблем,  отвечающего описанным требованиям просто нет. Решил его создать.

Первая версия была написана на python и была максимально примитивная. Она делала запрос к kafka, собирала список топиков, сортировала их по заданным критериям, генерировала json и просто вызывала через subprocess консольные утилиты самой kafka. Довольно быстро стало понятно, что у такого подхода есть ряд проблем. Во-первых: алгоритм сортировки получился не очень удачным и партиции распределялись недостаточно равномерно; во-вторых, утилита работала довольно медленно и не стабильно; в-третьих, для ее работы запускать ее необходимо было только на одной из нод кластера, т. к. ей были нужны утилиты kafka и установленная JRE(java runtime environment, ява машина, если по-простому).

Костыль, такой же как мой прототип
Костыль, такой же как мой прототип

Так я пришел к тому, что нужно решение, желательно в виде бинарника, которое не имело бы никаких внешних зависимостей. Т.к. уже довольно давно я хотел начать писать на Golang, решено было взять его. Дальше предстояло определиться с библиотекой, которая поможет в решении задач. Сперва моему взору попалась библиотека от confluent, но для ее компиляции нужен gcc (внутри много кода на C(си)) и, к тому же, нацелена она в основном на создание consumer/producer. Далее рассмотрел kafka-go, но у нее не нашлось нужного функционала(ну или я не смог разобраться). И, наконец, мне на глаза попалось творение IBM – sarama. У нее есть весь необходимый функционал и в части admin client, и в части создания consumer/produccer приложений. Как позже выяснилось, не одному мне в нашей команде она приглянулась.

Теперь, наконец, немного о самом решении.

В приложение был заложен функционал подключения к кластеру с авторизацией (через scram-sha-256) и без, возможность перенести один или несколько топиков с одной ноды на другие, перенести все топики с одной ноды на другие и выполнить перераспределение всех топиков в кластере. Погрешность распределения на 1000 топиков всех партиций составила порядка 50-70. Лидеры же распределяются максимально равномерно(погрешность 1-2 партиции).

Для авторизации доступен только вариант со scram-sha-256, поскольку нам был нужен только он, но, если необходимы другие варианты, их не составит проблем добавить.

Работа приложения, от части, вдохнавлена terraform. При запуске, в зависимости,от указанных аргументов, строится план, выводится в виде таблицы текущее распределение и то, которое будет применено

./krpg --bootstrap-server 192.168.1.178:9092 \
--api-version 2.7.0 --action move --from 4 --to 1,2,3

Само применение плана может проходить в многопоточном режиме, количество потоков задается при запуске через ключ —treads [int]. Долго не мог придумать, как синхронизировать между собой потоки, оказалось, читать из закрытых каналов можно, это решило все проблемы. Создаем буферизированный канал и закрываем его, как только записали в него весь план. Горутины будут работать до тех пор, пока есть что читать из каналов и после завершатся. Чтобы дождаться завершения вспомогательных горутин, используем WaitGroup. wg.Add(Treads) задает количество горутин, которое нужно дождаться. wg.Done() сигнализирует о том, что горутина завершилась и больше ее ждать ненужно. wg.Wait() говорит о том, что в этом месте нужно остановиться и дождаться всех дочерних горутин. Подробнее про пакет sync можно почитать тут

После сборки получается bin файл размером 13 мегабайт. Утилита проста в обращении, к моему удивлению довольно быстро работает. Есть и недостатки. Оказалось, мало дать команду на перенос партиции, нужно реализовывать(это в планах) механизм ожидания самому чтобы дождаться переноса партиции. После первого запуска на реальном большом кластере я был весьма удивлен, когда, после завершения работы утилиты, я не смог прочитать из топика, т.к. он еще не был синхронизирован

Весь текст программы, а также релизы можно найти в моем репозитории. Приложение опубликованно под лицензией Apache-2.0. В завершении же хочу отметить, что я не разработчик, так что в программе могут быть алгоритмические или структурные ошибки и я буду рад, если вы о них сообщите в формате конструктивной критики.

P.S. Пока статья лежала “в столе”, в приложение были успешно добавлены scram-sha-512, а так же mTLS и SASL_SSL протоколы.

P.P.S В один из разов при выводе боевой ноды из сильно нагруженного кластера все просто зависло. Партиции повисли в процессе перемещения, а после ребута службы она не стартовала... Сперва были попытки руками в zookeeper выправить распределение партиций, но их были тысячи и в итоге было принято решение полностью снести и переразвернуть кластер, после чего я таки сделал запланированную доработку в виду синзронного режима работы(он теперь идет по умолчанию), т.е. следующий топик не перемещается пока все партиции текущего не выйдут в работу. Так же, пока статья ждала своего часа, был реализованн механизм бекапов, который сохраняет 10 последних версий распределения топиков. Бекап формируется автоматом, восстановить можно любую из версий

Комментарии (4)


  1. IvanovPetrovSidorov
    04.09.2025 15:44

    Про acks=all и выключение с последующей потерей работоспособности написано неправильно.


    1. leks15_05 Автор
      04.09.2025 15:44

      Согласен, не так сформулировал. Подразумевалось что если после отключения ноды ещё одна, по какой-то причине, выйдет из строя, или если min_isr=replication-factor(странно, но возможно) запись не будет работать и топик станет ридонли


  1. YuriyAkimov
    04.09.2025 15:44

    Почему не использовали cruise control?


    1. leks15_05 Автор
      04.09.2025 15:44

      Две причины.

      1. По той же причине что и остальные UI. Нужны ресурсы, согласования...

      2. Хотел попробовать силы в разработке.

      Это не корпоративный проект. Я пилил утилиту вечерами дома. Мне кажется лучший способ изучить язык - написать на нем что-то. А тут и проблема подвернулась)