Буквально на секунду представьте, что у вас есть парочка здоровенных кластеров Apache Kafka, каждый из которых держит по нескольку миллионов rps. И тут вас попросили зеркалировать топик из одного кластера в другой. Максимально близко к реалтайму, да ещё и с некоторыми специфическими условиями. Если стало страшно, интересно или страшно интересно, то это статья для вас.

Под катом я расскажу, что такое зеркалирование и зачем оно нужно. Как нам перестало хватать Mirror Maker’а. Поговорим о возможных решениях и выборе между ними. И дам подробную инструкцию, как вам развернуть такое решение у себя.


Привет, меня зовут Дмитрий, я старший инженер инфраструктурных сервисов в Ozon из команды Message Bus. Ключевая инфраструктурная единица нашего отдела — это шина данных на базе Apache Kafka. Вместе с командой платформенных сервисов мы решаем глобальную задачу по предоставлению Kafka as a service. Это значит, что продуктовой команде не нужно задумываться о масштабировании и отказоустойчивости — для них это само собой разумеющиеся возможности шины данных. Пользователь может ничего не знать о брокерах и количестве реплик, тонких настройках для каждого топика и других особенностях работы кластера Kafka. Вместо этого ему предлагается просто заказывать нужные сущности, выбирая из каталога шаблонов на портале, писать в топики любое количество сообщений любого объёма и вычитывать их с нужной скоростью.

Мы продолжаем серию рассказов об инфраструктуре вокруг Kafka. В прошлый раз мой коллега поведал на Хабре о том, как мы внедряли Cruise Control. А сегодня поговорим о выборе инструмента для зеркалирования топиков.

Под «зеркалированием» мы понимаем копирование данных из одного топика в другой, при этом не переставая эксплуатировать исходный топик — писать и читать из него. Такое копирование может понадобится и для того, чтобы один топик оказался в двух разных продуктовых кластерах. И для того, чтобы данные какого-нибудь топика из продуктового кластера, конечно, без персональных данных и бизнес-критичной информации, оказались в тестовом контуре. Где команды могут протестировать новые версии своих сервисов без опасения что-либо сломать, но с настоящими данными.

Как мы зеркалировали трафик раньше и сейчас

С чего мы начинали?

Изначально при поступлении запроса на зеркалирование трафика мы создавали сервис на виртуальной машине Mirror Maker, в котором указывали необходимые настройки кластеров и топиков. Так как направления зеркалирования были между различными средами, работало несколько сервисов. Следующим очевидным шагом мы перенесли эти сервисы в контейнеры Kubernetes. Теперь для добавления или удаления топиков нам требовалось создавать новый деплоймент, что занимало время на прохождение всех этапов сборки и развёртывания.

Как мы работаем сейчас?

У нас имеется кластер Brooklin Mirror Maker, в котором настроены подключения ко всем необходимым кластерам Kafka. В любой момент можно посмотреть, какие задачи запущены или остановлены. Добавление, удаление, остановка новых задач зеркалирования занимает не более 5-10 минут даже для неподготовленного пользователя. 

Как возникла потребность в универсальном сервисе для зеркалирования топиков

Мы не начали эту историю с чистого листа — в компании успешно использовалось решение на основе Mirror Maker 1 для зеркалирования топиков. Обычно в тестовые контуры мы перенаправляли трафик из нескольких топиков для тестирования работы сервисов. При этом имена топиков совпадали, и проблем не возникало. Во время одного инцидента возникла потребность в «переливании» трафика между production-кластерами с изменением имени топика. Мы оказались не готовы в моменте решить эту задачу и нам пришлось идти более сложным путём. Тогда мы решили найти универсальное средство для решения таких задач в будущем.

Наши требования к системам зеркалирования: 

  • Топик может быть реплицирован как между кластерами, так и в пределах одного кластера.

  • Имя топика назначения может отличаться от имени топика источника, таким образом можно организовать переименование топика в пределах одного кластера, не останавливая запись.

  • Конфигурации исходного топика и топика назначения могут отличаться. Таким образом можно уменьшить количество партиций в пределах одного кластера. Тогда как Kafka обычно позволяет только увеличение количества партиций.

  • Добавление/удаление топиков на репликацию должно быть максимально просто, чтобы эту задачу можно было передавать инженерам линии поддержки.

  • Репликатор в конечном итоге не должен использовать Zookeeper целевых кластеров Kafka, но может использовать отдельный для собственных нужд. Сейчас мы используем версии Kafka, требующие Zookeeper, но планируем переход на KRaft в будущем.

  • Очень хотелось бы иметь возможность фильтрации сообщений при репликации. Но это оставим как опциональное требование.

