Привет, Хабр! В наше время при постоянном росте объемов данных и необходимостью более быстрой и надежной обработки информации, мы сталкиваемся с требованием к эффективному обмену и синхронизации данных между различными системами. Отслеживание и обработка данных в реальном времени стало жизненно необходимым для современных приложений.

В этой статье мы рассмотрим, как Kafka Connect – мощный инструмент из экосистемы Apache Kafka – приходит на помощь при решении сложной задачи синхронизации данных между базами данных. Мы рассмотрим, как используя Kafka Connect, мы можем эффективно следить за изменениями в одной базе данных, обрабатывать их в нашем Java приложении и мгновенно записывать их в другую базу данных, обеспечивая надежность и безопасность данных.

Построим гибкую и масштабируемую архитектуру, которая позволит нам забыть о проблемах, связанных с несогласованными данными, и наслаждаться мгновенным доступом к актуальной информации для наших бизнес-процессов.

Что мы будем использовать?

  • Docker

  • Kubectl

  • Minikube

  • Helm (v3)

  • Java

Предполагается, что всё уже установлено и работает в штатном режиме. В связи с использованием minikube мы будем жить с некоторыми ограничениями.

Комментарии к коду приведены непосредственно в коде

Какой флоу мы построим?

Путь данных от одного сервиса к другому
Путь данных от одного сервиса к другому

Мы будем асинхронно передавать сообщения. Kafka - шина событий, в топик которой попадают ивенты, происходящие в другом сервисе. Мы сможем получить:

  • Независимую обработку данных

  • Масштабируемость (Использование нескольких брокеров)

  • Отказоустойчивость (В случае недоступности сервиса, данные в топики будут ждать, пока сервис не восстановится)

  • Гибкость архитектуры (Возможность обрабатывать ивенты в нескольких местах для разных целей)

За передачу ивентов будет отвечать Kafka Connect , предназначенный для интеграции между источниками данных и Kafka. Он имеет:

  • Коннекторы для подключения к базам данных, файловой системе, облачным сервисам, а также пользовательские коннекторы

  • Горизонтальное масштабирование

  • Обработку данных в реальном времени

Разработка первого Java приложения

Для начала мы напишем Java приложение, которое будет писать в нашу базу данных:
Код отправителя типовой. Мы лишь вставляем несколько записей в базу. Полный код доступен на GitHub. Единственное, что могу отметить, это сущность, которую мы будем записывать:

public class PersonalData {

 @Id
 @GeneratedValue(strategy = GenerationType.IDENTITY)
 private Long id;
 private String bankBic;
 private String bankName;
 @Column(name = "last_update")
 private LocalDateTime lastUpdate = LocalDateTime.now();

 public PersonalData(String bankBic) {
   this.bankBic = bankBic;
 }

}

bankBic будем получать, используя рандомайзер, bankName всегда будет null, в качестве даты последнего обновления берём текущую.

Деплой в DockerHub

После окончания написания кода, необходимо загрузить наш сервис в DockerHub.

Для этого напишем Dockerfile для подготовки образа:

FROM maven:3.8.4-openjdk-17-slim AS builder

COPY pom.xml /build/
COPY src /build/src/
WORKDIR /build
RUN mvn -B -e -C -T 1C -DskipTests clean package \
    && rm -rf ~/.m2
FROM openjdk:17-slim
COPY --from=builder /build/target/producer-*.jar /app/application.jar
WORKDIR /app
EXPOSE 8080

ENTRYPOINT ["java", "-jar", "application.jar"]

Теперь, когда всё готово мы можем собрать наше приложение с присвоением репозитория, имени и тега.

docker login

docker build -t <your nickname>/<image name>:<tag> .

После успешной сборки образа, отправим его в DockerHub:

docker push <your nickname>/<image name>:<tag>

Теперь, когда образ загружен, необходимо подготовить для него деплоймент

Но перед этим подготовим нашу базу PostgreSQL, куда будет писать наш сервис

Подготовка окружения

Первым делом мы запустим PostgreSQL. Для этого нам понадобится создать 3 файла: storage.yaml, pv.yaml, pvc.yaml. Содержимое приведено ниже, подробнее о файлах ещё ниже:

storage.yaml
kind: StorageClass
apiVersion: storage.k8s.io/v1
metadata:
  name: local-storage
