Вступление

Привет, Хабр! Работаю java программистом в одной финансовой организации. Решил оставить свой след на хабре и написать первую свою статью. В силу проблем с наличием девопсеров передо мной была поставлена задача обновить кластер кафки с 2.0 до 2.6 без даунтайма и потери сообщений (сами понимаете, никто не любит, когда денежки зависают в воздухе или где-то теряются). Хочу поделиться этим опытом с Вами и получить конструктивный фидбек. Итак, хватит воды, переходим сразу к делу.

Схема миграции

Задача усложнялась тем, что надо было мигрировать со старых виртуалок на новые, поэтому вариант с тем, чтобы выключить брокера, поменять бинарники и запустить, мне не подходил.

Ниже представлена схема миграции. Имеем 3 брокеров на старых ВМ, 3 брокеров на новых ВМ, также для каждого брокера имеется свой zookeeper.

План миграции

Для того чтобы нам мигрировать так, чтобы этого никто не заметил, придется "немного" попотеть. Ниже приведен общий план.

  1. Добавить в настройки приложения адреса новых брокеров

  2. Пробить доступы между всеми и вся

  3. Подготовить инфраструктуру на новых виртуалках

  4. Поднять новый кластер зукиперов и объединить его со старым

  5. Поднять новых брокеров кафки

  6. Мигрировать все партиции со старых брокеров на новые

  7. Отключить старые брокеры кафки и старых зукиперов

  8. Убрать из новых конфигов старых брокеров и зукиперов

А теперь поехали по порядку разбирать подробнее каждый пункт

Добавление брокеров в приложение

Самый простой пункт. В коде клиента, там где были прописаны адреса старых брокеров, туда же дописываем новых. В свойство "bootstrap.servers"

Строка настройки

old-server1:9092,old-server2:9092,old-server3:9092,new-server4:9092,new-server5:9092,new-server6:9092

Пробитие доступов

Одно из самых нелюбимых занятий - это пробитие доступов. Заявки согласуются долго, а выполняются еще дольше. Доступов нам придется заказать немало, давайте разберемся какие именно.

  1. Как вы уже догадались по строке настройки брокеров, нам нужен порт 9092 в направлении Приложение -> новые брокеры кафки (в сумме 3 доступа)

  2. Этот же порт для Новые брокеры <----> Старые брокеры (+18 доступов)

  3. Все тот же порт 9092 между каждым новым брокером (+6 доступов)

  4. Аналогичные доступы из пунктов 2 и 3 для портов зукиперов: 2181, 2888, 3888 ( (18+6)*3 = 72)

Итого: 99 доступов. Получите и распишитесь! А потом еще проверьте все.

обидно что до 100 не дотянули, было бы посолиднее

Когда мы прошли через эти страдания и настроили доступы, приступаем к настройке инфраструктуры.

Инфраструктура нового кластера

Для начала нам нужно создать пользователя kafka на новых виртуалках

Далее очень важный пункт - необходимо прописать разрешенное количество открытых файлов пользователю kafka. А именно, в файле /etc/security/limits.conf дописываем строки

kafka hard nofile 262144
kafka soft nofile 262144

Если мы этого не сделаем, то это грозит нам внезапным падением брокера, так как кафка по своей природе любит открывать много файлов, поэтому надо подготовиться к ее аппетитам.

Честно говоря, число 262144, взято просто потому, что на старом кластере было установлено такое значение (исторически сложилось). Хотя у нас кафка открывает не больше 10 тысяч файлов, поэтому вы можете провести анализ и выставить более подходящее.

Далее нам надо прописать переменную среды к java

в файл /home/kafka/.bash_profile дописываем

export JAVA_HOME=/opt/java
export PATH=JAVA_HOME/bin:PATH

Соответственно у вас должна лежать jre в папке /opt/java

Далее раскидываем папки по нужным директориям и устанавливаем права

Ниже описан скрипт, реализующий, то что я описал выше. Его надо выполнить на каждом новом брокере.

setup.sh
tar -xf ../jdk1.8.0_181.tar.gz -C /opt/
mv /home/kafka/kafka /opt
mv /home/kafka/zookeeper /opt
mv /home/kafka/kafka-start.sh /opt
mv /home/kafka/scripts /opt

