Strimzi — это практически самый широкий оператор Kubernetes Kafka, который можно использовать для развертывания Apache Kafka, либо других его компонентов, таких как Kafka Connect, Kafka Mirror и т.д. В статье мы пошагово разберем развертывание Kafka Connect в Kubernetes. А еще затронем проблемы, с которыми можно столкнуться во время процедуры развертывания и приведем способы их решения.

Примечание: Учтите, что Strimzi основан на Apache Kafka, а не на платформе Confluent. Вот почему вам, скорее всего, потребуется добавить некоторые артефакты Confluent, например, Confluent Avro Converter, чтобы получить отдачу от этого.

Статья основана на Strimzi v0.29.0. Это значит, что вы можете установить следующие версии Kafka Connect:

  • Стримзи: 0.29.0

  • Apache Kafka и Kafka Connect: до 3.2

  • Эквивалентная платформа слияния: 7.2.4

Примечание: вы можете преобразовать версию платформы Confluent в версию Apache Kafka и наоборот с помощью приведенной здесь таблицы.

Установка

Графический интерфейс Openshift и CLI Kubernetes

Если вы используете Openshift, перейдите в раздел Операторы > установленные операторы > Strimzi > Kafka Connect.

Шаг за шагом: Теперь вы столкнетесь с формой, содержащей конфигурации Kafka connect. Чтобы получить эквивалентный Yaml‑файл формы, нажмите на Yaml View. Любое обновление представления формы применяется к представлению Yaml «на лету». Только не используйте его для непосредственного создания экземпляра. Он нужен для преобразования желаемой конфигурации в файл Yaml. После получения Yaml‑файл разверните оператор с помощью команды kubectl apply. Итак, подведем итог:

  • Введите конфигурацию в представлении формы.

  • Нажмите на просмотр Yaml.

  • Скопируйте его содержимое в файл Yaml на вашем локальном компьютере (например, kafka-connect.yaml).

  • Выполнить: kubectl apply -f kafka-connect.yaml .

Вид Kafka-Connect либо развертывается, либо обновляется. Развернутые ресурсы состоят из развертывания и модулей, сервиса, конфигурационных карт и секретов.

Давайте пройдемся по минимальной конфигурации и шаг за шагом сделаем ее более продвинутой.

Минимальная конфигурация

Чтобы развернуть простую минимальную конфигурацию Kafka Connect, вы можете использовать приведенный ниже Yaml:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: my-connect-cluster
  namespace: <YOUR_PROJECT_NAME>
spec:
  config:
    config.storage.replication.factor: -1
    config.storage.topic: okd4-connect-cluster-configs
    group.id: okd4-connect-cluster
    offset.storage.replication.factor: -1
    offset.storage.topic: okd4-connect-cluster-offsets
    status.storage.replication.factor: -1
    status.storage.topic: okd4-connect-cluster-status
  bootstrapServers: kafka1, kafka2
  version: 3.2.0
  replicas: 1

Также для этой цели подойдет Rest API Kafka Connect на порту 8083, открытом в модуле. Предоставьте его в частной или внутренней сети, определив маршрут в OLD.

Аутентификация REST API

С помощью конфигурации, описанной здесь, добавьте аутентификацию к REST-прокси Kafka Connect. Единственное — это не сработает с оператором Strimzi. Поэтому для обеспечения безопасности в Kafka Connect, у вас есть два варианта:

  • Используйте API-оператор Kafka Connector. Оператор Strimzi поможет определить тип соединителя в файле YAML. Однако в некоторых случаях это может быть непрактично, ведь необходимо обновлять, приостанавливать и останавливать соединители через REST API.

  • Поместите небезопасный REST API за аутентифицированный API-шлюз, такой, как Apache APISIX. Подойти может любой другой инструмент или приложение собственной разработки.

Показатели JMX Prometheus

Чтобы предоставить метрики JMX Prometheus, полезные для наблюдения за статусами соединителей в Grafana, добавьте приведенную ниже конфигурацию:

metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        key: jmx-prometheus
        name: configs
  jmxOptions: {}

Он использует предварительно определенную конфигурацию для экспорта Prometheus. Вы можете использовать эту конфигурацию:

startDelaySeconds: 0
ssl: false
lowercaseOutputName: false
lowercaseOutputLabelNames: false
rules:
- pattern : "kafka.connect<type=connect-worker-metrics>([^:]+):"
  name: "kafka_connect_connect_worker_metrics_$1"
- pattern : "kafka.connect<type=connect-metrics, client-id=([^:]+)><>([^:]+)"
  name: "kafka_connect_connect_metrics_$2"
  labels:
    client: "$1"
- pattern: "debezium.([^:]+)<type=connector-metrics, context=([^,]+), server=([^,]+), key=([^>]+)><>RowsScanned"
  name: "debezium_metrics_RowsScanned"
  labels:
    plugin: "$1"
    name: "$3"
    context: "$2"
    table: "$4"
- pattern: "debezium.([^:]+)<type=connector-metrics, context=([^,]+), server=([^>]+)>([^:]+)"
  name: "debezium_metrics_$4"
  labels:
    plugin: "$1"
    name: "$3"
    context: "$2"

Сервис для внешнего Prometheus

Если вы собираетесь развернуть Prometheus в сочетании со Strimzi для сбора показателей, следуйте инструкциям. Но помните, что в случае использования внешнего Prometheus история разворачивается по-другому.

Оператор Strimzi создает отображение портов в сервисе только для этих портов:

  • 8083: Kafka Connect REST API

  • 9999: порт JMX

К сожалению, это не создает сопоставление для порта 9404, HTTP-порта экспортера Prometheus. Итак, мы должны создать сервис самостоятельно:

kind: Service
apiVersion: v1
metadata:
  name: kafka-connect-jmx-prometheus
  namespace: kafka-connect
  labels:
    app.kubernetes.io/instance: kafka-connect
    app.kubernetes.io/managed-by: strimzi-cluster-operator
    app.kubernetes.io/name: kafka-connect
    app.kubernetes.io/part-of: strimzi-kafka-connect
    strimzi.io/cluster: kafka-connect
    strimzi.io/kind: KafkaConnect
spec:
  ports:
    - name: tcp-prometheus
      protocol: TCP
      port: 9404
      targetPort: 9404
  type: ClusterIP
  selector:
    strimzi.io/cluster: kafka-connect
    strimzi.io/kind: KafkaConnect
    strimzi.io/name: kafka-connect-connect
status:
  loadBalancer: {}

Примечание: этот метод работает только для развертываний с одним модулем. Вы должны определить маршрут для сервиса, и даже в случае headless сервиса маршрут возвращает один IP-модуль за раз. Следовательно, Prometheus не может очистить все показатели pods. Вот почему рекомендуется использовать Podmonitor и Prometheus в облаке.

Плагины и артефакты

Чтобы добавить плагины и артефакты, есть два способа.

Секция сборки оператора

Чтобы добавить плагины, воспользуйтесь разделом сборки оператора. Он получает адреса плагинов или артефактов, загружает их на этапе сборки, так как оператор автоматически создает конфигурацию сборки, и добавляет их в каталог плагинов изображения.

Он поддерживает jar, tgz, zip, and maven. Но в случае Maven создается многоступенчатый файл Dockerfile, который проблематичен для Openshift, и он сталкивается со сбоем на стадии сборки. Следовательно, вам следует использовать только другие типы, которым не нужна стадия компиляции, например, jar, zip, tgz, и в итоге вы получите одноступенчатый файл Dockerfile.

Например, чтобы добавить плагин Debezium MySQL, вы можете использовать приведенную ниже конфигурацию:

spec:  
  build:
    output:
      image: 'kafkaconnect:1.0'
      type: imagestream
    plugins:
      - artifacts:
          - type: tgz
            url: >-
              https://repo1.maven.org/maven2/io/debezium/debezium-connector-mysql/2.1.4.Final/debezium-connector-mysql-2.1.4.Final-plugin.tar.gz
        name: debezium-connector-mysql

