Привет! В этой статье мы развернем dev-cluster kafka последней версии (3.7 на момент написания статьи), без использования zookeeper. Также в нашей сборке будет web-ui и все для мониторинга. В представленной конфигурации настроены SASL, SSL, ACL.

В чем полезность статьи? В статье представлен готовый docker-compose.yml для использования, который будет вам полезен если вы не сильно искушены в вопросах администрирования kafka и docker-compose, но уже хотелось бы начать заниматься разработкой, используя кластер kafka. Беглый поиск в интернет не дал результата в виде готового docker-compose.yml для конкретно такой конфигурации, которая потребовалась мне, поэтому было решено выложить полученный результат.

Я не являюсь узкоспециализированным администратором kafka, поэтому если видите явные ошибки, то буду рад исправить.

Сначала будет быстрое пошаговое руководство как запустить эту сборку и начать использовать. А далее будет представлено полное объяснение переменных, используемых для настройки наших компонентов и объяснение некоторых моментов.

Быстрый старт

Начнем с загрузки необходимых файлов с https://github.com/yubazh/kafka-compose

git clone https://github.com/yubazh/kafka-compose

Для начала нам нужно заменить ip адрес 192.168.0.188 в файле kafka-hosts.txt на свой. После этого запускаем генерацию сертификатов скриптом generate.sh (скрипт не мой, в конце будут ссылки на материалы, откуда "позаимствованы" некоторые автоматизации)

./generate.sh

Данный скрипт генерирует все необходимые файлы и размещает их в директориях certificate-authority, keystore, pem, truststore. После этого необходимо заменить ip адрес 192.168.0.188 на свой в файле docker-compose.yml в трех местах:

- KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://:9091,EXTERNAL://192.168.0.188:29093   

Для запуска контейнеров используем:

docker-compose up -d

Ждем пока все успешно поднимется и переходим в http://localhost:8080 для доступа в kafka-ui. Используем admin/admin для входа. Для доступа к grafana переходим на http://localhost:3000.

Для остановки контейнеров без удаления volume используем:

docker-compose down

Для остановки контейнеров с удалением всех volume используем:

docker-compose down -v

Для работы с кластером присутствуют файлы admin.properties и client.properties.da/client.properties.va, содержащие необходимые настройки.

Более подробное разъяснение

Начнем также с загрузки необходимых файлов с https://github.com/yubazh/kafka-compose

git clone https://github.com/yubazh/kafka-compose

Нам нужно заменить ip адрес 192.168.0.188 в файле kafka-hosts.txt на свой. После этого запускаем генерацию сертификатов скриптом generate.sh.

./generate.sh

Данный скрипт генерирует все необходимые файлы для работы ssl и размещает их в директориях certificate-authority, keystore, pem, truststore.

Далее рассмотрим docker-compose.yml посервисно.