ln -sfn /opt/kafka/kafka_2.13-2.6.0 /opt/kafka/current
ln -sfn /opt/zookeeper/zookeeper-3.6.2 /opt/zookeeper/current
ln -sfn /opt/jdk1.8.0_181/ /opt/java

chown -R kafka:kafka /opt
chmod -R 700 /opt

#env_var start ------------------------------->

kafkaProfile=/home/kafka/.bash_profile

homeVar="export JAVA_HOME=/opt/java"
javaHome=$(cat $kafkaProfile | grep "$homeVar")


if [ "$javaHome" != "$homeVar" ]; then
    echo -e "\n$homeVar\n" >> $kafkaProfile
fi


pathVar="export PATH=\$JAVA_HOME/bin:\$PATH"
path=$(cat $kafkaProfile | grep "$pathVar")

if [ "$path" != "$pathVar" ]; then
    echo -e "\n$pathVar\n" >> $kafkaProfile
fi

#env_var end --------------------------------->




#ulimit start >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

limitsFile=/etc/security/limits.conf

soft="kafka soft nofile 262144"
limitSoft=$(cat $limitsFile | grep "$soft")

if [ "$limitSoft" != "$soft" ]; then
    echo -e "\n$soft\n" >> $limitsFile
fi


hard="kafka hard nofile 262144"
limitHard=$(cat $limitsFile | grep "$hard")

if [ "$limitHard" != "$hard" ]; then
    echo -e "\n$hard\n" >> $limitsFile
fi

#ulimit end >>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

Далее из под пользователя kafka проверяем что у нас все установлено правильно

ulimit -n  = 262144
echo $JAVA_HOME  = /opt/java
echo $PATH  должен содержать /opt/java/bin

Новый кластер зукиперов

Необходимо проверить, что в конфиге новых зукиперов прописаны все участники ансабля, а также указана директория, для работы зукипера, где хранится файл myid, в котором прописаны id зукиперов.

ниже кусок файла конфигурации зукипера /opt/zookeeper/current/conf/zoo.cfg

zoo.cfg
dataDir=/opt/zookeeper/zookeeper-data
server.1=server1:2888:3888
server.2=server2:2888:3888
server.3=server3:2888:3888

server.4=server4:2888:3888
server.5=server5:2888:3888
server.6=server6:2888:3888

Стартуем все зукиперы /opt/zookeeper/current/bin/zkServer.sh start

Далее нам надо добавить в конфиг старых зукиперов новых участников. Для этого на старых серверах выполняем такой скрипт

zk-add-new-servers.sh
echo -e "\nserver.4=server4:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.5=server5:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg
echo -e "\nserver.6=server6:2888:3888" >> /opt/zookeeper/zookeeper-3.4.13/conf/zoo.cfg

Теперь нужно определить кто из зукиперов был лидером, и перезапустить все зукиперы. Лидера в последнюю очередь.

/opt/zookeeper/zookeeper-3.4.13/bin/zkServer.sh restart

Наконец проверяем, что все зукиперы увидели друг друга и выбрали лидера. Везде выполняем

.../zkServer.sh status

Настройка ансамбля зукиперов завершена.

Поднятие новых брокеров

Теперь можно заняться запуском новых брокеров, но перед этим нужно убедиться что в server.properties у нас проставлен broker.id и zookeeper.connect

Пример для 4 брокера

broker.id=4
zookeeper.connect=server4:2181,server5:2181,server6:2181/cluster_name

Здесь мы указали только новых зукиперов, но так как они находятся в ансамбле, то новые брокеры узнают через них о старом кластере.

Также необходимо там же в server.properties прописать текущую версию протокола взаимодействия между брокерами (2.0.0). Иначе мы не сможем переместить партиции.

inter.broker.protocol.version=2.0.0

Запускаем новых брокеров

nohup /opt/kafka/current/bin/kafka-server-start.sh /opt/kafka/config/server.properties > log.log 2>&1 &

Теперь необходимо проверить что новые брокеры присоединились к кластеру. Для этого заходим в cli любого из зукиперов.

/opt/zookeeper/current/bin/zkCli.sh

и выполняем там команду

ls /cluster_name/brokers/ids

Должен быть выведен такой ответ

[1,2,3,4,5,6]

Это значит что в кластере у нас все 6 штук брокеров, чего мы и добивались.

Перемещение партиций

