Мы расскажем, как настроить безопасность кластеров Kafka и Zookeeper, какие инструменты можно использовать для мониторинга и управления кластером, а также про особенности продукта, с которыми мы столкнулись.

Почему Apache Kafka?

 Apache Kafka — это унификация. Десятки поставщиков и потребителей, миллионы сообщений в день и огромные массивы данных — для этого нужна надежная, отказоустойчивая и высокопроизводительная шина данных. Существует множество версий дистрибутивов Apache Kafka, например vanilla kafka, oбразы от confluent, bitnami, wurstmeister и т.д. Мы расскажем про решение на базе сборок от Confluent в виде Docker-образов. Оно самое надежное в плане информационной безопасности. Конфигурирование и запуск контейнеров с Kafka в данном случае происходит с помощью docker-compose.

 За время работы с Kafka мы прошли  путь от «коробочного» решения до тонкой настройки безопасности кластера, применения TLS шифрования и разворачивания по рекомендациям вендора по построению геораспределенного кластера. В конфигурации «из коробки» нет первоначальных настроек безопасности. Для контроля доступов, вносимых изменений и безопасного взаимодействия перед нами встала задача настройки на кластере Apache Kafka TLS шифрования, аутентификации и авторизации средствами встроенного ACL.

TLS шифрование

Обеспечивает шифрование соединения между брокерами Kafka, серверами Zookeeper, клиентами и брокерами. Вся ключевая информация помещается в хранилища — keystore, которые разделяют на два вида:

  • keystore, где хранятся ключи и сертификаты стороны, в отношении которой пройдет процедура аутентификации и установления защищенного соединения, например, приватный ключ и ассоциированный с ним и подписанный со стороны центра сертификации (ЦC) сертификат.

  • truststore, куда помещаются сертификаты, которым доверяет носитель хранилища.

 Общий процесс настройки TLS выглядит так:

  1. Для каждого сервера брокера и Zookeeper создаем keystore (настройки конфигурации см. в следующей главе):

    • Создаем keystore + генерируем ключевую пару — закрытый ключ и соответствующую цепь сертификатов (certificate chain), в которой после выполнения команды будет самоподписанный сертификат, содержащий его открытый ключ.

    • Создаем запрос на подпись для ЦC.

    • Подписываем запрос закрытым ключом ЦC. В нашем случае есть промежуточный ЦC и корневой ЦC.

    • Подписанный запрос импортируем в созданный ранее keystore, в цепочку сертификатов закрытого ключа. Сначала импортируем сертификаты всех ЦС, подписавших наш запрос, и только потом сам подписанный запрос.  Это необходимо для точного установления цепи сертификатов, которая производится утилитой keytool, иначе получим ошибку. 

    • Производим настройки конфигурации брокеров, серверов Zookeeper и клиентов.

В итоге на брокере и серверах Zookeeper мы получим keystore с 3 ключевыми объектами:

  • сертификат промежуточного ЦC, подписанный корневым ЦC;

  • самоподписанный сертификат корневого ЦC;

  • приватный ключ сервера со связанной цепью сертификатов: серверный сертификат, подписанный промежуточным ЦC;

  • сертификат промежуточного ЦC, подписанный корневым ЦC и самоподписанный сертификат корневого ЦC.

  1. Создаем truststore для клиентов и брокеров (для взаимодействия в качестве клиента с хостами Zookeeper) и в него импортируем сертификаты ЦC, чьим подписям клиент должен доверять.

    Здесь односторонняя аутентификация, т.е. клиент проверяет брокера и Zookeeper серверы брокера. Если нужно настроить двухстороннюю аутентификацию, проверку клиента или Zookeeper серверы на брокере, проделываем те же операции, что и на брокере, по созданию keystore на клиенте, а на Zookeeper создаем соответствующий truststore.

Важно отметить, что при создании клиентов и при использовании библиотеки librdkafka сертификаты доверенных ЦС могут браться клиентом непосредственно из  операционной системы (ОС). К примеру, для машин на Windows необходимо на клиенте задавать настройку:

ssl.ca.certificate.stores=CA,Root

По умолчанию предполагается только Root и если сертификат сервера подписан еще и промежуточным ЦС, клиент не сможет установить цепь доверия и получит ошибку, поэтому нужно указывать CA.