Сравнительная таблица систем зеркалирования

Mirror Maker — решение на Java от Apache Kafka.

Ureplicator — решение на Java от компании Uber.

  • Ureplicator Non Federative функционально аналогичен Mirror Maker.

  • Ureplicator Federative позволяет создавать направления из групп Non Federative.

Brooklin — решение на Java от компании LinkedIn.

* Y — имена топиков совпадают в разных кластерах.
N — имя топика назначения должно быть другим в пределах одного кластера.
** Brooklin в оригинальном варианте не позволяет изменять имя топика назначения.

Исходя из сравнительной таблицы, ни одно решение не удовлетворяет требованиям, тем не менее в документации brooklin-rest-client есть ключ I - destination, что, возможно, позволит произвольно изменить название топика. Мы начали более внимательно смотреть на этот продукт.

Описание ключей brooklin-rest-client.sh

Console app to manage datastreams.

-c,--connector <CONNECTOR_NAME>            

Name of the connector

-d,--destination <DESTINATION_URI>           

Datastream destination uri

-dp,--destinationpartitions <DESTINATION_PARTITIONS>  

Number of partitions in the destination

-es,--eserde <ENVELOPE_SERDE>             

Name of the Serde to be used for Envelope. Config is optional.

-f,--force                       

force the entire datastream group to be paused/resumed

-h,--help                       

Display this message

-ks,--kserde <KEY_SERDE>                

Name of the Serde to be used for key. Config is optional.

-m,--metadata <DATASTREAM_METADATA>          

Datastream metadata key value pairs represented as json {"key1":"value1","key2":"value2"}

-n,--name <DATASTREAM_NAME>              

Name of the datastream

-nf,--noformat                     

Print without formatting

-o,--operation <DATASTREAM_OPERATION>         

Operation to perform accepted values [CREATE, READ, DELETE, READALL]

-p,--partitions <NUM_PARTITIONS>            

Number of partitions in the source

-ps,--pserde <PAYLOAD_SERDE>              

Name of the Serde to be used for payload. Config is optional.

-s,--source <SOURCE_URI>                

Datastream source uri

-sp,--sourcepartitions <SOURCE_PARTITIONS>       

the partitions which need to be moved

-t,--transport <TRANSPORT_NAME>            

Name of the Datastream Transport to use, default kafka.

-th,--targethost <TARGET_HOST>             

Name of the target host

-u,--uri <MANAGEMENT_URI>               

Management service

Изучаем Brooklin Mirror Maker

Стоит отметить, что Brooklin изначально создавался компанией LinkedIn для зеркалирования кластеров. Посмотреть результаты тестирования можно тут. 

Brooklin использует кластер Zookeeper для хранения статуса и управления задачами.

Основные свойства приложения:

  • масштабируемость добавлением node-кластера;

  • простое управление кластерами источниками/получателями; 

  • различные методы подключения к одному и тому же кластеру (PLAIN, SSL, SASL);

  • наличие API для управления задачами;

  • зеркалирование топиков с данными в любом направлении;

  • задача выполняется при недоступности части партиций;

  • возможность остановки/запуска задач или чтения партиций.

Возможности Brooklin

Brooklin — серверное приложение Java, разворачиваемое в кластере хостов, на каждом из которых может быть запущено несколько экземпляров приложения с одинаковым функционалом.

Термины

Datastream

  • Datastream - описание соединения между двумя системами, источник и получатель.

  • Brooklin позволяет создавать любое количество Datastream для организации различных подключений источника и получателя.

  • Для обеспечения масштабируемости Brooklin партиционирует данные в разрезе Datastream либо представляет их как поток с единственной партицией.

  • Каждый блок партиционированных данных разбивается на несколько DatastreamTasks с ограниченным количеством партиций для обработки в целях повышения увеличения полосы пропускания.

Connector

  • Connector — абстрактное представление модулей для чтения данных.

  • Существует несколько реализаций Connector для чтения различных источников.

  • Каждый Connector связан с AssignmentStrategy, которая определяет, как разбиваются задачи в разрезе Datastreams и как задачи распределяются между запущенными приложениями Brooklin.

TransportProvider

  • TransportProvider — абстрактное представление модулей для записи данных.

  • Существует несколько реализаций TransportProvider для записи данных в различные системы.

Coordinator

  • Brooklin Coordinator — модуль, отвечающий за управление различными Connector.

  • Объект Coordinator может быть только один в каждом запущенном приложении Brooklin.

  • В кластере Brooklin Coordinator допускается только один Leader, остальные являются ведомыми.

  • Brooklin использует Zookeeper для голосования в выборе Coordinator-лидера. 

  • Leader отвечает за распределение задач между Coordinators. 

  • Каждый Leader Coordinator связан с AssignmentStrategy для назначения задач Connector в разрезе Datastreams и DatastreamTasks.

  • Пример AssignmentStrategy — LoadbalancingStrategy-распределения DatastreamTasks между всеми Coordinators.