Сейчас начинается самое интересное. Изначально где-то сидело в голове, а иногда и подкреплялось чатиками по кафке, что партиции сами переедут, нужно только поднять новых брокеров и поочереди гасить старые. replication.factor должен сработать и реплицировать все на новых брокеров. В действительности все оказалось не так просто. Текущие партиции мертво приклеены к старым брокерам и только создание новых топиков приводит к появлению партиций на новых брокерах.

Заглянув в документацию я увидел , что перемещение партиций "автоматическое", но надо проделать немалое количество манипуляций, чтобы запустить все это дело.

Сначала нужно сгенерировать json в котором надо указать все топики, которые будут перемещаться.

{"topics": [{"topic": "foo1"},
              {"topic": "foo2"}],
  "version":1
 }

Так как топиков у нас было много, я сгенерил несколько таких файлов

Далее все эти json файлы надо скормить утилите kafka-reassign-partitions.sh с аргументом --generate и указать id брокеров куда мы хотим переехать --broker-list "4,5,6".

У нас получится несколько файлов такого вида

generate1.json
{"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[1,2,3]},
                {"topic":"foo1","partition":0,"replicas":[3,2,1]},
                {"topic":"foo2","partition":2,"replicas":[1,2,3]},
                {"topic":"foo2","partition":0,"replicas":[3,2,1]},
                {"topic":"foo1","partition":1,"replicas":[2,3,1]},
                {"topic":"foo2","partition":1,"replicas":[2,3,1]}]
  }

  Proposed partition reassignment configuration

  {"version":1,
  "partitions":[{"topic":"foo1","partition":2,"replicas":[5,4,6]},
                {"topic":"foo1","partition":0,"replicas":[4,5,6]},
                {"topic":"foo2","partition":2,"replicas":[6,4,5]},
                {"topic":"foo2","partition":0,"replicas":[4,5,6]},
                {"topic":"foo1","partition":1,"replicas":[5,4,6]},
                {"topic":"foo2","partition":1,"replicas":[4,5,6]}]
  }

Все что идет до Proposed partition reassignment configuration это текущее распределение партиций (может пригодиться для роллбека). Все что после - распределение после переезда. Соответственно нам надо собрать все эти нижние половины со всех файлов, сформировать новые json.

Так как все это делать руками ну ооочень долго. я "быстро" накатал скрипт, который вызывает все эти команды и режет файлы как нам надо. За резку файлов отвечает джарник kafka-reassign-helper.jar код можно глянуть тут.

Ниже скрипт подготовки файлов для переезда партиций

prepare-for-reassignment.sh
#параметр отвечает за количество топиков в одном файле. у нас было 20
if [ "$#" -eq 0 ]
then
    echo "no arguments"
    exit 1
fi


echo "Start reassignment preparing"

/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> topics.txt

echo created topics.txt

java -jar kafka-reassign-helper.jar generate topics.txt $1

fileCount=$(ls -dq generate*.json | wc -l)

echo "created $fileCount file for topics to move"

echo -e "\nCreating generated files\n"

mkdir -p generated
for ((i = 1; i < $fileCount+1; i++ ))
do
/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --broker-list "4,5,6" --topics-to-move-json-file "generate$i.json" --generate >> "generated/generated$i.txt"
echo "generated/generated$i.txt" created
done

echo -e "\nCreating execute/rollback files"

java -jar kafka-reassign-helper.jar execute $fileCount

echo -e "\nexecute/rollback files created"

echo -e "\nPreparing finished successfully!"

После выполнения скрипта скармливаем сгенеренные файлы по очереди утилите kafka-reassign-partitions.sh, но на этот раз с параметром --execute

move-partitions.sh
#параметр отвечает за номер файла execute1.json, execute2.json ....
if [ "$#" -eq 0 ]
then
    echo "no arguments"
    exit 1
fi
        

/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file "execute/execute$1.json" --execute

Вот теперь кафка начнет перемещать партиции. В логе новых брокеров судорожно забегают строки, как и в логах старых. Пока все это происходит, чтобы не скучать, нам предоставили третий аргумент kafka-reassign-partitions.sh, а именно --verify, который предоставляет узнать текущий статус переезда.

Опять же в силу лени, чтобы постоянно не дергать руками эту команду, был написан очередной скрипт.

reassign-verify.sh
progress=-1