В дистрибутиве Apache Kafka от Confluent предусмотрена директория для размещения файлов, необходимых для настройки TLS. Внутри контейнера это /etc/kafka/secrets. Для размещения файлов необходимо в конфигурационном файле деплоя образов прописать признак монтирования данной директории.

Далее рассмотрим настройку брокера и сервера Apache Zookeeper в составе кластера Apache Kafka. На остальных представителях кластера конфигурация аналогичная.

Настройка keystore

Создание Keystore на стороне серверов кластера

На сервере должна быть установлена Java, также можно выполнить команды внутри Docker контейнера с дистрибутивом Apache Kafka от Confluent:

keytool -genkeypair -validity days -keyalg RSA -keystore server1.keystore.p12 -storetype pkcs12 -alias server1 -dname "CN=fqdn,OU=x,O=x,L=Moscow,ST=Moscow,C=RU" -ext SAN=DNS:fqdn

Где days – срок действия сертификата в днях, fqdn – доменное имя хоста.

Таким образом, создан keystore с названием "server1.keystore.p12" с типом pkcs12, псевдонимом server1 пары приватный\публичный ключ сервера.

Если установлена версия Java 9 и выше, тип -storetype pkcs12 можно не указывать, т.к. он задан по умолчанию. Далее нужно подписать сертификат сервера в данном keystore.

Создание запроса на подпись сертификата

keytool -certreq -keystore server1.keystore.p12 -alias server1 -file server1.csr

Запрос необходимо подписать на стороне ЦС.

В keystore добавляем сертификаты ЦС (как промежуточного, так и корневого) и импортируем цепь сертификатов, полученную ранее. Тем самым мы замещаем самоподписанный сертификат сервера, указывая при этом alias, заданный при создании keystore. 

keytool -importcert -file root-ca.crt -keystore server1.keystore.p12 -alias RootCA

keytool -importcert -file subroot-ca.crt -keystore server1.keystore.p12 -alias SubCA

keytool -importcert -file server1.pem -keystore server1.keystore.p12 -alias server1

Для проверки успешности импорта выводим содержимое keystore командой:

keytool -list -v -keystore server1.keystore.p12

Keystore готов к работе.

Создание truststore

В truststore помещаем сертификаты ЦС, подписавших сертификат сервера, чтобы сторона взаимодействия могла удостовериться, что представленный сертификат подписан доверенным ЦС.

keytool -importcert -file root-ca.crt -keystore client.truststore.p12 -alias RootCA

keytool -importcert -file subroot-ca.crt -keystore client.truststore.p12 -alias SubCA

keytool -list -v -keystore client.truststore.p12

Как уже упоминалось ранее, при построении клиента с помощью библиотеки librdkafka будут использоваться доверенные сертификаты ОС.

Настройки конфигурации сервера

Kafka поддерживает протокол SASL для аутентификации. При этом он может использоваться наряду с шифрованием – SASL_SSL, либо без него – SASL_PLAINTEXT. SASL позволяет применять разные механизмы аутентификации: PLAIN, SCRAM, OAUTHBEARER, Kerberos. С ними можно ознакомиться по ссылке.

Конфигурация брокера

Рассмотрены необходимые корректировки в конфигурацию и параметры для запуска кластера Apache Kafka в виде контейнеризированного решения от Сonfluent с использованием docker-compose. Особенности преобразования конфигурационных параметров Kafka для docker-compose файлов деплоя — на официальном сайте Confluent. Параметры для брокеров предваряются - «KAFKA_», для Zookeeper - «ZOOKEEPER_».

Задаем параметр взаимодействия между брокерами через протокол SASL_SSL:

KAFKA_INTER_BROKER_LISTENER_NAME: SASL_SSL

Параметр взаимодействия с клиентами через протокол SASL_SSL:

KAFKA_SECURITY_PROTOCOL: SASL_SSL

Необходимо добавить порт, к примеру, 9096 для приема подключений от клиентов с использованием TLS: SASL_SSL

KAFKA_LISTENERS: SASL_SSL://server1.domain.ru:9096

KAFKA_ADVERTISED_LISTENERS: SASL_SSL://server1.domain.ru:9096

TLS шифрование Zookeeper предполагает настройку клиентских портов для подключения, о чем расскажем далее, поэтому указываем на брокерах эти порты, к примеру, 2281. Для кластера Zookeeper из 3 хостов.