Архитектура

  • Серверное приложение Brooklin разворачивается на одном/нескольких серверах и использует ZooKeeper как единственное хранилище достоверной информации о Datastream  и метаданных DatastreamTask. 

  • Информация о запущенных приложениях Brooklin и назначенных DatastreamTask хранится в ZooKeeper.

  • Каждое приложение Brooklin предоставляет REST-интерфейс — Datastream Management Service (DMS).

  • Каждая сущность Brooklin открывает REST endpoint — aka Datastream Management Service (DMS) — который позволяет выполнить операции Create (создание), Read (чтение), Update (редактирование) и Delete (удаление) по HTTP-протоколу.

Процесс создания Datastream

Основные концепции создания Datastream представлены на рисунке.


  1. Клиент отправляет запрос на создание Datastream. 

  2. Запрос перенаправляется через REST к Datastream Management Service любого приложения Brooklin.

  3. Данные Datastream  проверяются и записываются в соответствующую znode ZooKeeper, за изменениями которых следит Leader Coordinator. 

  4. Leader Coordinator получает уведомление о создании новой Datastream znode.

  5. Leader Coordinator производит чтение метаданных нового Datastream и на основании AssignmentStrategy, ассоциированной с подходящим Connector, разбивает Datastream на одну или несколько DatastreamTasks. Назначение DatastreamTasks  происходит на основании записи в ZooKeeper доступным Coordinator.  

  6. Coordinators, получив уведомление о новом назначении DatastreamTask, в соответствующих znodes немедленно приступает к обработке, используя соответствующие Connectors. 

ZooKeeper

В дополнение к выбору Leader Coordinator приложения Brooklin актуализируют следующую информацию в ZooKeeper:

  • регистрация типов Connector;

  • метаинформация о Datastream и DatastreamTasks;

  • DatastreamTask-информация о состоянии (e.g. offsets/checkpoints, processing errors);

  • информация об активных нодах Brooklin;

  • DatastreamTask-назначения между нодами Brooklin.

Представление данных в Zookeeper

Структура

  • Каждый кластер Brooklin имеет одну znode верхнего уровня.

  • /dms – определения  Datastreams (JSON).

  • /connectors

    • sub-znode для каждого зарегистрированного типа Connector;

    • /connectors/<connector-type>/ определения DatastreamTasks, обрабатываемые этим типом Connector, одна znode на  DatastreamTask;

    • два подуровня  /connectors/<connector-type>/<datastreamtask> znode: config and state;

    • config хранит назначения  DatastreamTasks;

    • state хранит информацию о статусе Connectors (offsets/checkpoints).

  • /liveinstances

    • каждый экземпляр Brooklin создаёт инкрементальную эфемерную znodes для организации leader Coordinator; 

    • значение каждой sub-znode — это имя хоста соответствующего Brooklin-приложения.

  • /instances

    • каждое приложение Brooklin создаёт персистентную znode. Znode представляется в виде имени машины и уникального номера в соответствии с /liveinstances;

    • так же создаются две sub-znode — assignments and errors;

    • assignments всех DatastreamTasks назначенных приложению Brooklin;

    • errors-сообщения об ошибках каждого приложения.

Процесс работы

  • Leader Coordinator наблюдает за изменениями в the /dms znode — создание, удаление и изменение Datastreams. 

  • Каждый Coorindator наблюдает за изменениями соответствующей ему znode /instances для получения уведомления об изменениях в дочерних znode.

  • Leader Coordinator назначает DatastreamTasks приложениям Brooklin путём изменения в assignment znode-ветки /instances/<instance-name>. 

Метрики

Метрики и доступны через JMX exporter (JmxReporter) или файлы CSV (CsvReporter).

Итак, приступаем:

  1. Скачиваем скомпилированное приложение https://github.com/linkedin/brooklin/releases/download/4.1.0/brooklin-4.1.0.tgz

  2. Скачиваем исходники, они пригодятся на следующих этапах https://github.com/linkedin/brooklin/archive/refs/tags/4.1.0.zip

  3. Запускаем Zookeeper, Kafka, Brooklin по инструкции с сайта https://github.com/linkedin/brooklin/wiki/Mirroring-Kafka-Clusters

  4. Создаём Datastream без переименования топика

brooklin-rest-client.sh -o CREATE -u http://localhost:32311/ -n first-mirroring-stream -s "kafka://devenv:9094/T1"   -c kafkaMirroringConnector -t kafkaTransportProvider -m '{"owner":"test-user","system.reuseExistingDestination":"false"}' 