Важно: оператор Strimzi может загружать только общедоступные артефакты. Поэтому, если вы хотите загрузить конфиденциально защищенный артефакт, недоступный Kubernetes, вам следует отказаться от этого метода и перейти к следующему.

Изменение изображений

Оператор может использовать желаемое вами изображение вместо изображения по умолчанию. Так вы можете добавить выбранные артефакты и плагины, создав образ вручную или с помощью CI/CD. Вы, возможно, захотите использовать именно этот способ, ведь Strimzi использует Apache Kafka image, а не платформу Confluent. То есть, в развертываниях нет совместимых пакетов, таких как Confluent Avro Converter и т.д. Поэтому вам нужно добавить их в свой образ и настроить оператора на использование вашего образа docker.

Например, если вы хотите добавить свой настроенный плагин Debezium MySQL Connector из универсальных пакетов Gitlab и Confluent Avro Converter в базовый образ, сначала используйте этот файл Dockerfile:

ARG CONFLUENT_VERSION=7.2.4

# Install confluent avro converter
FROM confluentinc/cp-kafka-connect:${CONFLUENT_VERSION} as cp
# Reassign version
ARG CONFLUENT_VERSION
RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:${CONFLUENT_VERSION}

# Copy privious artifacts to the main strimzi kafka image
FROM quay.io/strimzi/kafka:0.29.0-kafka-3.2.0
ARG GITLAB_TOKEN
ARG CI_API_V4_URL=https://gitlab.snapp.ir/api/v4
ARG CI_PROJECT_ID=3873
ARG DEBEZIUM_CONNECTOR_MYSQL_CUSTOMIZED_VERSION=1.0
USER root:root

# Copy Confluent packages from previous stage
RUN mkdir -p /opt/kafka/plugins/avro/
COPY --from=cp /usr/share/confluent-hub-components/confluentinc-kafka-connect-avro-converter/lib /opt/kafka/plugins/avro/

# Connector plugin debezium-connector-mysql
RUN 'mkdir' '-p' '/opt/kafka/plugins/debezium-connector-mysql' \
    && curl --header "${GITLAB_TOKEN}" -f -L \
    --output /opt/kafka/plugins/debezium-connector-mysql.tgz \
    ${CI_API_V4_URL}/projects/${CI_PROJECT_ID}/packages/generic/debezium-customized/${DEBEZIUM_CONNECTOR_MYSQL_CUSTOMIZED_VERSION}/debezium-connector-mysql-customized.tar.gz \
    && 'tar' 'xvfz' '/opt/kafka/plugins/debezium-connector-mysql.tgz' '-C' '/opt/kafka/plugins/debezium-connector-mysql' \
    && 'rm' '-vf' '/opt/kafka/plugins/debezium-connector-mysql.tgz'

USER 1001

Создайте образ. Отправьте его в поток изображений или любой другой репозиторий docker и настройте оператора, добавив строку ниже:

spec:  
  image: image-registry.openshift-image-registry.svc:5000/kafka-connect/kafkaconnect-customized:1.0

В зависимости от его типа используйте различные конфигурации для добавления проверки подлинности Kafka. Здесь вы можете увидеть конфигурацию для Kafka с механизмом SASL/Plaintext и scram-sha-512:

spec:
  authentication:
    passwordSecret:
      password: kafka-password
      secretName: mysecrets
    type: scram-sha-512
    username: myuser

Далее вам нужно указать пароль в секретном файле с именем my secret.

Обработка учетных данных файла

Поскольку соединителям нужны учетные данные для доступа к базам данных, вы должны определить их как секреты и получить к ним доступ с помощью переменных среды. Однако, если их слишком много, вы можете поместить все учетные данные в файл и адресовать их в соединителе с помощью модификатора $file modifier.

  1. Поместите все учетные данные в качестве значения ключа с именем credentials в секретный файл.