while [ $progress != 0 ]
do

    progress=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "in progress" -c)
    complete=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "is complete" -c)
    failed=$(/opt/kafka/current/bin/kafka-reassign-partitions.sh --zookeeper localhost:2181/cluster_name --reassignment-json-file execute/execute$1.json --verify | grep "failed" -c)

    echo "In progress:" $progress;
    echo "Is complete:" $complete;
    echo "Failed:" $failed;

    sleep 2s

done

Вызываем и ждем пока In progress станет равным 0. а Is complete примет какое-то конечное значение. И надеемся что Failed так и останется 0.

Когда все закончилось вызываем снова move-partitions.sh и reassign-verify.sh для следующего файла, до тех пор пока не переберем все наши файлы миграции.

После обработки всех файлов идем в log.dirs старых брокеров и убеждаемся, что там остались только технические логи самой кафки - значит все партиции точно на новом кластере.

Отключение старого кластера

Тут все просто. выполняем kafka-server-stop.sh и zkStop.sh для брокеров и зукиперов соответственно.

Чистим конфиги нового кластера

Начинаем с зукиперов. Из файлов zoo.cfg удаляем лишние записи

zk-remove-old-servers.sh
sed -i '/server.1/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/server.2/d' /opt/zookeeper/current/conf/zoo.cfg
sed -i '/server.3/d' /opt/zookeeper/current/conf/zoo.cfg

Далее, как обычно, у зукиперов определяем кто лидер и перезапускаем по очереди, лидера в последнюю очередь. После перезапуска проверяем статусы: 2 фолловера и 1 лидер.

Удаляем из server.properties брокеров старый протокол общения

remove-old-protocol.sh
sed -i '/inter.broker.protocol.version=2.0.0/d' /opt/kafka/config/server.properties

Теперь вроде как можно перезапустить брокеров, но надо убедиться, что все партиции находятся в состоянии insync, причем не в количестве min.insync.replicas (у нас равно 2), а в количестве default.replication.factor (у нас 3)

Иначе если где-то будет 2 реплики, а не 3, то при перезапуске брокера, на котором лежит одна из этих двух реплик, мы получим ошибку на клиенте, что не хватает insync реплик.

Ниже скриптик для проверки

check-insync.sh
input="check-topics.txt"
rm -f $input

/opt/kafka/current/bin/kafka-topics.sh --list --zookeeper localhost:2181/cluster_name >> check-topics.txt