KAFKA_ZOOKEEPER_CONNECT: zookeeper_server1.domain.ru:2281, zookeeper_server2.domain.ru:2281, zookeeper_server3.domain.ru:2281

Задаем относительный путь до файла keystore (как уже упоминалось ранее, для confluent стандартный рабочий каталог для настроек TLS - /etc/kafka/secrets), файла с паролем (server1.creds) от keystore и пароля закрытого ключа, который совпадает с паролем от keystore. И аналогичные настройки для файла truststore.

KAFKA_SSL_KEYSTORE_FILENAME: server1.keystore.p12

KAFKA_SSL_KEYSTORE_CREDENTIALS: server1.creds

KAFKA_SSL_KEY_CREDENTIALS: server1.creds

KAFKA_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/client.truststore.p12

KAFKA_SSL_TRUSTSTORE_PASSWORD:pass
Активация TLS шифрования для взаимодействия брокеров с кластером Zookeeper:

KAFKA_ZOOKEEPER_SSL_CLIENT_ENABLE: 'true' 

TLS не поддерживается фрэймворком Zookeeper NIO (используется по умолчанию), поэтому переопределяем его на Netty:

KAFKA_ZOOKEEPER_CLIENT_CNXN_SOCKET: 'org.apache.zookeeper.ClientCnxnSocketNetty'

Нужно задать расположение паролей keystore и truststore для взаимодействия с Zookeeper. Примечательно, для данных параметров нужно указать абсолютный путь внутри контейнера, т.к. стандартное расположение «/etc/kafka/secrets» автоматически не учитывается:

KAFKA_ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/kafka/secrets/server1.keystore.p12

KAFKA_ZOOKEEPER_SSL_KEYSTORE_PASSWORD: pass

KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/kafka/secrets/client.truststore.p12

KAFKA_ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD: pass

Конфигурация Zookeeper сервера

Взаимодействие между серверами кластера Zookeeper предлагается осуществлять с использованием TLS:

ZOOKEEPER_SSL_QUORUM: 'true'

Как отметили ранее, для активации TLS необходимо переопределить используемый Zookeeper серверами фрэймворк, а также задать keystore, truststore для взаимодействия кворума:

ZOOKEEPER_SERVER_CNXN_FACTORY: 'org.apache.zookeeper.server.NettyServerCnxnFactory'

ZOOKEEPER_SSL_QUORUM_KEYSTORE_LOCATION: /etc/zookeeper/secrets/zookeeper_server1.keystore.p12

ZOOKEEPER_SSL_QUORUM_KEYSTORE_PASSWORD: pass

ZOOKEEPER_SSL_QUORUM_TRUSTSTORE_LOCATION: /etc/zookeeper/secrets/client.truststore.p12

ZOOKEEPER_SSL_QUORUM_TRUSTSTORE_PASSWORD: pass

Поднимаем порт, принимающий подключения с использованием TLS: 

ZOOKEEPER_SECURE_CLIENT_PORT: 2281

Задаем keystore и truststore для взаимодействия с клиентами:

ZOOKEEPER_AUTH_PROVIDER_X509: 'org.apache.zookeeper.server.auth.X509AuthenticationProvider' 

ZOOKEEPER_SSL_KEYSTORE_LOCATION: /etc/zookeeper/secrets/server1.keystore.jks

ZOOKEEPER_SSL_KEYSTORE_PASSWORD:

ZOOKEEPER_SSL_TRUSTSTORE_LOCATION: /etc/zookeeper/secrets/client.truststore.jks

ZOOKEEPER_SSL_TRUSTSTORE_PASSWORD:

Для настройки TLS использовалась утилита keytool.

При настройке keystore, файлов и директорий, необходимых внутри контейнера, нужно учитывать, что в версиях дистрибутива от 5.5.х Kafka и Zookeeper от Confluent работа внутри контейнера происходит от пользователя “appuser”, а не от “root”, как это было раньше, и необходимо выдавать соответствующие права на файлы на уровне системы хоста. Для хостовой системы пользователь appuser представлен UID 1000 и GID 1000.

 В сборке Kafka и Zookeeper от Confluent версии 5.0.x установлена Java 8, начиная с версии 6.0.x и в текущей 7.0.x — Java 11. Соответствие версий дистрибутивов Apache Kafka и Confluent приведены на сайте. Данный аспект обуславливает использование ключей утилиты keytool.

