Kafka Connect – это интеграционный фреймворк, который является частью проекта Apache Kafka. На платформах Kubernetes и Red Hat OpenShift его можно развернуть с помощью операторов Strimzi и Red Hat AMQ Streams. В Kafka Connect есть два вида коннекторов: source и sink. Первые предназначены для загрузки данных в Kafka из внешних систем, вторые – для выгрузки данных из Kafka. Подключение к внешним системам, как правило, требует аутентификации, поэтому при настройке коннектора надо указывать учетные данные. В этом посте мы покажем как пользоваться секретами Kubernetes для хранения учетных данных при настройке коннекторов.



Здесь мы будем использовать source-коннектор S3, который входит в состав Apache Camel Kafka (подробнее см. здесь), и на его примере покажем, как сконфигурировать коннектор, чтобы он использовал секрет.

Описанная здесь процедура настройки универсальна и подходит для коннекторов любых типов. Мы будем применять коннектор S3 для подключения к хранилищу Amazon AWS S3 и загрузки файлов из корзины (bucket) S3 в топик (topic) Apache Kafka. Для подключения к хранилищу S3 нам потребуются следующие учетные данные AWS: ключ доступа (access key) и секретный ключ (secret key). Итак, начнем с того, что подготовим секрет с учетными данными.

Создаем секрет с учетными данными


Первым делом создаем простейший файл свойств aws-credentials.properties и прописываем в него учетные данные, вот так:

aws_access_key_id=AKIAIOSFODNN7EXAMPLE
aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

Указанные здесь учетные данные должны обеспечивать доступ к корзине S3, откуда будут читаться данные. Теперь из этого файла нужно сгенерировать секрет, это делается следующей командой:

$ kubectl create secret generic aws-credentials --from-file=./aws-credentials.properties

Собираем новый контейнерный образ с коннектором


Далее необходимо подготовить новый Docker-образ с нашим коннектором. Если используется Strimzi, то Dockerfile для добавления нашего коннектора выглядит вот так:

FROM strimzi/kafka:0.16.1-kafka-2.4.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER 1001

Если применяется Red Hat AMQ Streams, то Dockerfile выглядит вот так:

FROM registry.redhat.io/amq7/amq-streams-kafka-23:1.3.0
USER root:root
COPY ./my-plugins/ /opt/kafka/plugins/
USER jboss:jboss

Затем, используя Dockerfile, надо выполнить сборку контейнерного образа, содержащего необходимые нам коннекторы, и разместить его в реестре. Если у вас нет своего частного реестра, то можно использовать публичные реестры, например Quay или Docker Hub.

Разворачиваем Apache Kafka Connect


Итак, имея контейнерный образ, мы можем развернуть Apache Kafka Connect путем создания следующего ресурса в Kubernetes:

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnect
metadata:
  name: my-connect-cluster
spec:
  image: docker.io/scholzj/kafka:camel-kafka-2.4.0
  replicas: 3
  bootstrapServers: my-cluster-kafka-bootstrap:9092
  externalConfiguration:
    volumes:
      - name: aws-credentials
        secret:
          secretName: aws-credentials
  config:
    config.providers: file
    config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider
    key.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    key.converter.schemas.enable: false
    value.converter.schemas.enable: false

На что надо обратить внимание в этом описании? Во-первых, на поле image, которое говорит оператору развертывания Apache Kafka Connect, какой образ надо при этом использовать – то есть тот образ, куда мы включили нужные нам коннекторы. В нашем примере собранный на предыдущем шаге контейнерный образ размещен в реестре Docker Hub по адресу scholzj/kafka:camel-kafka-2.4.0, поэтому у нас поле image выглядит следующим образом:

image: docker.io/scholzj/kafka:camel-kafka-2.4.0

Во-вторых, секция externalConfiguration:

externalConfiguration:
  volumes:
    - name: aws-credentials
      secret:
        secretName: aws-credentials

Здесь мы говорим оператору монтировать Kubernetes-секрет aws-credentials (мы создали его выше) в pod’ы Apache Kafka Connect. Перечисленные здесь секреты будут монтироваться с путем /opt/kafka/external-configuration/, где – это имя секрета.

И наконец, обратите внимание на секцию config, где мы говорим, что Apache Kafka Connect использует FileConfigProvider в качестве провайдера конфигурации:

config:
  config.providers: file
  config.providers.file.class: org.apache.kafka.common.config.provider.FileConfigProvider