Файл учетных данных:

USERNAME_DB_1=user1
PASSWORD_DB_1=pass1

USERNAME_DB_2=user2
PASSWORD_DB_2=pass2

Secret file:

kind: Secret
apiVersion: v1
metadata:
  name: mysecrets
  namespace: kafka-connect
data:
  credentials: <BASE64 YOUR DATA>
  1. Настройте оператора с секретом в качестве тома:

spec:
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider  
  externalConfiguration:
    volumes:
      - name: database_credentials
        secret:
          items:
            - key: credentials
              path: credentials
          optional: false
          secretName: mysecrets
  1. Теперь в соединителе вы можете получить доступ к PASSWORD_DB_1 с помощью приведенной ниже команды:

"${file:/opt/kafka/external-configuration/database_credentials/credentials:PASSWORD_DB_1}"

Собрать все это воедино

Если мы соберем все конфигурации вместе, у нас будет приведенная ниже конфигурация для Kafka Connect:

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnect
metadata:
  name: kafka-connect
  namespace: kafka-connect
spec:
  authentication:
    passwordSecret:
      password: kafka-password
      secretName: mysecrets
    type: scram-sha-512
    username: myuser
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    config.storage.replication.factor: -1
    config.storage.topic: okd4-connect-cluster-configs
    group.id: okd4-connect-cluster
    offset.storage.replication.factor: -1
    offset.storage.topic: okd4-connect-cluster-offsets
    status.storage.replication.factor: -1
    status.storage.topic: okd4-connect-cluster-status
  bootstrapServers: 'kafka1:9092, kafka2:9092'
  metricsConfig:
    type: jmxPrometheusExporter
    valueFrom:
      configMapKeyRef:
        key: jmx-prometheus
        name: configs
  resources:
    limits:
      memory: 1Gi
    requests:
      memory: 1Gi
  readinessProbe:
    failureThreshold: 10
    initialDelaySeconds: 60
    periodSeconds: 20
  jmxOptions: {}
  livenessProbe:
    failureThreshold: 10
    initialDelaySeconds: 60
    periodSeconds: 20
  image: image-registry.openshift-image-registry.svc:5000/kafka-connect/kafkaconnect-customized:1.0
  version: 3.2.0
  replicas: 2
  externalConfiguration:
    volumes:
      - name: database_credentials
        secret:
          items:
            - key: credentials
              path: credentials
          optional: false
          secretName: mysecrets

Примечание: сервис, маршрут и конфигурация сборки не указаны, это есть в статье выше.

Делаем выводы

Развертывание Kafka Connect с использованием оператора Strimzi может стать мощным и эффективным способом управления интеграцией данных в вашей организации. Используя гибкость и масштабируемость Kafka, а также простоту использования и автоматизацию, предоставляемые оператором Strimzi, вы можете оптимизировать свои каналы передачи данных и улучшить процесс принятия решений на основе данных.

https://slurm.club/3Hx0IAi

Чтобы разобраться, как развертывать Kafka и использовать этот инструмент в работе, приглашаем вас на курс Apache Kafka для разработчиков. Обучение стартует 12 мая 2023. На нем разберем:

  • неправильное использование Кафка и отсутствие коммитов в ней;

  • ваши кейсы о проблемах при работе с Apache Kafka;

  • опыт создания Data Lake на ~80 ТБ с помощью Apache Kafka;

  • особенности эксплуатации kafka с retention в 99999999.

В этой статье мы рассмотрели ключевые шаги, связанные с развертыванием Kafka Connect с помощью оператора Strimzi, включая создание минимального пользовательского определения ресурсов (CRD), проблему базовой аутентификации REST API, аутентификацию Kafka, показатели JMX Prometheus, плагины и артефакты, а также обработку учетных данных файла. Выполнив эти действия, вы можете легко настроить развертывание Kafka Connect в соответствии с вашими конкретными потребностями.

Комментарии (2)