К примеру, в официальной документации для создания keystore используется ключ “–genkey”. Как гласит документация, данный ключ устарел и переименован в “–genkeypair”, начиная с версии Java 6, а «–import» переименован в «–importcert». При использовании “–genkey” без указания «-storetype pkcs12» в версиях до Java 9 созданный keystore будет использовать признанный устаревшим формат JKS. Исходя из этого, лучше использовать новые ключи для формирования keystore.

 Т.к. для подписи сертификата использовался промежуточный ЦС, то помимо сертификата корневого ЦС нужно добавить в keystore еще и сертификат промежуточного ЦС.

В зависимости от политики и правил подписания сертификатов в вашей компании могут накладываться ограничения, к примеру, на область действия сертификата или полей расширения. Мы столкнулись с тем, что у сертификатов после подписания была только возможность проверки подлинности сервера (serverAuth), а поле SAN необходимо было дополнительно задавать. При попытке запустить кластер на сертификатах с данным ограничением возникает ошибка.

javax.net.ssl|ERROR|19|/0.0.0.0:3888|2021-10-27 11:18:22.090 GMT|TransportContext.java:341|Fatal (CERTIFICATE_UNKNOWN): Extended key usage does not permit use for
TLS client authentication (
"throwable" : {
  sun.security.validator.ValidatorException: Extended key usage does not permit use for TLS client authentication

В виду того, что сервер может выступать клиентом при взаимодействиях в кластере, нужно еще добавить возможность проверки подлинности клиента (clientAuth). Данный аспект появился в официальной документации Apache Kafka, начиная с версии 2.6.

Иногда стоит увеличить уровень логирования для более полного понимания картины при настройке и запуске, поэтому для получения более детальных логов в конфигурационном файле деплоя требуемого контейнера в параметр «KAFKA_OPTS» прописываем:

KAFKA_OPTS: “-Djavax.net.debug=all”

Аутентификация 

Основные механизмы, поддерживаемые Kafka и предусмотренные протоколом SASL, для аутентификации по имени пользователя и паролю – PLAIN и SCRAM. PLAIN подразумевает передачу паролей в открытом виде, поэтому требует организации шифрования канала при реализации в промышленном контуре. В SCRAM пароли без криптографического преобразования не передаются, что повышает уровень защищенности, необходимость использования шифрования остается. В отличие от PLAIN, где пароли всех подключающихся пользователей хранятся в файлах JAAS на брокере, в SCRAM они записываются в хэшированном виде на серверы Zookeeper. Таким образом использование SCRAM добавляет бОльшие чем в PLAIN временные издержки на установление соединения, но в ходе дальнейшей передачи данных они нивелируются.

Рассмотрим настройку SASL PLAIN. Напомним, что при использовании SASL аутентификация клиента на уровне TLS не производится. В процессе настройки для каждого сервера конфигурируется JAAS файл с учетными данными и указанием способа аутентификации. При использовании SASL поверх SSL на брокерах мы указываем расположение JAAS файла внутри контейнера:

KAFKA_OPTS: “-Djava.security.auth.login.config=/etc/kafka/kafka_server_jaas.conf”

Внутри файла указываются учетные данные и механизм аутентификации внутри обособленных блоков. Блок описывает взаимодействие: к примеру, блок «KafkaServer» отражает данные для взаимодействия клиентов с брокерами. Блок «Client» описывает, под каким пользователем брокер обращается к кластеру Zookeeper. Пример JAAS файла:    

KafkaServer {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="KafkAdmin"
    password=""
    user_KafkaAdmin ="password"
    user_User1="password";
};

Client {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="ZooKafka"
    password="password";
};

 На стороне Zookeeper при активации SASL задаются следующие параметры:

KAFKA_OPTS: "-Djava.security.auth.login.config=/etc/kafka/zookeeper_server_jaas.conf -Dzookeeper.authProvider.1=org.apache.zookeeper.server.auth.SASLAuthenticationProvider -Dzookeeper.requireClientAuthScheme=sasl

ZOOKEEPER_JAASLOGINRENEW: 3600000

В файле jaas указывается блок «Server», где описываются: пользователь, под которым взаимодействуют сами хосты Zookeeper, и пользователь, под которым брокеры обращаются к Zookeeper:

Server {
    org.apache.kafka.common.security.plain.PlainLoginModule required
    username="ZooAdmin"
    password="pass"
    user_ZooKafka="pass";
};

При включении SASL на Zookeeper на созданные до этого Znode не распространяются применяемые правила, поэтому нужно выполнять zookeeper-security-migration.sh.

Авторизация

Для целей авторизации настроен ACL.

На стороне брокеров нужно выполнить ряд настроек:

Подключить плагин:

KAFKA_AUTHORIZER_CLASS_NAME: kafka.security.authorizer.AclAuthorizer

Для ограничения возможности записи метаданных в Zookeeper задаем следующий параметр:

KAFKA_ZOOKEEPER_SET_ACL: 'true'

Настраивается суперпользователь, от которого брокеры будут работать в кластере:

KAFKA_SUPER_USERS: User:KafkAdmin

JAAS файлы уже были представлены ранее как для брокеров, так и для Zookeeper — пользователям в них и задаются права ACL.

Всем приложениям, осуществляющим запись (producer) и чтение (consumer), заданы отдельные пользователи с разграничением минимально необходимых прав на манипуляции в кластере. Сonsumer в рамках приложений выделяются в группы (consumer group), пользователям выдаются права на чтение из своих групп.

Основные команды приведены по ссылке.

Инструменты

Здесь мы расскажем об инструментах для мониторинга и управления кластером Apache Kafka, которыми мы пользовались.

Prometheus JMX exporter

Это стандарт для мониторинга систем. Он использует полностью открытый исходный код, содержит десятки разных экспортеров, с помощью которых быстро настраивается мониторинг инфраструктуры. Более подробно об этом можно почитать тут.

Для установки Prometheus JMX exporter нужно создать директорию для хранения его исполняемых и конфиг файлов:

mkdir /app/prometheus

Prometheus получает метрики с серверов Kafka за счет JMX exporter агента устанавливаемого на узлы кластера.

Скачиваем последнюю версию JMX exporter и настраиваем конфигурацию для работы на каждом брокере Kafka, например:

wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.16.1/jmx_prometheus_javaagent-0.16.1.jar

Загружаем конфигурационный файл последней версии для брокеров kafka:

wget https://raw.githubusercontent.com/prometheus/jmx_exporter/master/example_configs/kafka-2_0_0.yml

Добавляем в .yml файл развертывания кластера kafka настройки для каждого брокера:

1)               KAFKA_OPTS: "-javaagent:/etc/kafka/prometheus/jmx_prometheus_javaagent-0.16.1.jar=8088:/etc/kafka/prometheus/kafka-2_0_0.yml"

Где:

/etc/kafka/prometheus/jmx_prometheus_javaagent-0.16.1.jar –путь внутри контейнера до jar файла

8088 – порт, на котором будет слушаться JMXexporter (если кластер развернут на одном сервере, необходимо указывать разные порты для каждого брокера!)

/etc/kafka/prometheus/kafka-2_0_0.yml – путь внутри контейнера до конфигурационного файла.

 Возможные ошибки, возникающие при использовании параметра -javaagent в блоке KAFKA_OPTS:

При использовании CLI kafka внутри контейнера возможно возникновение ошибок вида:

Exception in thread "main" java.lang.reflect.InvocationTargetException
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at sun.instrument.InstrumentationImpl.loadClassAndStartAgent(InstrumentationImpl.java:386)
        at sun.instrument.InstrumentationImpl.loadClassAndCallPremain(InstrumentationImpl.java:401)
Caused by: java.net.BindException: Address already in use
        at sun.nio.ch.Net.bind0(Native Method)
        at sun.nio.ch.Net.bind(Net.java:433)
        at sun.nio.ch.Net.bind(Net.java:425)
        at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:223)
        at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74)
        at sun.net.httpserver.ServerImpl.(ServerImpl.java:100)
        at sun.net.httpserver.HttpServerImpl.(HttpServerImpl.java:50)
        at sun.net.httpserver.DefaultHttpServerProvider.createHttpServer(DefaultHttpServerProvider.java:35)
        at com.sun.net.httpserver.HttpServer.create(HttpServer.java:130)
        at io.prometheus.jmx.shaded.io.prometheus.client.exporter.HTTPServer.(HTTPServer.java:190)
        at io.prometheus.jmx.shaded.io.prometheus.jmx.JavaAgent.premain(JavaAgent.java:31)
        ... 6 more