kafka-0:

  kafka-0:
    # название контейнера по которому к нему можно обращаться  
    container_name: kafka-0    
    # hostname контейнера
    hostname: kafka-0          
    # образ который мы будем использовать, а именно bitnami kafka v 3.7
    image: docker.io/bitnami/kafka:3.7  
    ports:
      # для подключению к каждому из брокеров будем использовать вывешенный наружу порт 29092/29093/29094
      - "29092:29092"
      # также вывешиваем порты 29096-29097 которые ведут к порту 9404. на этот порт вывешиваются метрики jmx-exporter'a
      - "29095:9404"
    restart: always     
    networks:
      - kafka  
    environment:
      # KAFKA CLUSTER - в данном разделе содержатся переменные для кластеризации кафки
      # у каждой из нод должен быть свой уникальный ID внутри кластера
      - KAFKA_CFG_NODE_ID=0   
      # указываем ID кластера. должен быть идентичен внутри кластера у всех нод
      - KAFKA_KRAFT_CLUSTER_ID=abcdefghijklmnopqrstuv
      # указываем, что каждый брокер будет исполнять роль broker'a и controller'a. контроллер заменяет отсутствующий у нас zookeeper
      - KAFKA_CFG_PROCESS_ROLES=broker,controller
      # перечисляем список нод, которые исполняют роль controller'a и будут собраны вместе для кворума
      - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka-0:9093,1@kafka-1:9093,2@kafka-2:9093 
      # указываем какой пользователь будет обладать правами superuser'a
      - KAFKA_CFG_SUPER_USERS=User:sa
      # LISTENERS - в данном разделе содержатся переменные для настройка способов подключения к брокеру
      # указываем название самой сущности "listener" и порты, на которых брокер будет принимать запросы
      - KAFKA_CFG_LISTENERS=INTERNAL://:9091,EXTERNAL://:29092,CONTROLLER://:9093
      # указываем адрес и порт, по которым клиенты смогут подключаться к кластеру. контроллер здесь не указывается
      - KAFKA_CFG_ADVERTISED_LISTENERS=INTERNAL://:9091,EXTERNAL://192.168.0.188:29092
      # указываем протоколы, по которым будут работать описанные выше сущности "listener"
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=INTERNAL:SASL_SSL,EXTERNAL:SASL_SSL,CONTROLLER:SASL_SSL
      # BROKER SETTINGS - настройки именно broker'a (process_role=broker)
      # указываем, что описанный ранее listener с названием INTERNAL будет использоватся для брокеров
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=INTERNAL
      # указываем пользователя, которого будет использовать broker
      - KAFKA_INTER_BROKER_USER=sa
      # указываем пароль от указанного пользователя для broker'a
      - KAFKA_INTER_BROKER_PASSWORD=000000
      # указываем механизм SASL (механизм для передачи паролей, может быть PLAIN,SCRAM-SHA-256,SCRAM-SHA-512)
      - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL=PLAIN
      # CONTROLLER SETTINGS - настройки для controller'a (process_role=controller)
      # указываем, что описанный ранее listener с названием CONTROLLER будет использоватся для контроллеров
      - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
      # указываем механизм SASL (механизм для передачи паролей, может быть PLAIN,SCRAM-SHA-256,SCRAM-SHA-512)
      - KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL=PLAIN
      # указываем пользователя, которого будет использовать controller
      - KAFKA_CONTROLLER_USER=sa
      # указываем пароль от указанного пользователя для controller'a
      - KAFKA_CONTROLLER_PASSWORD=000000
      # CLIENT SETTINGS - настройки клиентов, которые будут подключаться из вне
      # указываем, что описанный ранее listener с названием EXTERNAL будет использоватся для клиентов
      - KAFKA_CLIENT_LISTENER_NAME=EXTERNAL
      # перечисляем через запятую создаваемых пользователей при запуске кластера
      - KAFKA_CLIENT_USERS=sa,da,va
      # перечисляем через запятую пароли пользователей, которых указали выше
      - KAFKA_CLIENT_PASSWORDS=000000,111111,222222
      # ACL - список управления доступом
      # для работы ACL нужно обязательно указать authorizer_class_name
      - KAFKA_CFG_AUTHORIZER_CLASS_NAME=org.apache.kafka.metadata.authorizer.StandardAuthorizer
      # запрещаем создание топиков любыми пользователями, если ACL отсутствуют
      - KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND=false
      # SSL - настройки сертификатов
      # выключаем проверку hostname
      - KAFKA_CFG_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM=
      # пароль от сертификата (зашит по умолчанию в generate.sh)
      - KAFKA_CERTIFICATE_PASSWORD=supersecret
      # указываем тип сертификата (jks или pem)
      - KAFKA_TLS_TYPE=JKS
      # SASL - настройки аутентификации
      # указываем механизм SASL
      - KAFKA_CFG_SASL_ENABLED_MECHANISMS=PLAIN
      # COMMON SETTINGS - общие настройки
      # запрещаем автоматическое создание топиков
      - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE=false
      # дополнительные аргументы, указываемые при запуске. мы используем их для вывешивания метрик jmx-exporter'a
      - EXTRA_ARGS=-javaagent:/opt/jmx-exporter/jmx_prometheus_javaagent-0.19.0.jar=9404:/opt/jmx-exporter/kafka-3_0_0.yml
    volumes:
      # volume с данными kafka
      - kafka_0_data:/bitnami/kafka
      # пробрасываем в контейнер скачанный заранее jmx-exporter
      - ./jmx-exporter:/opt/jmx-exporter
      # пробрасываем в контейнер сгенерированные ранее сертификаты
      - ./keystore/kafka-0.server.keystore.jks:/opt/bitnami/kafka/config/certs/kafka.keystore.jks:ro
      - ./truststore/kafka.truststore.jks:/opt/bitnami/kafka/config/certs/kafka.truststore.jks:ro
    healthcheck:
      # проверка состояния контейнера. проверка происходит по готовности порта 9091
      test: "bash -c 'printf \"\" > /dev/tcp/127.0.0.1/9091; exit $$?;'"
      interval: 5s
      timeout: 10s
      retries: 3
      start_period: 30s