Успех

  {
      "Status": "READY",
      "metadata": {
        "owner": "test-user",
        "system.reuseExistingDestination": "false",
        "system.destination.KafkaBrokers": "devenv:9093",
        "system.creation.ms": "1677876640811",
        "group.id": "first-mirroring-stream",
        "system.taskPrefix": "first-mirroring-stream",
        "system.IsConnectorManagedDestination": "true",
        "datastreamUUID": "4ebd97c0-48a1-4979-bc21-50cf34618c81"
      },
      "transportProviderName": "kafkaTransportProvider",
      "destination": {
        "connectionString": "kafka://devenv:9093/*"
      },
      "name": "first-mirroring-stream",
      "connectorName": "kafkaMirroringConnector",
      "source": {
        "connectionString": "kafka://devenv:9094/T1"
      }
    } 
  1. Пробуем использовать ключи CLI

-d,–destination; 
-dp,–destination partitions 
 brooklin-rest-client.sh -o CREATE  -u http://localhost:32311/ -n first-mirroring-stream -s "kafka://devenv:9094/T1" -d "kafka://devenv:9093/T1" -dp 1  -c kafkaMirroringConnector -t kafkaTransportProvider -m '{"owner":"test-user","system.reuseExistingDestination":"false"}'

Ошибка! Читайте дальше, как обойти это ограничение.