provisioner: kubernetes.io/no-provisioner
volumeBindingMode: WaitForFirstConsumer

pv.yaml
apiVersion: v1
kind: PersistentVolume
metadata:
  name: "pv-pg"
  labels:
    type: local
spec:
  capacity:
    storage: "4Gi"
  volumeMode: Filesystem
  accessModes:
    - ReadWriteOnce
  persistentVolumeReclaimPolicy: Retain
  storageClassName: local-storage
  local:
    path: "/opt/"
  nodeAffinity:
    required:
      nodeSelectorTerms:
        - matchExpressions:
            - key: kubernetes.io/hostname
              operator: In
              values:
                - minikube

pvc.yaml
kind: PersistentVolumeClaim
apiVersion: v1
metadata:
  name: "pvc-pg"
spec:
  storageClassName: "local-storage"
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: "4Gi"

StorageClass - хранилище, где мы определяем, что:

provisioner: kubernetes.io/no-provisioner - мы не будем динамически создавать локальные хранилища поскольку в kubernetes нет такой возможности; 

volumeBindingMode: WaitForFirstConsumer - предоставлять хранилище будем лишь при необходимости в этом.

PV (Persistent Volume) - постоянные тома, которые предоставляют непосредственно хранилище для данных.

capacity: storage: "4Gi" - определяем размер.

accessModes: - ReadWriteOnce - режим доступа. В нашем случае хранилище доступно только для одного пода. Возможно варианты: 

  • ReadOnlyMany - множество подов, но только на чтение;

  • ReadWriteMany - чтение и запись для множества подов;

persistentVolumeReclaimPolicy: Retain - определяет политику восстановления тома после того, как он перестанет использоваться. В нашем случае том будет освобождён и останется существовать. Также доступны:

  • Delete - удаление данных и PV;

  • Recycle - устаревшая политика. Удаляет данные, но не PV;

storageClassName: local-storage - указываем какой StorageClass используем.
local: path: "/opt/" - определяем путь к локальному хранилищу на узле (путь должен существовать).

PVC (PersistentVolumeClaim) - запрос к PV на предоставление ресурсов определённого размера.

storageClassName: "local-storage" - указываем какой StorageClass используем.

accessModes: - ReadWriteOnce - определяем режим доступа к PVC аналогично PV
resources: requests: storage: "4Gi" - определяем запрашиваемый размер хранилища в 4 гигабайта.

StorageClass определяет тип и параметры хранилища, но не физическое хранилище. Это делает PersistentVolume. PersistentVolumeClaim  запрашивает предоставление хранилища и если есть доступный PV с подходящим размером и классом хранилища, то PVC к нему привяжется и поды смогут использовать это хранилище.

Теперь, когда всё готово, необходимо установить PostgreSQL. Для этого используем репозиторий bitnami.

Добавим репозиторий bitnami:

helm repo add bitnami https://charts.bitnami.com/bitnami

Обновим репозиторий:

helm repo update

И теперь выполним команду для установки:

helm install postgresql-dev bitnami/postgresql 
--set primary.persistence.existingClaim=pvc-pg,auth.postgresPassword=pgpass

Выполним команду kubectl get pods -w с флагом -w , чтобы отслеживать состояние подов в прямом эфире. Проследим за запуском нашей базы и в случае успеха можно переходить дальше.

Отправитель

Теперь мы можем подготовить деплоймент, чтобы развернуть наше приложение, которое уже сможет выполнять свои функции - писать в базу данных. Создадим файл producer-deploy.yaml со следующим содержимым:

producer-deploy.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: producer-dep
  labels:
    app: marmarks-dep
spec:
  replicas: 1
  selector:
    matchLabels:
      app: marmarks-dep
  template:
    metadata:
      labels:
        app: marmarks-dep
    spec:
      containers:
        - name: producer-dep
          image: "marmarks/producer:0.2"
          imagePullPolicy: IfNotPresent
          envFrom:
            - configMapRef:
                name: producer-config-map
            - secretRef:
                name: producer-secrets
          ports:
            - containerPort: 8080
              protocol: TCP
          readinessProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 3

---

apiVersion: v1
kind: ConfigMap
metadata:
  name: producer-config-map
  labels:
    app: marmarks-dep
data:
  SERVER_PORT: "8080"
  SPRING_PROFILES_ACTIVE: "dev"
  SPRING_DATASOURCE_URL: "jdbc:postgresql://postgresql-dev:5432/postgres"