FATAL ERROR in native method: processing of -javaagent failed
Aborted (core dumped)

Чтобы ошибка не возникала, нужно перед выполнением команд в CLI Kafka единоразово на всю сессию выполнить команду:

unset KAFKA_OPTS

 2) volumes:

      - /app/prometheus:/etc/kafka/prometheus

Где:

/app/docker/docker-images/prometheus – реальный путь на сервере

/etc/kafka/prometheus - путь внутри контейнера

Выполняем передеплой контейнеров kafka, используя команду:

sudo docker-compose -f /путь/к/файлу.yml up –d

Выполнить проверку генерации метрик JMXexporter-ом на хостах и портах указанных в настройках:

curl хост:8088

В конфиг – файле prometheus.yml на сервере с prometheus в блоке «scrape_configs» необходимо добавить строки вида:

- job_name: "kafka_cluster"

static_configs:

- targets:

- server1:8088

- server2:8088

- server3:8088

Grafana

Для визуализации получаемых метрик можно использовать Grafana, в ней удобно настроить свои dashboards или скачать готовые решения. Для сбора метрик нужно установить и настроить сборщик Prometheus. В качестве альтернативы можно использовать Zabbix, он прекрасно работает с JMX метриками и может брать их напрямую с хоста экспортера без Prometheus.