cause=BYOT is not allowed for connector kafkaMirroringConnector. Datastream: {Status=INITIALIZING, metadata={owner=test-user, datastreamUUID=48e04aec-54c3-47fe-8414-b104f903579f, system.reuseExistingDestination=false, system.IsUserManagedDestination=true}, transportProviderName=kafkaTransportProvider, destination={partitions=1, connectionString=kafka://devenv:9093/T1}, name=first-mirroring-stream, connectorName=kafkaMirroringConnector, source={connectionString=kafka://devenv:9094/T1}} (com.linkedin.datastream.server.dms.DatastreamResources) 

BYOT is not allowed

BYOT - bring your own topic (Использование собственных имен топиков)

Отключаем проверку BYOT:

Где: datastream-kafka-connector/src/main/java/com/linkedin/datastream/connectors/kafka/mirrormaker/KafkaMirrorMakerConnector.java

Комментируем строки:

// verify that BYOT is not used
// if (DatastreamUtils.isUserManagedDestination(stream)) {
// throw new DatastreamValidationException(
//   String.format("BYOT is not allowed for connector %s. Datastream: %s", stream.getConnectorName(), stream));
//}

Компилируем приложение, подробнее – тут.

./gradlew clean build -x test && ./gradlew releaseTarGz

Запускаем приложение и пробуем:

brooklin-rest-client.sh -o CREATE  -u http://localhost:32311/ -n first-mirroring-stream -s "kafka://devenv:9094/T1" -d "kafka://devenv:9093/T2" -dp 1  -c kafkaMirroringConnector -t kafkaTransportProvider -m '{"owner":"test-user","system.reuseExistingDestination":"false"}' 

Всё хорошо:

    {
      "Status": "READY", - статус задачи
      "metadata": {
        "owner": "test-user", - владелец задачи
        "system.destination.KafkaBrokers": "devenv:9093", 
        "system.reuseExistingDestination": "false",
        "system.creation.ms": "1677886117163",
        "group.id": "first-mirroring-stream", - консьюмер группа
        "system.taskPrefix": "first-mirroring-stream",
        "system.IsConnectorManagedDestination": "true",
        "datastreamUUID": "70737cd3-cdc7-440e-9e8c-70851710feb7",
        "system.IsUserManagedDestination": "true"
      },
      "transportProviderName": "kafkaTransportProvider", - транспортный провайдер
      "destination": {
        "connectionString": "kafka://devenv:9093/T2" – plaintext, топик destination
      },
      "name": "first-mirroring-stream", - название datastrema
      "connectorName": "kafkaMirroringConnector", - название коннектора
      "source": {
        "connectionString": "kafka://devenv:9094/T1" – plaintext, топик source
      }
    } 

Вернёмся к постановке задачи требуется организовать зеркалирование трафика в любом направлении.

Пишем конфигурацию сервера применительно к Kafka:

brooklin.server.coordinator— параметры настройки серверной части (ноды);

brooklin.server.connectorNames — список настроенных подключений (consumers);

brooklin.server.transportProviderNames — список настроенных подключений (producers);

brooklin.server.connector — параметры настройки consumer;

brooklin.server.transportProvider — параметры настройки producer;

ssl.keystore.type — тип хранилища JKS; 

bootstrap.servers — указать доступные сервер(а) для подключений. При запуске приложения производится проверка их доступности.

Так как в основе проекта лежит Kafka Mirror, можно указывать привычные настройки.

При необходимости можно настраивать и другие методы подключения SASL Plaintext, SASL SSL. 

В приведённом примере у нас получится четыре варианта консьюмеров dev_plaintext,stg_plaintext,dev_ssl,stg_ssl и четыре конфигурации продюсеров dev_plaintext,stg_plaintext,dev_ssl,stg_ssl, которые мы можем комбинировать в любых вариациях подключения (consumer/producer) для любой задачи.

######################### Server Basics ##########################

brooklin.server.coordinator.cluster=brooklin

brooklin.server.coordinator.zkAddress=bmmzkp1:2181,bmmzkp2:2181,bmmzkp3:2181

brooklin.server.httpPort=32311

brooklin.server.connectorNames=dev_plaintext,stg_plaintext,dev_ssl,stg_ssl

brooklin.server.transportProviderNames=dev_plaintext,stg_plaintext,dev_ssl,stg_ssl

############################# dev_plaintext #############################

brooklin.server.connector.dev_plaintext.factoryClassName=com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory

brooklin.server.connector.dev_plaintext.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory

brooklin.server.transportProvider.dev_plaintext.factoryClassName=com.linkedin.datastream.kafka.KafkaTransportProviderAdminFactory

brooklin.server.transportProvider.dev_plaintext.bootstrap.servers=devkafkabroker1:9092

brooklin.server.transportProvider.dev_plaintext.client.id=dev_plaintext-producer

############################# stg_plaintext #############################

brooklin.server.connector.stg_plaintext.factoryClassName=com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory

brooklin.server.connector.stg_plaintext.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory

brooklin.server.transportProvider.stg_plaintext.factoryClassName=com.linkedin.datastream.kafka.KafkaTransportProviderAdminFactory

brooklin.server.transportProvider.stg_plaintext.bootstrap.servers=stgkafkabroker1:9092

brooklin.server.transportProvider.stg_plaintext.client.id=stg_plaintext-producer

############################# dev_ssl #############################

brooklin.server.connector.dev_ssl.factoryClassName=com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory

brooklin.server.connector.dev_ssl.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory

brooklin.server.connector.dev_ssl.consumer.security.protocol=ssl

brooklin.server.connector.dev_ssl.consumer.ssl.keystore.type=JKS

brooklin.server.connector.dev_ssl.consumer.ssl.truststore.type=JKS

brooklin.server.connector.dev_ssl.consumer.ssl.keystore.location=/opt/brooklin/config/dev_ssl.keystore.jks

brooklin.server.connector.dev_ssl.consumer.ssl.keystore.password=password

brooklin.server.connector.dev_ssl.consumer.ssl.truststore.location=/opt/brooklin/config/dev_ssl.keystore.jks

brooklin.server.connector.dev_ssl.consumer.ssl.truststore.password=password

brooklin.server.transportProvider.dev_ssl.factoryClassName=com.linkedin.datastream.kafka.KafkaTransportProviderAdminFactory

brooklin.server.transportProvider.dev_ssl.bootstrap.servers= devkafkabroker1:9093

brooklin.server.transportProvider.dev_ssl.client.id=dev_ssl-producer

brooklin.server.transportProvider.dev_ssl.security.protocol=ssl

brooklin.server.transportProvider.dev_ssl.ssl.keystore.type=JKS

brooklin.server.transportProvider.dev_ssl.ssl.truststore.type=JKS

brooklin.server.transportProvider.dev_ssl.ssl.keystore.location=/opt/brooklin/config/dev_ssl.keystore.jks

brooklin.server.transportProvider.dev_ssl.ssl.keystore.password=password

brooklin.server.transportProvider.dev_ssl.ssl.truststore.location=/opt/brooklin/config/dev_ssl.keystore.jks

brooklin.server.transportProvider.dev_ssl.ssl.truststore.password=password

############################# stg_ssl #############################

brooklin.server.connector.stg_ssl.factoryClassName=com.linkedin.datastream.connectors.kafka.mirrormaker.KafkaMirrorMakerConnectorFactory

brooklin.server.connector.stg_ssl.assignmentStrategyFactory=com.linkedin.datastream.server.assignment.BroadcastStrategyFactory

brooklin.server.connector.stg_ssl.consumer.security.protocol=ssl

brooklin.server.connector.stg_ssl.consumer.ssl.keystore.type=JKS

brooklin.server.connector.stg_ssl.consumer.ssl.truststore.type=JKS

brooklin.server.connector.stg_ssl.consumer.ssl.keystore.location=/opt/brooklin/config/stg_ssl.keystore.jks

brooklin.server.connector.stg_ssl.consumer.ssl.keystore.password=password

brooklin.server.connector.stg_ssl.consumer.ssl.truststore.location=/opt/brooklin/config/stg_ssl.keystore.jks

brooklin.server.connector.stg_ssl.consumer.ssl.truststore.password=password

#brooklin.server.connector.stg_ssl.consumer.ssl.key.password=

brooklin.server.transportProvider.stg_ssl.factoryClassName=com.linkedin.datastream.kafka.KafkaTransportProviderAdminFactory

brooklin.server.transportProvider.stg_ssl.bootstrap.servers=s stgkafkabroker1:9093

brooklin.server.transportProvider.stg_ssl.client.id=stg_ssl-producer

brooklin.server.transportProvider.stg_ssl.security.protocol=ssl

brooklin.server.transportProvider.stg_ssl.ssl.keystore.type=JKS

brooklin.server.transportProvider.stg_ssl.ssl.truststore.type=JKS

brooklin.server.transportProvider.stg_ssl.ssl.keystore.location=/opt/brooklin/config/stg_ssl.keystore.jks

brooklin.server.transportProvider.stg_ssl.ssl.keystore.password=password

brooklin.server.transportProvider.stg_ssl.ssl.truststore.location=/opt/brooklin/config/stg_ssl.keystore.jks

brooklin.server.transportProvider.stg_ssl.ssl.truststore.password=password

Пример создания Datastream с остановкой чтения партиций

Нам остаётся запустить приложение и проверить работоспособность.

# Просмотр datastreams 

curl -s http://brooklin:32311/datastream | jq

# Создание datastream

curl -X POST -H "Content-Type: application/json" -d @opendatacollecting http://brooklin:32311/datastream

cat <<EOF > opendatacollecting
{
 "metadata": {
  "owner": "test-user", - владелец задачи
  "group.id": "bmm_opendatacollecting ", - название консьюмер группы
  "system.reuseExistingDestination": "false",
  "system.IsConnectorManagedDestination": "true",
  "system.IsUserManagedDestination": "true",
  "system.destination.identityPartitioningEnabled": "false", - точное соответствие данных в партициях при зеркалировании
  "system.auto.offset.reset": "latest" – c какого места начинаем читать топик
 },
 "transportProviderName": "stg_ssl", - название транспортного провайдера
 "destination": {
     "connectionString": "kafkassl://stgkafkabroker1:9093,stgkafkabroker2:9093/opendatacollecting_product" –тип подключения (SSL),список брокеров, имя топика destination
 },
 "name": "TICKET-1374", - название datastream
 "connectorName": "dev_ssl", - название коннектора
 "source": {
     "connectionString": "kafkassl://devkafkabroker1:9093,devkafkabroker2:9093/opendatacollecting " – тип подключения (SSL),список брокеров, имя топика source
 }
}
EOF

# Пауза datastream

curl -X POST http://brooklin:32311/datastream/TICKET-1374?action=pause

# Запуск datastream

curl -X POST http://brooklin:32311/datastream/TICKET-1374?action=resume

# Удаление datastream

curl -X DELETE http://brooklin:32311/datastream/TICKET-1374 

# Пауза консьюма партиций (1-19) from 20 (остается консьюм 0 партиции) задача должна быть в статусе READY

curl -X POST -H "Content-Type: application/json" -d @stopopendatacollecting http:// brooklin:32311/datastream/TICKET-1374?action=pauseSourcePartitions
 
cat <<EOF > stopopendatacollecting
{
 "sourcePartitions" : {
  "opendatacollecting ": "1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16,17,18,19"
 }
}
EOF

# Запуск консьюма остановленных партиций

curl -X POST -H "Content-Type: application/json" -d  http:/brooklin:32311/datastream/TICKET-1374?action=resumeSourcePartitions

Пример графиков работы системы

Графики Rate консьюма по типу connector/Datastream

Графики Rate продюса по топикам

Пример файла настроек экспортера jmx_prometheus.yaml
rules:
 # Base pod metrics
 - pattern: 'java.lang<type=OperatingSystem><>(FreePhysicalMemorySize|TotalPhysicalMemorySize|SystemCpuLoad|ProcessCpuLoad|OpenFileDescriptorCount|AvailableProcessors|TotalStartedThreadCount)'
  name: java_lang_OperatingSystem_$1
  type: GAUGE

 # Consumer metrics by tasks
 - pattern: 'kafka.consumer<type=consumer-metrics, client-id=(.+)><>(connection-count|consumer-metadata-request-rate|failed-authentication-rate|incoming-byte-rate|network-io-rate|outgoing-byte-rate|request-rate|response-rate)'
  name: kafka_consumer_$2
  labels:
   clientid: "$1"

 # Brooklin metrics
 - pattern: metrics<name=Coordinator.isLeader><>Value
  name: brooklin_isleader
 - pattern: metrics<name=Coordinator.numRebalances><>FiveMinuteRate
  name: brooklin_numRebalances_FiveMinuteRate
 # devkafka.numDatastreamTasks
 - pattern: metrics<name=(\w+).numDatastreamTasks><>Value
  name: brooklin_numDatastreamTasks
  labels:
   connector: "$1"
 # Kafka Connector Metrics
 # devkafka.KafkaConnectorTask.dev-mirroring-2.numTopics
 - pattern: metrics<name=(\w+).KafkaConnectorTask.(.+).numTopics><>Value
  name: brooklin_KafkaConnectorTask_numTopics
  labels:
   connector: "$1"
   task: "$2"
 # devkafka.KafkaConnector.numTaskRestarts
 - pattern: metrics<name=(\w+).KafkaConnector.numTaskRestarts><>Value
  name: brooklin_KafkaConnector_numTaskRestarts
  labels:
   connector: "$1"
 - pattern: metrics<name=(\w+).KafkaConnectorTask.(.+).errorRate><>(FifteenMinuteRate)
  name: brooklin_KafkaConnectorTask_errorrate_FiveMinuteRate
  labels:
   connector: "$1"
   task: "$2"
 # devkafka.KafkaConnectorTask.aggregate.numPartitions
 # devkafka.KafkaConnectorTask.dev-mirroring-2.numPartitions
 - pattern: metrics<name=(\w+).KafkaConnectorTask.(.+).numPartitions><>Value
  name: brooklin_KafkaConnectorTask_numPartitions
  labels:
   connector: "$1"
   task: "$2"
 # devkafka.KafkaConnectorTask.aggregate.rebalanceRate Count
 # devkafka.KafkaConnectorTask.dev-mirroring-2.rebalanceRate
 - pattern: metrics<name=(\w+).KafkaConnectorTask.(.+).rebalanceRate><>Count
  name: brooklin_KafkaConnectorTask_rebalanceRate
  labels:
   connector: "$1"
   task: "$2"
 # devkafka.KafkaConnectorTask.aggregate.stuckPartitions
 - pattern: metrics<name=(\w+).KafkaConnectorTask.(.+).stuckPartitions><>Value
  name: brooklin_KafkaConnectorTask_stuckPartitions
  labels:
   connector: "$1"
   task: "$2"
 # devkafka.KafkaConnectorTask.dev-mirroring-2.eventsByteProcessedRate
 - pattern: metrics<name=(\w+).KafkaConnectorTask.(.+).eventsByteProcessedRate><>Count
  name: brooklin_KafkaConnectorTask_eventsByteProcessedRate
  labels:
   connector: "$1"
   task: "$2"
 # devkafka.KafkaConnectorTask.dev-mirroring-2.numPolls
 - pattern: metrics<name=(\w+).KafkaConnectorTask.(.+).numPolls><>FiveMinuteRate
  name: brooklin_KafkaConnectorTask_numPolls_FiveMinuteRate
  labels:
   connector: "$1"
   task: "$2"
 # Kafka Mirror Maker Connector metrics
 - pattern: metrics<name=(\w+).KafkaMirrorMakerConnectorTask.(.+).numTopics><>Value
  name: brooklin_KafkaMirrorMakerConnectorTask_numTopics
  labels:
   connector: "$1"
   task: "$2"
 # metrics:name=devkafka.KafkaMirrorMakerConnector.numTaskRestarts
 - pattern: metrics<name=(\w+).KafkaMirrorMakerConnector.numTaskRestarts><>Value
  name: brooklin_KafkaMirrorMakerConnector_numTaskRestarts
  labels:
   connector: "$1"
 - pattern: metrics<name=(\w+).KafkaMirrorMakerConnectorTask.(.+).errorRate><>(FifteenMinuteRate)
  name: brooklin_KafkaMirrorMakerConnectorTask_errorrate_FiveMinuteRate
  labels:
   connector: "$1"
   task: "$2"
 - pattern: metrics<name=(\w+).KafkaMirrorMakerConnectorTask.(.+).numPartitions><>Value
  name: brooklin_KafkaMirrorMakerConnectorTask_numPartitions
  labels:
   connector: "$1"
   task: "$2"
 - pattern: metrics<name=(\w+).KafkaMirrorMakerConnectorTask.(.+).rebalanceRate><>Count
  name: brooklin_KafkaMirrorMakerConnectorTask_rebalanceRate
  labels:
   connector: "$1"
   task: "$2"
 - pattern: metrics<name=(\w+).KafkaMirrorMakerConnectorTask.(.+).stuckPartitions><>Value
  name: brooklin_KafkaMirrorMakerConnectorTask_stuckPartitions
  labels:
   connector: "$1"
   task: "$2"
 - pattern: metrics<name=(\w+).KafkaMirrorMakerConnectorTask.(.+).eventsByteProcessedRate><>Count
  name: brooklin_KafkaMirrorMakerConnectorTask_eventsByteProcessedRate
  labels:
   connector: "$1"
   task: "$2"
 - pattern: metrics<name=(\w+).KafkaMirrorMakerConnectorTask.(.+).numPolls><>FiveMinuteRate
  name: brooklin_KafkaMirrorMakerConnectorTask_numPolls_FiveMinuteRate
  labels:
   connector: "$1"
   task: "$2"
 # Kafka Mirror Maker Connector metrics
 - pattern: metrics<name=(\w+).KafkaMirrorMakerConnectorTask.(.+).eventsProcessedRate><>Count
  name: brooklin_KafkaMirrorMakerConnectorTask_eventsProcessedRate
  labels:
   connector: "$1"
   task: "$2"
 # Producer metrics
 - pattern: metrics<name=EventProducer.(.+).eventProduceRate><>Count
  name: brooklin_EventProducer_eventProduceRate
  labels:
   connector: "$1"
 - pattern: metrics<name=EventProducer.(.+).totalEventsProduced><>Count
  name: brooklin_EventProducer_totalEventsProduced
  labels:
   connector: "$1"
 # Topic metrics
 - pattern: metrics<name=KafkaTransportProvider.(.+).eventByteWriteRate><>Count
  name: brooklin_KafkaTransportProvider_eventByteWriteRate
  labels:
   topic: "$1"
 - pattern: metrics<name=KafkaTransportProvider.(.+).eventWriteRate><>Count
  name: brooklin_KafkaTransportProvider_eventWriteRate
  labels:
   topic: "$1"

Какие были проблемы на этапе запуска

Основной сложностью при сборке приложения без доступа к интернету является разрешение зависимостей, которые вам потребуется разместить в локальном Artifactory. Один из коротких путей получения требуемого списка — собрать приложение с доступом к интернету, перенаправив запросы через прокси-сервер или Nginx. 

Файлы, в которых потребуется изменить пути репозиториев:

gradle/buildscript.gradle — указать локальный Artifactory;

gradle/wrapper/gradle-wrapper.properties — указать локальный Artifactory;

gradle/maven.gradle — отключить Artifactory publishing.

Выводы

Внедрение Brooklin значительно упростило администрирование задач зеркалирования трафика. Основное преимущество — возможность оперативно направлять данные в любой кластер с изменением имени топика, при необходимости можно уменьшить количество партиций чтения, возможность паузы и возобновления задач.

Итоговая сводная таблица:

* Y — имена топиков совпадают в разных кластерах.
N — имя топика назначения должно быть другим в пределах одного кластера.
** Brooklin в оригинальном варианте не позволяет изменять имя топика назначения.
Y/Y - имя топика назначения должно быть другим в пределах одного кластера.

Планы на будущее

Разработка веб-интерфейса для взаимодействия через API позволит визуализировать и автоматизировать постановку, удаление и учёт задач зеркалирования трафика. Требования к функционалу уже подготовлены.

Ограничение доступа к API планируется организовать посредством Keycloak Gatekeeper для аутентификации запросов пользователей и сервисов в Keycloak.

Комментарии (4)


  1. Legoti4
    27.09.2023 10:14
    +1

    А чем не подошел kafka-connect с подключенным к нему MM и дополнительными трансформами?


    1. mdaff Автор
      27.09.2023 10:14
      +1

      Привет, мы использовали MM на первых этапах, в соответствии с постановкой задачи мы выбрали централизованное решение для выполнения всех задач в любой комбинации кластеров (подключений к ним) с возможностью горизонтального масштабирования, минимальными затратами на добавление задач зеркалирования или конфигурирования, обеспеченив отказоустойчивость.


  1. Legoti4
    27.09.2023 10:14

    А чем не подошел kafka-connect с подключенным к нему MM и дополнительными трансформами?


  1. alecx
    27.09.2023 10:14

    Т.е. Вы наивно выключили проверку, не разобравшись зачем тут она была, не задали вопрос разработчикам в открытом проекте на Github, пошли с этим в HiLoad продакшн и надеетесь что все будет хорошо?

    То ли я старею, то ли этот мир несется куда то не туда...