В предыдущей статье мы достаточно подробно рассмотрели архитектуру Apache Kafka, из каких компонентов состоит данное решение, что для чего предназначено. И в этой статье мы рассмотрим процесс развертывания данного решения.

Для запуска Kafka можно использовать решения для управления кластером, такие как ZooKeeper или KRaft. В статье мы рассмотрим работу с ZooKeeper. Zookeper это распределенное приложение для управления кластером, состоящим из большого количества узлов.

Перед установкой Zookeeper не забудьте поставить Java, например вот так:

apt -y install openjdk-8-jre

Для установки zookeeper сначала перейдем на официальную страницу данного решения https://zookeeper.apache.org/releases.html и выберем последнюю версию.

Загрузим данную версию с помощью wget:

wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz

Далее создадим каталог и распакуем в него архив:

mkdir /opt/zookeeper

tar zxvf apache-zookeeper-3.8.1.tar.gz -C /opt/zookeeper --strip-components=1

Для работы нам также потребуются дополнительные каталоги:

mkdir -p /opt/zookeeper/data /var/log/zookeeper

Далее мы переходим к конфигурированию системы. Для этого нам потребуется файл /opt/zookeeper/conf/zoo.cfg.

Этот файл необходимо наполнить следующим содержимым:

tickTime = 2000

maxSessionTimeout = 50000

syncLimit = 5

initLimit = 300

autopurge.purgeInterval = 1

autopurge.snapRetainCount = 5

snapCount = 200000

clientPort = 2181

maxClientCnxns = 100

4lw.commands.whitelist=stat

dataDir = /opt/zookeeper/data

dataLogDir = /var/log/zookeeper

Мы не будем в этой статье подробно останавливаться на значении каждого из параметров, так как это несколько выходит за рамки статьи.

Далее мы настроим запуск приложения в качестве сервиса. Для этого создадим специальную учетку, настроим ей права и создадим файл юнита в systemd.

useradd -r -c 'Zookeeper service' zookeeper

chown -R zookeeper:zookeeper /opt/zookeeper /var/log/zookeeper

Файл юнита /etc/systemd/system/zookeeper.service будет иметь следующую структуру:

[Unit]

Description=ZooKeeper Service

Documentation=https://zookeeper.apache.org/

Requires=network.target

After=network.target

 

[Service]

Type=forking

User=zookeeper

Group=zookeeper

WorkingDirectory=/opt/zookeeper

ExecStart=/opt/zookeeper/bin/zkServer.sh start

ExecStop=/opt/zookeeper/bin/zkServer.sh stop

ExecReload=/opt/zookeeperbin/zkServer.sh restart

TimeoutSec=30

Restart=on-failure

 

[Install]

WantedBy=default.target

Затем нам необходимо просто перезапустить system и запустить zookeper:

systemctl daemon-reload

systemctl enable zookeeper --now

Проверим статус zookepeer:

systemctl status zookeeper

Базовые настройки Zookeeper мы выполнили. Настройка управления отдельными узлами кластера опять таки не входит в тему данной статьи, поэтому далее мы перейдем к установке Kafka.

Ставим Кафку

Для начала нам потребуется загрузить последнюю версию продукта. Далее распаковываем архив и переходим в созданный каталог.

$ tar -xzf kafka_2.13-3.4.0.tgz

$ cd kafka_2.13-3.4.0

Далее нам необходимо запустить Zookeper с настройками, необходимыми для работы Kafka.

$ bin/zookeeper-server-start.sh config/zookeeper.properties

При успешном запуске мы получим примерно такую "портянку".

Далее откроем еще одну SSH сессию в которой собственно запустим Kafka

# Start the Kafka broker service

$ bin/kafka-server-start.sh config/server.properties

Здесь нас никакой особой псевдографикой не порадовали, так что просто смотрим вывод на предмет ошибок.

Как видно, мы создали базовую конфигурацию для работы Kafka. Для того, чтобы не плодить окна командной строки или SSH сессии я бы посоветовал перезапустить две последние команды в фоновом режиме, т.е. добавить &.

Создаем топик

Напомню из материалов предыдущей статьи, что Kafka - это платформа распределенной потоковой передачи событий, которая позволяет вам читать, записывать, хранить и обрабатывать сообщения на многих узлах.

Для начала в качестве примера создадим топик quickstart-events с помощью команды:

$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092

Давайте посмотрим статистику по созданному топику:

$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092

Как видно, пока наш топик пуст и самое время начать записывать в него сообщения. Для записи сообщений в топик quickstart-events воспользуемся следующей командой:

$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092

После запуска в интерактивном режиме вводим строки (Ctrl+C для завершения):

Как видите, я не переключил кодировку в первой строке и отправил строку на кириллице. Посмотрим как ее сохранит Kafka.

Для чтения событий из топика quickstart-events применим команду:

$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092

Как видите кириллица нормально сохранилась и прочиталась, это говорит о том, что у нас не должно быть проблем при использовании различных кодировок в сообщениях. Сообщения будут храниться в Kafka столько времени, сколько потребуется. При повторных подключениях мы можем прочитать текущий поток сообщений, а также добавлять новые.

Подключаем источники

Естественно, представленный вариант работы с сообщениями “вручную” на практике мало применим. Гораздо интереснее попробовать в автоматическом режиме прочитать сообщения из источника и поместить их в топик Kafka. В качестве примера мы импортируем данные из файла в раздел Kafka и затем экспортируем данные из раздела Kafka в файл.

Для этого нам потребуется указать в свойствах plugin.path файла config/connect-standalone.properties файл connect-file-3.4.0.jar. Откроем это файл и добавим в него строку:

plugin.path=libs/connect-file-3.4.0.jar

Собственно все, далее создаем текстовый файл с сообщениями, которые мы будем читать:

echo -e "event1\nevent2\nevent3" > test.txt

Далее нам необходимо запустить коннектор, который будет читать сообщения из этого файла и коннектор, который будет сохранять сообщения в sink файле. В наборе файлов, идущих вместе с Kafka есть файлы config/connect-file-source.properties и config/connect-file-sink.properties. В первом файле указаны настройки для считывания файла test txt, а втором настройки для файла хранения сообщений test.sink.txt. Когда команда ниже будет выполнена коннектор, считывающий сообщения из исходного файла test.txt будет загружать их в топик connect-test (который также есть в конфигурационных файлах), а sink коннектор начнет читать сообщения из топика connect-test и запишет их в файл test.sink.txt.

bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties

Убедиться в работе коннектора можно с помощью просмотра содержимого test.sink.txt:

Заключение

На этом, я полагаю сегодняшнюю статью можно завершить. Мы развернули Apache Kafka и Zookeeper, посмотрели как можно вручную создавать топики, добавлять и читать сообщения, а также развернули коннектор, читающий данные из текстового файла. В следующей статье мы продолжим изучение практических аспектов работы с Kafka. А сейчас хочу пригласить вас на бесплатный урок, в рамках которого рассмотрим как в приложениях на Spring Boot можно работать с Kafka. Узнаем, что предоставляет платформа Spring для ускоренной разработки приложений, работающих с Kafka. Посмотрим, какие есть настройки, как это все конфигурируется. Проведем границу между "родным функционалом" Kafka api и "добавками" от Spring Boot.

Комментарии (0)