Отдельно стоит остановиться на следующих вещах:

  • мы используем Apache Kafka Raft (KRaft). т.е. мы не используем zookeeper совсем. для использования KRaft необходимо указать process_role=controller, а также заполнить все переменные, связанные с этой ролью. обратите внимание на разделы KAFKA CLUSTER и CONTROLLER SETTINGS в docker-compose.yml. (в документации сказано, что KRaft начиная с версии kafka 3.3 является production-ready.)

  • в документации к bitnami kafka указано, что при запуске кластера в "KRaft mode", KAFKA_CFG_SASL_MECHANISM_CONTROLLER_PROTOCOL поддерживает только механизм PLAIN. поэтому в нашей конфигурации мы используем механизм SASL PLAIN.

  • так как мы используем механизм PLAIN, то нам необходимо создать пользователей при старте кластера. в отличие от SCRAM, в запущенный брокер добавить юзеров мы уже не сможем. для добавления пользователей используем соответствующие переменные в docker-compose.

  • для использования ACL, нужно обязательно указать KAFKA_CFG_AUTHORIZER_CLASS_NAME. нашел единственный верный класс: org.apache.kafka.metadata.authorizer.StandardAuthorizer, который необходимо использовать в данной конфигурации.

  • при описании "внешнего" listener'a (EXTERNAL) обязательно укажите свой внешний ip адрес, по которому будете подключаться к брокерам.

Рассмотрим переменные kafka-ui (это web-ui):

    environment:
      # CLUSTER SETTINGS
      # название кластера в самом веб-интерфейсе. переменная влияет только на отображаемое название
      - KAFKA_CLUSTERS_0_NAME=dev-cluster
      # перечисляем брокеры, из которых состоит наш кластер. также указываем порт для коннекта
      - KAFKA_CLUSTERS_0_BOOTSTRAPSERVERS=kafka-0:9091,kafka-1:9091,kafka-2:9091
      # указываем по какому протоколу будет происходить подключение, к перечисленным брокерам
      - KAFKA_CLUSTERS_0_PROPERTIES_SECURITY_PROTOCOL=SASL_SSL
      # указываем механизм SASL
      - KAFKA_CLUSTERS_0_PROPERTIES_SASL_MECHANISM=PLAIN
      # указываем использование SASL
      - KAFKA_CLUSTERS_0_PROPERTIES_PROTOCOL=SASL
      # описываем конфигурацию SASL, а также логин\пароль юзера для коннекта к брокерам
      - KAFKA_CLUSTERS_0_PROPERTIES_SASL_JAAS_CONFIG=org.apache.kafka.common.security.plain.PlainLoginModule required username="sa" password="000000";
      # указываем, что при изменении файла конфигурации kafka-ui, он будет автоматически сразу "перечитан" и применен
      - DYNAMIC_CONFIG_ENABLED=true
      # AUTH
      # указываем что необходимо использовать форму для логина (зайти без логин\пасс уже не получится)
      - AUTH_TYPE=LOGIN_FORM
      # login от kafka-ui
      - SPRING_SECURITY_USER_NAME=admin
      # pass от kafka-ui
      - SPRING_SECURITY_USER_PASSWORD=admin
      # SSL - указываем truststore, который будет использоваться при подключении к брокерам, а также пароль от truststore
      - KAFKA_CLUSTERS_0_SSL_TRUSTSTORELOCATION=/kafka.truststore.jks
      - KAFKA_CLUSTERS_0_SSL_TRUSTSTOREPASSWORD=supersecret