Instana

Для мониторинга кластеров Kafka можно использовать систему Instana с автодискаверингом. При работе с ней были выявлены некоторые ньюасы, о которых мы расскажем ниже.

Помимо штатных показателей производительности кластера kafka и встроенных детекторов аномального состояния кластера, у Instana есть очень полезная функция — отображение Consumer Groups Lag Per Topic. Она подсчитывает лаги Consumer групп в разрезе топиков, визуализирует данные в графическом виде, позволяет взглянуть на них в реальном времени или во временных интервалах. В качестве альтернативы отображения Consumer Groups Lag можно использовать решение от linkedin - Burrow, он имеет встроенный Prometheus exporter (начиная с версии 1.3.4) и хорошо визуализирует лаги работая вкупе со стеком Prometheus + Grafana.

Техподдержка Instana утверждает следующее: «We use the Kafka broker API in order to calculate lag as a diff between last committed and last consumed offset for specific consumer group and topic».

Для отображения Consumer Groups Lag Per Topic агентом Instana используется следующая команда вида:

kafka-consumer-groups --describe --all-groups --bootstrap-server localhost:9092

Внимательные читатели могли заметить, что в кластере kafka с настроенными ACL листами выполнение CLI команды (на примере кластера на основе дистрибутива от confluent) docker exec -it container_name kafka-consumer-groups --describe --all-groups --bootstrap-server kafka1.test:9092 совершенно справедливо будет возвращать пустой результат или выдавать ошибку, причин такому поведению несколько:

  1. команда по умолчанию обращается к кластеру Kafka по протоколу PLAINTEXT, соответственно, она работает исключительно при настройке кластера на работу с данным протоколом (параметр KAFKA_ADVERTISED_LISTENERS);

  2. при настроенном параметре KAFKA_ADVERTISED_LISTENERS PLAINTEXT:// при запросе будет выдаваться пустой результат, т.к. на кластере включен ACL, а по протоколу PLAINTEXT, как мы рассматривали выше, осуществляется анонимный доступ, без указания учетных данных;

  3. при отсутствии настроенного параметра KAFKA_ADVERTISED_LISTENERS PLAINTEXT://  возникнет ошибка Error: Executing consumer group command failed due to org.apache.kafka.common.KafkaException: Failed to find brokers to send ListGroups;

  4. для обращения по другим протоколам нужно использовать команду вида:
    docker exec -it container_name kafka-consumer-groups --describe --all-groups --bootstrap-server kafka1.test:9096 --command-config /путь/до/jaas-конфиг.conf;

где jaas-конфиг.conf это файл со строками подключения по протоколу SASL_PLAINTEXT или SASL_SSL и логином/паролем с необходимыми правами.

Примеры таких конфигов:

Конфиг для работы по протоколу SASL_PLAINTEXT:

security.protocol=SASL_PLAINTEXT
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username="user"
password="password";

Конфиг для работы по протоколу SASL_SSL:

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
ssl.truststore.location=/etc/kafka/secrets/kafka.truststore.jks
ssl.truststore.password=password
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required
username="user"
password="password";

