![](https://habrastorage.org/getpro/habr/upload_files/985/717/2fd/9857172fdb8695d17851b7ce1c165b8e.png)
В предыдущей статье мы достаточно подробно рассмотрели архитектуру Apache Kafka, из каких компонентов состоит данное решение, что для чего предназначено. И в этой статье мы рассмотрим процесс развертывания данного решения.
Для запуска Kafka можно использовать решения для управления кластером, такие как ZooKeeper или KRaft. В статье мы рассмотрим работу с ZooKeeper. Zookeper это распределенное приложение для управления кластером, состоящим из большого количества узлов.
Перед установкой Zookeeper не забудьте поставить Java, например вот так:
apt -y install openjdk-8-jre
Для установки zookeeper сначала перейдем на официальную страницу данного решения https://zookeeper.apache.org/releases.html и выберем последнюю версию.
Загрузим данную версию с помощью wget:
wget https://dlcdn.apache.org/zookeeper/zookeeper-3.8.1/apache-zookeeper-3.8.1-bin.tar.gz
Далее создадим каталог и распакуем в него архив:
mkdir /opt/zookeeper
tar zxvf apache-zookeeper-3.8.1.tar.gz -C /opt/zookeeper --strip-components=1
Для работы нам также потребуются дополнительные каталоги:
mkdir -p /opt/zookeeper/data /var/log/zookeeper
Далее мы переходим к конфигурированию системы. Для этого нам потребуется файл /opt/zookeeper/conf/zoo.cfg.
Этот файл необходимо наполнить следующим содержимым:
tickTime = 2000
maxSessionTimeout = 50000
syncLimit = 5
initLimit = 300
autopurge.purgeInterval = 1
autopurge.snapRetainCount = 5
snapCount = 200000
clientPort = 2181
maxClientCnxns = 100
4lw.commands.whitelist=stat
dataDir = /opt/zookeeper/data
dataLogDir = /var/log/zookeeper
Мы не будем в этой статье подробно останавливаться на значении каждого из параметров, так как это несколько выходит за рамки статьи.
Далее мы настроим запуск приложения в качестве сервиса. Для этого создадим специальную учетку, настроим ей права и создадим файл юнита в systemd.
useradd -r -c 'Zookeeper service' zookeeper
chown -R zookeeper:zookeeper /opt/zookeeper /var/log/zookeeper
Файл юнита /etc/systemd/system/zookeeper.service будет иметь следующую структуру:
[Unit]
Description=ZooKeeper Service
Documentation=https://zookeeper.apache.org/
Requires=network.target
After=network.target
[Service]
Type=forking
User=zookeeper
Group=zookeeper
WorkingDirectory=/opt/zookeeper
ExecStart=/opt/zookeeper/bin/zkServer.sh start
ExecStop=/opt/zookeeper/bin/zkServer.sh stop
ExecReload=/opt/zookeeperbin/zkServer.sh restart
TimeoutSec=30
Restart=on-failure
[Install]
WantedBy=default.target
Затем нам необходимо просто перезапустить system и запустить zookeper:
systemctl daemon-reload
systemctl enable zookeeper --now
Проверим статус zookepeer:
systemctl status zookeeper
![](https://habrastorage.org/getpro/habr/upload_files/3e8/7e0/b83/3e87e0b830e95c45e72b308840ef344c.png)
Базовые настройки Zookeeper мы выполнили. Настройка управления отдельными узлами кластера опять таки не входит в тему данной статьи, поэтому далее мы перейдем к установке Kafka.
Ставим Кафку
Для начала нам потребуется загрузить последнюю версию продукта. Далее распаковываем архив и переходим в созданный каталог.
$ tar -xzf kafka_2.13-3.4.0.tgz
$ cd kafka_2.13-3.4.0
Далее нам необходимо запустить Zookeper с настройками, необходимыми для работы Kafka.
$ bin/zookeeper-server-start.sh config/zookeeper.properties
При успешном запуске мы получим примерно такую "портянку".
![](https://habrastorage.org/getpro/habr/upload_files/50c/6de/b53/50c6deb539759812d8531cb920e5dae8.png)
Далее откроем еще одну SSH сессию в которой собственно запустим Kafka
# Start the Kafka broker service
$ bin/kafka-server-start.sh config/server.properties
Здесь нас никакой особой псевдографикой не порадовали, так что просто смотрим вывод на предмет ошибок.
![](https://habrastorage.org/getpro/habr/upload_files/9e6/a6b/1ce/9e6a6b1ce4d31aa06b631e6c8d27db17.png)
Как видно, мы создали базовую конфигурацию для работы Kafka. Для того, чтобы не плодить окна командной строки или SSH сессии я бы посоветовал перезапустить две последние команды в фоновом режиме, т.е. добавить &.
Создаем топик
Напомню из материалов предыдущей статьи, что Kafka - это платформа распределенной потоковой передачи событий, которая позволяет вам читать, записывать, хранить и обрабатывать сообщения на многих узлах.
Для начала в качестве примера создадим топик quickstart-events с помощью команды:
$ bin/kafka-topics.sh --create --topic quickstart-events --bootstrap-server localhost:9092
![](https://habrastorage.org/getpro/habr/upload_files/916/23e/ac9/91623eac9772f01365726dbb660fc9ba.png)
Давайте посмотрим статистику по созданному топику:
$ bin/kafka-topics.sh --describe --topic quickstart-events --bootstrap-server localhost:9092
![](https://habrastorage.org/getpro/habr/upload_files/b73/8a8/a94/b738a8a94d0ef45eefe970e68daabca1.png)
Как видно, пока наш топик пуст и самое время начать записывать в него сообщения. Для записи сообщений в топик quickstart-events воспользуемся следующей командой:
$ bin/kafka-console-producer.sh --topic quickstart-events --bootstrap-server localhost:9092
После запуска в интерактивном режиме вводим строки (Ctrl+C для завершения):
![](https://habrastorage.org/getpro/habr/upload_files/a72/25b/cbf/a7225bcbf5cdc96f21e3e9dce9818aac.png)
Как видите, я не переключил кодировку в первой строке и отправил строку на кириллице. Посмотрим как ее сохранит Kafka.
Для чтения событий из топика quickstart-events применим команду:
$ bin/kafka-console-consumer.sh --topic quickstart-events --from-beginning --bootstrap-server localhost:9092
![](https://habrastorage.org/getpro/habr/upload_files/55c/5a0/c06/55c5a0c0654eccd3588213b2492e2233.png)
Как видите кириллица нормально сохранилась и прочиталась, это говорит о том, что у нас не должно быть проблем при использовании различных кодировок в сообщениях. Сообщения будут храниться в Kafka столько времени, сколько потребуется. При повторных подключениях мы можем прочитать текущий поток сообщений, а также добавлять новые.
Подключаем источники
Естественно, представленный вариант работы с сообщениями “вручную” на практике мало применим. Гораздо интереснее попробовать в автоматическом режиме прочитать сообщения из источника и поместить их в топик Kafka. В качестве примера мы импортируем данные из файла в раздел Kafka и затем экспортируем данные из раздела Kafka в файл.
Для этого нам потребуется указать в свойствах plugin.path файла config/connect-standalone.properties файл connect-file-3.4.0.jar. Откроем это файл и добавим в него строку:
plugin.path=libs/connect-file-3.4.0.jar
Собственно все, далее создаем текстовый файл с сообщениями, которые мы будем читать:
echo -e "event1\nevent2\nevent3" > test.txt
Далее нам необходимо запустить коннектор, который будет читать сообщения из этого файла и коннектор, который будет сохранять сообщения в sink файле. В наборе файлов, идущих вместе с Kafka есть файлы config/connect-file-source.properties и config/connect-file-sink.properties. В первом файле указаны настройки для считывания файла test txt, а втором настройки для файла хранения сообщений test.sink.txt. Когда команда ниже будет выполнена коннектор, считывающий сообщения из исходного файла test.txt будет загружать их в топик connect-test (который также есть в конфигурационных файлах), а sink коннектор начнет читать сообщения из топика connect-test и запишет их в файл test.sink.txt.
bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
Убедиться в работе коннектора можно с помощью просмотра содержимого test.sink.txt:
![](https://habrastorage.org/getpro/habr/upload_files/744/13b/b94/74413bb94357a158e1514506bd557083.png)
Заключение
На этом, я полагаю сегодняшнюю статью можно завершить. Мы развернули Apache Kafka и Zookeeper, посмотрели как можно вручную создавать топики, добавлять и читать сообщения, а также развернули коннектор, читающий данные из текстового файла. В следующей статье мы продолжим изучение практических аспектов работы с Kafka. А сейчас хочу пригласить вас на бесплатный урок, в рамках которого рассмотрим как в приложениях на Spring Boot можно работать с Kafka. Узнаем, что предоставляет платформа Spring для ускоренной разработки приложений, работающих с Kafka. Посмотрим, какие есть настройки, как это все конфигурируется. Проведем границу между "родным функционалом" Kafka api и "добавками" от Spring Boot.