Рассмотрим kafka-exporter. Это экспортер который вывешивает метрики kafka. Нас интересует только раздел command:

# перечисляем брокеры:
--kafka.server=kafka-0:9091 --kafka.server=kafka-1:9091 --kafka.server=kafka-2:9091 
# указываем, что используем SASL, указываем конкретный механизм, а также логин и пароль
--sasl.enabled --sasl.mechanism=PLAIN --sasl.username=sa --sasl.password=000000 
# указываем использование trustore при подключении к брокерам, а также отключаем проверку hostname
--tls.enabled --tls.insecure-skip-tls-verify --tls.ca-file=/kafka.truststore.jks 
# уровень логирования
--log.level=debug

В контейнере prometheus нас интересует только передача содержимого директории prometheus в /etc/prometheus. Она содержит prometheus.yml, который в свою очередь содержит scrape_config. В нем указано откуда нужно собирать метрики.

Рассмотрим контейнер с графаной:

    environment:
      # AUTH 
      # указываем логин и пароль от пользователя admin
      - GF_SECURITY_ADMIN_USER=admin
      - GF_SECURITY_ADMIN_PASSWORD=admin
      # также указываем, что можно работать с графаной без аутентификации
      - GF_AUTH_ANONYMOUS_ENABLED=true
    volumes:
      # здесь расположены файлы с настройкой графаны, а именно конкретного datasource (указываем на поднятный ранее prometheus), а также указываем автоматическую загрузку дашбордов из файлов
      - ./grafana/provisioning:/etc/grafana/provisioning
      # передаем в контейнер json'ы дашбордов
      - ./grafana/dashboards:/var/lib/grafana/dashboards

Для запуска контейнеров используем:

docker-compose up -d

Ждем пока все компоненты запустятся. В логах kafka-[0:2] мы можем посмотреть загруженные параметры в разделе "INFO KafkaConfig values:". Вы его не пропустите в логе.

Далее можем перейти на http://localhost:8080 и визуально оценить работает ли всё корректно. На что обратить внимание: на главной странице должно быть корректно отображено название кластера, которое мы передали в переменной в kafka-ui. Также должна быть корректна отражена версия кластера (в нашем случае 3.7-IV4). Далее, если ACL корректно включён, то в списке слева обязательно должна быть ссылка на "ACL"

kafka-ui
kafka-ui

Также можно посетить графану на http://localhost:3000. Выбираем "3 палочки" (как его в некоторых меню называют - гамбургер) в верхнем левом углу и переходим в Dashboards:

grafana
grafana

Выбираем нужный дашборд и смотрим. Данные появятся после начала работы с кафкой (создания топиков, записи в них и так далее). Пока что всё пусто.

Для проверки работы кластера можно использовать kafka в запущенных контейнерах, но мне удобнее было установить утилиты на свою машину. Воспользуемся руководством quickstart на оф сайте kafka https://kafka.apache.org/quickstart:

wget https://dlcdn.apache.org/kafka/3.7.0/kafka_2.13-3.7.0.tgz
tar -xzf kafka_2.13-3.7.0.tgz
rm kafka_2.13-3.7.0.tgz
cd kafka_2.13-3.7.0/bin

Для удобства поместите в данную директорию (kafka_2.13-3.7.0/bin) файлы admin.properties и client.properties.da из репозитория. Эти файлы содержат необходимые для подключения параметры. Пройдемся по содержимум admin.properties (обратите внимание на 7ую строчку, вам нужно путь к своему сгенерированному ранее truststore файлу):

# укажем механизм SASL
sasl.mechanism=PLAIN
# укажем настройки jaas.config, которые используются при подключении, а также логин и пароль юзера, который будет подключаться
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="sa" password="000000";
# используем протокол SASL_SSL
security.protocol=SASL_SSL
# обязательно укажите truststore
ssl.truststore.location=/home/user/kafka/kafka-compose/truststore/kafka.truststore.jks
# пароль от kafka.truststore.jks
ssl.truststore.password=supersecret
# этот параметр отключает сравнение hostname, указанного в сертификате, с хостнеймом брокера. 
# если коннектитесь через ip адрес своей машины, 
# то без указания этой переменной могут сыпаться ошибки. обратите на это внимание
ssl.endpoint.identification.algorithm=