Нужно обратить внимание на использование отличных от задаваемых по умолчанию (9092) портов, если путь до server.properties брокеров не прописан в конфигурации агента Instana, он будет искать конфиг файлы в следующих путях:

  • Аргументы процесса брокера Kafka.

  • Переменная окружения KAFKA_SERVER_PROPERTIES

/path_to_kafka_home/config/server.properties

/path_to_kafka_home/etc/kafka/server.properties

Агент Instana использует путь /opt/kafka/config/server.properties по умолчанию, если путь к server.properties не удалось найти ни одним из вышеупомянутых способов.

Не найдя конфиг файлы по данным путям, агент Instana будет стучаться до кластера Kafka по порту по умолчанию – 9092, что приведет к многочисленным ошибкам подключения к кластеру Kafka и отсутствию некоторых данных в вэб консоли Instana.

Немного о возникших проблемах при использовании Instana. Механизм подсчета Consumer Groups Lag Per Topic иногда сбоит и начинает показывать значение, кратное количеству брокеров, т.е. при реальном лаге в 300 сообщений, на 3х брокерной конфигурации возможно отображение лага в 1500 сообщений. Также данную фичу сейчас нельзя использовать на кластерах с настроенными ACL, т.к. отдельной настройки под механизм Consumer Groups Lag Per Topic в Instana нет. Мы завели в компании Instana Feature Request для добавления функционала тонкой настройки механизма Consumer Groups Lag Per Topic и ожидаем его добавления в ближайшем будущем.

Kafka Cruise Control + Cruise Control UI

Kafka Cruise Control + Cruise Control UI. Он позволяет производить:

  • мониторинг состояния брокеров, топиков и партиций;

  • мониторинг нагрузки и утилизации ресурсов кластера;

  • продвинутую балансировку нагрузки;

  • детектирование аномалий и автохилинг;

  • автоматизацию добавления/удаления брокеров.

Kafka GitOps

Еще один незаменимый инструмент — Kafka GitOps. Позволяет автоматизировать управление топиками и списками управления доступом (ACL) Apache Kafka.

Топики и ACL определяются и ведутся в файле YAML формата. Идея проекта — управление топиками и ACL Kafka через версионные файлы конфигурации. При запуске Kafka GitOps сравнивает желаемое состояние кластера Kafka с фактическим и создает план для выполнения в кластере. После выполнения топики и ACL будут соответствовать желаемому состоянию.

Kafka GitOps также создает необходимые ACL для каждого типа приложений. Нет необходимости вручную создавать кучу списков ACL для Kafka Connect, Kafka Streams и т. д.

Работа с консолью:

По дефолту приложение ищет файл state.yaml в текущей директории. Если конфиг лежит в той же директории, что и jar-файл, то параметр -f при вызове указывать не нужно.

Примеры стандартных команд запуска приложения из консоли:

java -jar kafka-gitops-all.jar -f /kafka/statefile/topics.yaml

java -jar kafka-gitops-all.jar

Ниже приведены аргументы (доступны по параметру -h)

  -f, --file=   расположение файла с конфигом
--no-delete     только изменять и добавлять, не удалять топики\ACL-листы
--skip-acls     не анализировать ACL-листы из конфига, настраивать только топики Kafka
-v, --verbose       детальный вывод в консоль
-o     расположение, куда записать получившийся план (если мы хотим только сформировать план, не применяя изменения к кластеру Kafka)

Команды:

 apply     применить конфиг к Kafka
plan      только построить план изменений, не применяя к Kafka
validate  валидировать конфиг на предмет наличия\отсутствия ошибок в state.yaml файле

Пример вызова приложения:

java -jar kafka-gitops-all.jar -f /kafka/statefile/topics.yaml --skip-acls --no-delete --verbose plan

тут мы запускаем программу, говорим ей не анализировать ACL-листы (--skip-acls);
не пытаться удалить, а только изменить и добавить топики\ACl-листы (--no-delete);
выводить все промежуточные шаги в консоль (--verbose), только формировать план изменений (plan), так как мы не добавляли флаг -o, то план изменений будет только выведен в консоль и не будет сохранен в файл).

Конфиг (файл state.yaml)

Блок общих настроек:

settings:
  topics:
    defaults:
      replication: 1 # (ко всем топикам из конфига по умолчанию применять фактор репликации = 1, если не указано иного)
    blacklist:
      prefixed:
        - _confluent  # (игнорировать уже имеющиеся в Kafka топики, название которых начинается на _confluent)