checkPerIter=100
i=0
list=""
notInsync=0
while IFS= read -r line
do
 ((i=i+1))
 list+="${line}|"
 if [ $i -eq $checkPerIter ]
  then
   list=${list::${#list}-1}
   echo "checking $list"
   count=$(/opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$" -c)
   if [ "$count" -ne 0 ]
    then
     /opt/kafka/current/bin/kafka-topics.sh --describe --topic $list --zookeeper localhost:2181/cluster_name | egrep "Isr: [4-6/,]{3}$"
   fi
   ((notInsync=notInsync+count))
   list=""
   i=0
 fi
done < "$input"

echo "not insync: $notInsync"

Если на выходе получаем not insync: 0, то можно по очереди перезапустить брокеров.

Вот и все. Миграция брокеров на этом завершена, если не считать последущей перенастройки мониторинга и прочих вспомогательных дел.

Вот как выглядит инструкция, которую я отправил админам, которые все это проворачивали на бою. На удивление справились с первого раза и без вопросов.

README.txt
Инструкция по миграции кафка 2.0.0 -> 2.6.0

1 этап. Подготовка инфраструктуры

1.1 Скопировать архивы для миграции на сервера кафки

NEW4.tar.gz -> адрес 4 брокера
migration.tar.gz -> адрес 4 брокера

NEW5.tar.gz -> адрес 5 брокера
NEW6.tar.gz -> адрес 6 брокера

1.2 Разархивировать архивы

tar -xf NEW4.tar.gz -C /home/kafka
tar -xf NEW5.tar.gz -C /home/kafka
tar -xf NEW6.tar.gz -C /home/kafka

1.3 От пользователя root выполнить скрипт на каждом новом сервере (установка файлов в нужные директории, прав и лимитов)

/home/kafka/scripts/setup.sh

1.4 Перейти на пользователя kafka (на каждом новом сервере)

1.5 Проверить следующие параметры среды (на каждом новом сервере)

ulimit -n  = 262144
echo $JAVA_HOME  = /opt/java
echo $PATH  должен содержать /opt/java/bin

содержимое папки /opt должно принадлежать kafka kafka



2 этап. Запуск зукиперов и брокеров

2.1 Перейти под пользователя kafka (на каждом новом сервере)

2.2 Выполнить скрипт на каждом новом сервере (запускаются новые зукиперы)

/opt/scripts/zkStart.sh

2.3 Скопировать архивы на старые сервера кафки 

OLD1.tar.gz -> адрес 1 брокера
OLD2.tar.gz -> адрес 2 брокера
OLD3.tar.gz -> адрес 3 брокера

2.4 Разархивировать архивы

tar -xf OLD1.tar.gz -C /home/kafka
tar -xf OLD2.tar.gz -C /home/kafka
tar -xf OLD3.tar.gz -C /home/kafka

2.5 От пользователя root выполнить скрипт на каждом старом сервере (установка прав на скрипты миграции)

/home/kafka/scripts/setup-old.sh

2.6 Перейти под пользователя kafka на каждом старом сервере

2.7 Выполнить скрипт на каждом старом сервере (добавляем новые зукиперы в конфиг старых зукиперов)

/home/kafka/scripts/zk-add-new-servers.sh

2.8 Выполнить скрипт на каждом старом сервере (проверка статуса зукиперов)

/home/kafka/scripts/zkStatus.sh

определить какой из старых серверов является лидером

2.9 Перезапустить по очереди зукиперы на старых серверах
лидера перезапускать в последнюю очередь

/home/kafka/scripts/zkRestart.sh

2.10 Проверить что кластер зукиперов подхватил новые сервера
для этого на всех серверах выполнить

/home/kafka/scripts/zkStatus.sh (на старых)
/opt/scripts/zkStatus.sh (на новых)

все сервера кроме одного должны быть в состоянии follower
один сервер leader

2.11 запуск новых брокеров
на новых серверах выполнить скрипт
/opt/kafka-start.sh

2.12 проверить что новые брокеры в кластере
выполнить на новом сервере

/home/kafka/migration/zkCli.sh

ls /cluster_name/brokers/ids

должно быть выведено [1,2,3,4,5,6]

3 этап Перемещение партиций на новых брокеров

3.1 Выполнить скрипт (подготовка файлов для перемещения партиций)
/home/kafka/migration/prepare-for-assignment.sh  20

3.2 Выполнить данный пункт по очереди для каждого файла в директории /home/kafka/migration/execute


Пример:
для файла execute1.json
/home/kafka/migration/move-partitions.sh 1

после начала выполнения скрипта выполнить скрипт 

Пример:
/home/kafka/migration/reassign-verify.sh 1

когда количество партиций в состоянии "in progress" достигнет 0 дождаться ОК от программиста и приступить к п.3.2 для следующего файла

3.3 После завершения перемещения партиций производится проверка логов на наличие ошибок, а также проверка директории /opt/kafka/kafka-data на всех старых брокерах
Должны остаться только технические логи кафки

3.4 По очереди выключить на старых серверах брокеров, выполнив скрипт
/opt/kafka/current/bin/kafka-server-stop.sh

3.5 По очереди выключить зукиперов на старых серверах, выполнив скрипт

/home/kafka/scripts/zkStop.sh

3.6 На новых серверах выполнить скрипт (удаляем старых зукиперов из конфига новых зукиперов)
/opt/scripts/zk-remove-old-servers.sh

3.7 На новых серверах перезапустить зукиперов

/opt/scripts/zkRestart.sh

3.8 На новых серверах проверить статус зукиперов (один лидер, 2 фоловера)

/opt/scripts/zkStatus.sh

3.9 На новых серверах выполнить скрипт (удаляем из конфига старый протокол общения между брокерами)
/opt/scripts/remove-old-protocol.sh


3.10 Проверить что все реплики в статусе insync

для этого запустить скрипт /home/kafka/migration/check-insync.sh

not insync должно быть равно 0

3.11 По очереди для каждого нового брокера выполнить

/opt/kafka/current/bin/kafka-server-stop.sh

дождаться пока брокер остановиться (ps aux | grep kafka должен показывать только зукипера)

выполнить /opt/kafka-start.sh

Приступить к 3.10 следующего брокера

Миграция завершена!

Надеюсь, что кому-то помог в этом вопросе. Жду ваших комментов и замечаний.

Все скрипты и инструкции, в том числе для роллбеков можно найти тут