Провайдер конфигурации – это такой способ не прописывать параметры непосредственно в конфигурации, а брать их из другого источника. В нашем случае мы создаем провайдера конфигурации с именем file, который будет использовать класс FileConfigProvider. Этот провайдер входит в состав Apache Kafka. FileConfigProvider может читать файлы свойств и извлекать из них значения, и мы будем использовать его для загрузки API-ключей для нашей учетной записи Amazon AWS.

Создаем коннектор, используя Apache Kafka Connect REST API


Обычно надо подождать минуту-другую, пока выполнится развертывание Apache Kafka Connect, прежде чем создавать экземпляр коннектора. В предыдущих версиях Strimzi и Red Hat AMQ Streams для этого надо было использовать REST API. Теперь же мы можем создать коннектор, выполнив POST для следующего JSON’а:

{
 "name": "s3-connector",
 "config": {
   "connector.class": "org.apache.camel.kafkaconnector.CamelSourceConnector",
   "tasks.max": "1",
   "camel.source.kafka.topic": "s3-topic",
   "camel.source.maxPollDuration": "10000",
   "camel.source.url": "aws-s3://camel-connector-test?autocloseBody=false",
   "key.converter": "org.apache.kafka.connect.storage.StringConverter",
   "value.converter": "org.apache.camel.kafkaconnector.converters.S3ObjectConverter",
   "camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}",
   "camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}",
   "camel.component.aws-s3.configuration.region": "US_EAST_1"
   }
}

Конфигурация коннектора содержит API-ключи AWS в полях camel.component.aws-s3.configuration.access-key и camel.component.aws-s3.configuration.secret-key. Вместо того, чтобы прописывать значения напрямую, мы просто ссылаемся на провайдер конфигурации file, чтобы загрузить поля aws_access_key_id и aws_secret_access_key из нашего файла aws-credentials.properties.

Обратите внимание, как именно мы ссылаемся на провайдера конфигурации: указываем путь к файлу, который надо использовать, и через двоеточие – имя ключа, который надо извлечь, вот так:

"camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}"

И вот так:

"camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}"

Выполнить POST этих результатов в Apache Kafka Connect REST API можно, например, с помощью команды curl

$ curl -X POST -H "Content-Type: application/json" -d connector-config.json http://my-connect-cluster-connect-api:8083/connectors

Одно из преимуществ провайдеров конфигурации заключается в том, что они позволят не «светить» те параметры конфигурации, которые нужно держать в секрете:

$ curl http://my-connect-cluster-connect-api:8083/connectors/s3-connector
{
  "name": "s3-connector",
  "config": {
    "connector.class": "org.apache.camel.kafkaconnector.CamelSourceConnector",
    "camel.source.maxPollDuration": "10000",
    "camel.source.url": "aws-s3://camel-connector-test?autocloseBody=false",
    "camel.component.aws-s3.configuration.region": "US_EAST_1",
    "camel.component.aws-s3.configuration.secret-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}",
    "tasks.max": "1",
    "name": "s3-connector",
    "value.converter": "org.apache.camel.kafkaconnector.converters.S3ObjectConverter",
    "camel.component.aws-s3.configuration.access-key": "${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "camel.source.kafka.topic": "s3-topic"
  },
  "tasks": [
    {
      "connector": "s3-connector",
      "task": 0
    }
  ],
  "type": "source"
}

Создаем коннектор при помощи оператора Strimzi


Начиная с версии 0.16.0 в Strimzi также появился новый оператор для создания коннекторов с использованием следующего пользовательского YAML ресурса(в KafkaConnector можно напрямую использовать и провайдер конфигурации):

apiVersion: kafka.strimzi.io/v1alpha1
kind: KafkaConnector
metadata:
  name: s3-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: org.apache.camel.kafkaconnector.CamelSourceConnector
  tasksMax: 1
  config:
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.camel.kafkaconnector.converters.S3ObjectConverter
    camel.source.kafka.topic: s3-topic
    camel.source.url: aws-s3://camel-connector-test?autocloseBody=false
    camel.source.maxPollDuration: 10000
    camel.component.aws-s3.configuration.access-key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_access_key_id}
    camel.component.aws-s3.configuration.secret-key: ${file:/opt/kafka/external-configuration/aws-credentials/aws-credentials.properties:aws_secret_access_key}
    camel.component.aws-s3.configuration.region: US_EAST_1

Подводим итоги


У безопасности секретов в Kubernetes есть свои ограничения, и любой пользователь с правами на выполнение внутри контейнера в любом случае сможет прочитать подключенные секреты. Но описанный в данной статье способ как минимум позволяет не раскрывать учетные данные, API-ключи и другие секретные сведения при использовании REST API или пользовательских ресурсов в KafkaConnector.