Блок настроек топиков:

topics:
  msg:           #  (название топика)
    partitions: 3  #  (кол-во партиций)
    replication: 1 #  (фактор репликации)
    configs:
      cleanup.policy: compact   # (любые параметры настройки топиков)
      segment.bytes: 1000000  #  (любые параметры настройки топиков)
  test-msg:
    partitions: 2
    replication: 1
    configs:
      cleanup.policy: compact
      segment.bytes: 1000000

Переменные окружения:

Параметры подключения к кафке настраиваются через параметры окружения:

KAFKA_BOOTSTRAP_SERVERS=localhost:9092
KAFKA_SASL_JAAS_PASSWORD=kafka
KAFKA_SASL_JAAS_USERNAME=kafka
KAFKA_SECURITY_PROTOCOL=SASL_PLAINTEXT
KAFKA_CLIENT_ID=gitops-user
KAFKA_SASL_MECHANISM=PLAIN

Если на кластере Apache Kafka нет никакой авторизации, то достаточно будет прописать параметр KAFKA_BOOTSTRAP_SERVERS

AKHQ

Удобный и функциональный GUI менеджер для Apache Kafka - AKHQ.

Его ключевые особенности:

  • позволяет организовать мультикластерную конфигурацию из единого интерфейса;

  • поддержка протоколов и механизмов аутентификации и шифрования для Apache Kafka;

  • управление топиками, consumer группами, конфигурациями кластера, пользователями, ACL и т.д.;

  • интеграция с LDAP, RBAC, Schema Registry, Kafka Connect и многими другими продуктами и технологиями;

  • тонкая настройка прав доступа к web-интерфейсу системы.

Пример .yml конфиг файла для быстрого старта с использованием docker контейнера:

version: '3.6'
services:
  akhq:
    image: tchiotludo/akhq
    environment:
      AKHQ_CONFIGURATION: |
        akhq:
          connections:
            cluster-1:
              properties:
                bootstrap.servers: "server1:19092, server2:29092, server3:39092"
            cluster-2:
              properties:
                bootstrap.servers: " server1:19092, server2:29092, server3:39092"
           cluster-3:
              properties:
                bootstrap.servers: " server1:19092, server2:29092, server3:39092"
                security.protocol: SASL_PLAINTEXT
                sasl.mechanism: PLAIN
                sasl.jaas.config: org.apache.kafka.common.security.plain.PlainLoginModule required username="user" password="password";
    network_mode: host

Offset explorer

Offset Explorer (ранее Kafka Tool) — толстый клиент для управления и использования кластеров Apache Kafka с поддежкой мультикластерности. Он позволяет быстро просматривать объекты в кластере Kafka, а также сообщения, хранящиеся в топиках кластера. Offset Explorer содержит функционал, ориентированный как на разработчиков, так и на администраторов.

Некоторые из ключевых особенностей:

  • быстрый просмотр кластеров Kafka, включая их брокеров, топики и consumer’ов;

  • просмотр содержимого и добавление новых сообщений в партициях;

  • просмотр и выставление offset у consumer;

  • вывод сообщений в JSON, XML и Avro формате с возможностью их локального сохранения;

  • управление топиками, их конфигурациями и ACL листами;

  • поддержка протоколов и механизмов аутентификации и шифрования для Apache Kafka;

  • возможность написания своих собственных плагинов для вывода кастомных форматов данных.

Вывод

Apache Kafka позволяет пропускать через единую централизованную среду огромное количество сообщений, а затем хранить их, не беспокоясь о производительности и потере данных. Продукт собрал вокруг себя множество проектов, утилит, средств для комфортной работы, мониторинга, управления. В статье представлен наш опыт использования некоторых из них, которые показались нам интересными. Важную часть работы современных информационных систем составляет организация безопасности их работы и взаимодействия. В данной статье мы также рассмотрели ньюансы настройки безопасности кластеров Apache Kafka и Zookeeper, авторизации ACL, SASL аутентификации. Надеемся, наш опыт будет вам полезен.

Комментарии (1)


  1. makar_crypt
    14.04.2022 21:50
    -1

    мы обожаем кафку. любые статьи о кафке это плюс! Обязательно раскажите про 2й и 3й уровень кафки! У нас в binance кстати пол системы работает на этих уровнях stream, KSQL )