---

apiVersion: v1
kind: Secret
metadata:
  name: producer-secrets
  labels:
    app: marmarks-dep
type: Opaque
stringData:
  SPRING_DATASOURCE_USERNAME: "postgres"
  SPRING_DATASOURCE_PASSWORD: "pgpass"

Здесь, помимо деплоймнета, мы создадим ConfigMap и хранилище секретов, куда передадим необходимые переменные окружения для подключения. Более подробный разбор конфигураций будет приведён в следующей части статьи, где мы рассмотрим Helm Chart.

Запустим наш деплоймент, используя kubectl apply -f producer-deploy.yaml

Выполним команду kubectl get pods -w.

Когда образ будет загружен и под станет доступным, можем зайти внутрь пода PostgreSQL выполнив команду kubectl exec -i -t -n default postgresql-dev-0 -c postgresql -- sh -c "clear; (bash || ash || sh)"

Находясь внутри подключимся к базе выполнив команду: psql -h postgresql-dev -p 5432 -U postgres -d postgres

Следующим шагом у нас запросят пароль. В моём случае я введу pgpass

Теперь, когда мы внутри, выполним SELECT * FROM personal_data; для получения содержащихся внутри записей. Получим что-то типа такого:

|  id  |  bank_bic   | bank_name |                  last_update                       |
|------|-------------------|---------------------|--------------------------------------------------------|
|  1  |   1234567   |                        | 2023-07-27 17:06:44.052581         |
|  2  |   1234568   |                        | 2023-07-27 17:06:44.266662         |
|  3   |   1234569   |                        | 2023-07-27 17:06:44.272390         |
|  4  |   1234510   |                        | 2023-07-27 17:06:44.276271       |
|  5  |   1234511 |                        | 2023-07-27 17:06:44.339826         |

Kafka

Теперь, когда наш образ и база данных готовы для работы и мы убедились, что они работают, мы поднимим Kafka Cluster. Теперь загрузим Zookeeper и Kafka из репозитория bitnami:

helm install zookeeper bitnami/zookeeper
helm install kafka bitnami/kafka

Так мы используем настройки по умолчанию. Для того, чтобы установить число реплик равным 3, необходимо добавить  --set replicaCount=3 в команду для чарта Kafka. Если чарт уже запущен, сделаем helm upgrade kafka bitnami/kafka --set replicaCount=3

Альтернативным путём будет запустить чарты из их репозитория на GitHub.

Kafka Connect

Теперь необходимо подготовить необходимый образ kafka-connect. Для этого напишем Dockerfile, где установим в наш образ jdbc connect для чтения из базы данных, а также avro converter для конвертации данных в формат Avro:

FROM confluentinc/cp-kafka-connect-base:6.2.1

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-avro-converter:5.5.4

RUN confluent-hub install --no-prompt confluentinc/kafka-connect-jdbc:10.1.1

После этого соберем наш образ и загрузим в DockerHub, как мы делали это ранее. 

Когда наш образ собран и загружен, обратимся к confluentinc для получения helm чарта - GitHub. Клонируем себе репозиторий и выполним команду:

helm dependency update charts/cp-kafka/

После этого используем наш образ для запуска Kafka Connect:

helm install kafka-connect \
 --set image="marmarks/cp-kafka-connect-jdbc" \
 --set imageTag="6.2.1" \
 --set kafka.bootstrapServers="PLAINTEXT://kafka-headless:9092" \
 --set prometheus.jmx.enabled=false \
 ./charts/cp-kafka-connect

где Image, imageTag, необходимо заменить на свои параметры.

kafka.bootstrapServers - путь до брокера. Необходимо заменить на свой путь, если запускали kafka отличным от моего способа и в этом есть необходимость.

prometheus.jmx.enabled=false чтобы отключить сбор метрик.

Если реплик Kafka не три, то следует дополнить команду сменив число реплик топиков, поскольку по умолчанию их 3. Итоговая команда (для 1 реплики) будет выглядеть так:

helm install kafka-connect \
 --set image="marmarks/cp-kafka-connect-jdbc" \
 --set imageTag="6.2.1" \
 --set kafka.bootstrapServers="PLAINTEXT://kafka-headless:9092" \
 --set prometheus.jmx.enabled=false \
 --set config.storage.replication.factor="1" \ 
 --set offset.storage.replication.factor="1" \ 
 --set  status.storage.replication.factor="1" \
 ./charts/cp-kafka-connect