Далее выполним некоторые команды для проверки работы кластера (замените айпи адрес на свой). Обратите внимание что используем для работы admin.properties (админский конфиг):

# создаем топик test-topic с тремя партициями и фактором репликации равным трем
./kafka-topics.sh --bootstrap-server 192.168.0.188:29092 --create --topic test-topic --partitions 3 --replication-factor 3 --command-config admin.properties

# добавляем юзеру "da" возможность работать с топиком test-topic, выполнять любые операции
./kafka-acls.sh --bootstrap-server 192.168.0.188:29092 --add --allow-principal User:da --operation All --group '*' --topic test-topic --command-config admin.properties

# проверяем ACL для топика test-topic
./kafka-acls.sh --bootstrap-server 192.168.0.188:29092 --list --topic test-topic --command-config admin.properties

Вывод будет такой:

Далее откроем 2 консоли, в одной запустим producer, а во второй consumer, и увидим что все работает верно. И producer и consumer будем запускать от пользователя da (т.е. с конфигом client.properties.da)

# стартуем producer
./kafka-console-producer.sh --bootstrap-server 192.168.0.188:29092 --topic test-topic --producer.config client.properties.da
# после этого можете в командной строке ввести сообщения

# стартуем consumer
./kafka-console-consumer.sh --bootstrap-server 192.168.0.188:29092 --topic test-topic --consumer.config client.properties.da  --from-beginning
# выведем все введенные ранее сообщения 

Должно получиться так:

После этого можно будет проверить дашборды еще раз. Графики должны обновиться.

Для хранения информации kafka и prometheus использует docker volume. Поэтому если вы хотите остановить контейнеры без потери информации, используйте следующую команду в директории с docker-compose.yml:

docker-compose down

Если же вы захотите остановить все контейнеры и удалить volume с информацией, то используйте команду:

docker-compose down -v

При подготовке статьи использовались следующие материалы:

  1. https://jaehyeon.me/blog/2023-07-20-kafka-development-with-docker-part-11/
    это блог дата инженера. блог очень интересный. именно данный цикл статей содержит информацию по поднятию кластера кафки через docker-compose. все исходники выложены на github автора. именно отсюда взят generate.sh скрипт. минус сборки автра - очень старая версия кафки. произошли некоторые изменения, как в kafka, так и именно в bitnami контейнерах. в данном блоге есть код producer и consumer на python

  2. https://medium.com/@penkov.vladimir/kafka-cluster-with-ui-and-metrics-easy-setup-d12d1b94eccf
    описание сборки кафки с мониторингом, используя docker-compose @papirosko

  3. https://github.com/bitnami/containers/blob/main/bitnami/kafka/README.md
    официальный репо bitnami kafka image в README.md которого есть хорошая документация к образам

Комментарии (5)


  1. vesper-bot
    24.04.2024 10:07

    А зачем в docker-compose многоузловая кафка? Она нужна в кубе, а не в докере, потому что докер на одном узле, а куб на нескольких. Поэтому никто и не делал.


    1. yubazh Автор
      24.04.2024 10:07
      +2

      это дев-кластер, поэтому "многоузловость" не для отказоустойчивости, а для того чтобы поработать с ней. не у каждого разработчика есть желание поднимать для начала разработки дома кластер куба, да даже minikube/k3s. не у всех в инфре есть куб для подобных целей. все зависит от целей, задач и условий. была цель "пощупать" кластерную кафку.


  1. fujinon
    24.04.2024 10:07

    А теперь в качестве упражнения сделайте все это в одном контейнере.


  1. papirosko
    24.04.2024 10:07
    +1

    О, моя статья в списке использованной литературы. Ну хоть не зря писал :)


    1. yubazh Автор
      24.04.2024 10:07

      та которая на медиум?) добавил ссылку