Schema Regestry

Следующим шагом следует запустить Schema Regestry. Для этого используем следующую команду:

helm install schema-registry \
  --set kafka.bootstrapServers="PLAINTEXT://kafka-headless:9092" \
  --set prometheus.jmx.enabled=false   ./charts/cp-schema-registry

Здесь мы снова указываем путь до брокера (при необходимости изменить) и отключаем сбор метрик.

Для работы с Kafka Connect и брокером, мы поднимем под Kafka Client. Создадим файл kafka-client.yaml с содержимым:

kafka-client.yaml
apiVersion: v1
kind: Pod
metadata:
  name: kafka-client
  namespace: default
spec:
  containers:
  - name: kafka-client
    image: confluentinc/cp-enterprise-kafka:5.4.1
    command:
    - sh
    - -c
    - "sleep infinity"

Применим с помощью kubectl apply -f kafka-client.yaml

Когда kafka-client запущен, мы для удобства зайдём в него с помощью команды: 

kubectl exec -i -t -n default kafka-client -c kafka-client -- sh -c "clear; (bash || ash || sh)"

Проверим наличие коннекторов: 

curl -X GET http://kafka-connect-cp-kafka-connect:8083/connectors

На текущем этапы мы получим [], что будет означать, что коннекторов не создано. Создадим наш коннектор:

curl -X POST \
  -H "Content-Type: application/json" \
  --data '{
    "name": "kafka-connector",
    "config": {
     "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector", 
       "key.converter": "io.confluent.connect.avro.AvroConverter", 
       "value.converter": "io.confluent.connect.avro.AvroConverter", 
       "key.converter.schema.registry.url": "http://schema-registry-cp-schema-registry:8081", 
       "value.converter.schema.registry.url": "http://schema-registry-cp-schema-registry:8081",
      "tasks.max": 1,
      "connection.url": "jdbc:postgresql://postgresql-dev:5432/postgres?user=postgres&password=pgpass",
      "table.whitelist": "personal_data",
      "mode": "incrementing",
      "incrementing.column.name": "id",
      "topic.prefix": "kafka-connect-",
      "poll.interval.ms": 1000
    }
  }' \
  http://kafka-connect-cp-kafka-connect:8083/connectors
  • "name": "kafka-connector" - укажем имя для коннектора.

  • "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector" - определяет класс коннектора, позволяющий читать данные из базы данных с использованием JDBC.

  • "key.converter": "io.confluent.connect.avro.AvroConverter" - указываем конвертер для ключей сообщений, который будет использоваться при записи данных в Kafka. Используем AvroConverter, который позволяет сериализовать ключи сообщений в Avro формат перед отправкой в Kafka.

  • "value.converter": "io.confluent.connect.avro.AvroConverter" - задаем конвертер для значений сообщений, который будет использоваться при записи данных в Kafka. Аналогично ключам, значения также сериализуются в Avro формат.

  • "key.converter.schema.registry.url": "http://schema-registry-cp-schema-registry:8081" - указываем URL-адрес Schema Registry, который будет использоваться для регистрации схемы ключей сообщений в Avro формате.

  • "value.converter.schema.registry.url": "http://schema-registry-cp-schema-registry:8081" - указываем URL-адрес Schema Registry, который будет использоваться для регистрации схемы значений сообщений в Avro формате.

  • "tasks.max": 1 - указываем количество потоков для данного коннектора, когда у нас больше 1 пода Kafka Connect, имеет смысл ставить больше потоков.

  • "connection.url": "jdbc:postgresql://postgresql-dev:5432/postgres?user=postgres&password=pgpass" - задаем URL-подключения к базе данных PostgreSQL, которая будет использоваться для чтения данных.

  • "table.whitelist": "personal_data" - определяем список таблиц, которые будут считываться из базы данных. Если не перечислить здесь таблицы, то данные будут считываться из всех таблиц доступных для пользователя переданного в connection.url

  • "mode": "incrementing" - указываем режим считывания данных, это инкрементный режим, который использует значение столбца для определения обновлений.

  • "incrementing.column.name": "id" - указываем имя столбца, используемого для определения обновлений в инкрементном режиме.

  •  "timestamp.column.name": "last_update" - указываем имя столбца, для получения обновлений уже существующих записей

  • "topic.prefix": "kafka-connect-" - задаем префикс для имен топиков, которые будут создаваться в Kafka при записи данных из базы данных.

  • "poll.interval.ms": 1000: Задает интервал между опросами базы данных на предмет обновлений данных. В данном случае, это 1000 миллисекунд (1 секунда).

Дополнительно отмечу, что вручную мы никаких схем регистрировать не будем. Kafka Connect сам создаст необходимые схемы и запишет их в хранилище. Помимо приведённых выше конфигураций, в документации можно найти ещё множество различных настроек.

А работает ли?

После отправки выполним curl -X GET http://kafka-connect-cp-kafka-connect:8083/connectors/kafka-connector/status для получения статуса созданного коннектора. В норме статус будет выглядеть следующим образом:

{
  "name": "kafka-connector",
  "connector": {
    "state": "RUNNING",
    "worker_id": "10.244.2.104:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "10.244.2.104:8083"
    }
  ],
  "type": "source"
}

Чтобы получить конфигурацию коннектора необходимо выполнить команду:

curl -X GET http://kafka-connect-cp-kafka-connect:8083/connectors/kafka-connector/config

Теперь получим список доступных топиков в Kafka, используя команду: 

kafka-topics --bootstrap-server kafka:9092 --list

Здесь мы увидим топик с нашими записями из таблицы personal data -> kafka-connect-personal_data. Топик с конфигурациями, оффсетом, статусами -> kafka-connect-cp-kafka-connect-config, kafka-connect-cp-kafka-connect-offset, kafka-connect-cp-kafka-connect-status, соответственно.

Теперь проверим наличие записей в нашем топике: 

kafka-console-consumer --bootstrap-server kafka:9092 --topic kafka-connect-personal_data --from-beginning

В случае, если всё прошло успешно, это будет выглядеть как-то так:

{"id":1,"bank_bic":"1234568","bank_name":null,"last_update":1690113936653}
{"id":2,"bank_bic":"1234569","bank_name":null,"last_update":1690113936752}
{"id":3,"bank_bic":"1234510","bank_name":null,"last_update":1690113936843}
{"id":4,"bank_bic":"1234511","bank_name":null,"last_update":1690113936858}

Теперь посмотрим на схемы в Schema Regestry. Для получения списка доступных схем выполним команду:

curl -X GET http://schema-registry-cp-schema-registry:8081/subjects

В моём случае ответ выглядит так: ["kafka-connect-personal_data-value"]

Теперь узнаем версию схемы: 

curl -X GET http://schema-registry-cp-schema-registry:8081/subjects/kafka-connect-personal_data-value/versions

В моём случае она первая, и теперь посмотрим саму схему:

curl -X GET http://schema-registry-cp-schema-registry:8081/subjects/kafka-connect-personal_data-value/versions/1

Ответ:

{
  "subject": "kafka-connect-personal_data-value",
  "version": 1,
  "id": 1,
  "schema": {
    "type": "record",
    "name": "personal_data",
    "fields": [
      {
        "name": "id",
        "type": "long"
      },
      {
        "name": "bank_bic",
        "type": ["null", "string"],
        "default": null
      },
      {
        "name": "bank_name",
        "type": ["null", "string"],
        "default": null
      },
      {
        "name": "last_update",
        "type": ["null", { "type": "long", "connect.version": 1, "connect.name": "org.apache.kafka.connect.data.Timestamp", "logicalType": "timestamp-millis" }],
        "default": null
      }
    ],
    "connect.name": "personal_data"
  }
}

Мы развернули множество сервисов, убедились, что каждый из них общается друг с другом, все данные успешно доставляются от одного сервису к другому, и всё работает в штатном режиме. Повторим наш флоу: данные попадают в базу данных, откуда их считывает опираясь на поля id (для новых записей) и last_update (для уже существующих) Kafka Connect, после этого он отправляет их брокеру Kafka в отдельный топик, а также генерирует схему данных, которую отправляет в Schema Regestry. Теперь осталось подготовить сервис, который будет получать схемы из реестра, считывать данные из топика, модифицировать их по своему усмотрению и записывать в базу данных. Также подготовим Helm Chart для удобного развёртывания нашего отправителя и получателя. Всё это будет проделано в следующей части статьи.

Комментарии (0)