Данная статья является продолжением статьи - Ивентная модель данных с использованием Kafka и Kafka Connect: Построение гибкой и распределенной архитектуры.

Добро пожаловать во вторую часть статьи о построении гибкой и распределенной архитектуры с использованием Apache Kafka и Kafka Connect! В первой части мы ознакомились с ивентной моделью данных, разработали сервис отправителя и настроили интеграцию с Kafka, чтобы асинхронно отправлять сообщения. Теперь настало время рассмотреть вторую часть этого увлекательного проекта.

В этой части статьи мы сфокусируемся на реализации получателя на Java, который будет получать Avro-схемы из Schema Registry и читать сообщения из Kafka, отправленные сервисом отправителя через Kafka Connect. Получатель будет обрабатывать сообщения из топика, обогащать их своими данными и сохранять результаты в своей базе данных.

Цели статьи:

  1. Разработка Java-приложения в качестве получателя данных из Kafka.

  2. Изучение работы с Avro-схемами и Schema Registry для обеспечения совместимости сообщений.

  3. Обработка сообщений из Kafka с использованием асинхронного подхода.

  4. Интеграция с PostgreSQL для сохранения обработанных данных.

  5. Подготовка Helm Chart для развертывания отправителя и получателя в Kubernetes.

  6. Конфигурирование Kubernetes для обеспечения масштабируемости и отказоустойчивости архитектуры.

Подготовка Java-приложения в качестве получателя

Для начала подготовим наш POM файл

pom.xml
<dependencies>
 <dependency>
   <groupId>com.google.guava</groupId>
   <artifactId>guava</artifactId>
   <version>32.1.1-jre</version>
 </dependency>
 <dependency>
   <groupId>io.confluent</groupId>
   <artifactId>kafka-avro-serializer</artifactId>
   <version>7.4.0</version>
 </dependency>
 <dependency>
   <groupId>io.confluent</groupId>
   <artifactId>kafka-schema-registry-client</artifactId>
   <version>7.4.0</version>
 </dependency>
 <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-data-jpa</artifactId>
 </dependency>
 <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-actuator</artifactId>
 </dependency>
 <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-web</artifactId>
 </dependency>
 <dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka</artifactId>
 </dependency>
 <dependency>
   <groupId>org.postgresql</groupId>
   <artifactId>postgresql</artifactId>
   <scope>runtime</scope>
 </dependency>
 <dependency>
   <groupId>org.projectlombok</groupId>
   <artifactId>lombok</artifactId>
   <optional>true</optional>
 </dependency>
 <dependency>
   <groupId>org.springframework.boot</groupId>
   <artifactId>spring-boot-starter-test</artifactId>
   <scope>test</scope>
 </dependency>
 <dependency>
   <groupId>org.springframework.kafka</groupId>
   <artifactId>spring-kafka-test</artifactId>
   <scope>test</scope>
 </dependency>
</dependencies>

Здесь мы добавим библиотеки необходимые для работы со Schema Regestry, PostgreSQL, Kafka и проч. Чтобы успешно выгрузить библиотеки io.confluent необходимо подключиться к их репозиторию

<repositories>
 <repository>
   <id>confluent</id>
   <url>https://packages.confluent.io/maven/</url>
 </repository>
</repositories>

Теперь напишем yaml конфигурацию для проекта:

# Порт сервера
server:
  port: 8081
# Настройки Spring
spring:
  main:
    allow-bean-definition-overriding: true
  application:
    name: kafka-example-consumer
  jpa:
    # Диалект базы данных
    database-platform: org.hibernate.dialect.PostgreSQLDialect
    generate-ddl: false
    hibernate:
      ddl-auto: update
    show-sql: true
    properties:
      hibernate:
        generate_statistics: false
  datasource:
    # URL для подключения к базе данных PostgreSQL
    url: jdbc:postgresql://localhost:5432/public
    username: postgres
    password: postgres
  kafka:
    schema:
      registry:
        # Пути до хранилищ схем если у нас их несколько (дополнительная конфигурация)
        urls: http://schema-registry-cp-schema-registry:8081
    bootstrap-servers: localhost:29092
    listener:
      # Как подтверждаем получение сообщений
      ack-mode: record
    producer:
      client-id: ${spring.application.name}
      # Сериализатор ключа
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Сериализатор значения в DLT очередь
      value-serializer: ru.marmarks.consumer.config.DltMessageSerializer
      retries: 3
    consumer:
      group-id: ${spring.application.name}
      autoOffsetReset: earliest
      # Сериализаторы для ключа и значения с обработкой ошибок
      keyDeserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      valueDeserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
      properties:
        # Тип значения в JSON по умолчанию
        spring.json.value.default.type: org.apache.avro.generic.GenericRecord
        spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
       # Сериализатор значения
        spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
        specific:
          avro:
            reader: true
            # Хранилище схем по умолчанию
        schema:
          registry:
            url: http://schema-registry-cp-schema-registry:8081
# Настройки Kafka
kafka:
  topics:
    # Топик из которого будем получать записи
    personal-data: kafka-connect-personal_data
  services:
    # Для определения того из каких топиков мы поддерживаем чтение
    personal-data: kafka-connect-personal_data

Подключение к Schema Registry и Kafka брокеру

Теперь перейдём к конфигурированию Kafka. Комментарии приведены в коде. Здесь мы конфигурируем подключение к Schema Regestry, слушателя и отправку сообщений, которые не вышло обработать в dlt очередь

KafkaConfiguration.java
@Configuration
@EnableKafka
@RequiredArgsConstructor
@EnableConfigurationProperties(SchemaRegistryProperties.class)
public class KafkaConfiguration {
 private static final int DEFAULT_CACHE_CAPACITY = 200;
 private static final String DLT_TOPIC_SUFFIX = ".dlt";
 private final ConsumerFactory<String, GenericRecord> consumerFactory;
 private final ProducerFactory<Object, Object> producerFactory;

 @Bean
 public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> objectKafkaListenerContainerFactory(
     DefaultErrorHandler errorHandler
 ) {
   ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
   factory.setConsumerFactory(consumerFactory);
   factory.setCommonErrorHandler(errorHandler);
   return factory;
 }

 /**
  * Клиент для кеширования схем
  */
 @Bean
 public CachedSchemaRegistryClient cachedSchemaRegistryClient(
     SchemaRegistryProperties schemaRegistryProperties) {
   // Сервис для коммуникацией со Schema Registry
   RestService restService = new RestService(schemaRegistryProperties.urls());
   //Также передаём размер кэша схем
   return new CachedSchemaRegistryClient(restService, DEFAULT_CACHE_CAPACITY);
 }

 @Bean
 public KafkaTemplate<Object, Object> kafkaTemplate() {
   return new KafkaTemplate<>(producerFactory);
 }

 /**
  * Публикатор в dead-letter topic.
  */
 @Bean
 public DeadLetterPublishingRecoverer publisher(KafkaTemplate<Object, Object> bytesTemplate) {
   //  Определяем логику выбора партиции для отправки сообщения в DLT.
   //  В данном случае, создаём новый объект TopicPartition, используя имя
   //топика (consumerRecord.topic()) и добавляя суффикс DLT_TOPIC_SUFFIX,
   //  а также номер партиции (consumerRecord.partition()).
   //  Следовательно в DLT топике должно быть столько партиций
   //сколько и в топике откуда читаем
   return new DeadLetterPublishingRecoverer(bytesTemplate, (consumerRecord, exception) ->
       new TopicPartition(consumerRecord.topic() + DLT_TOPIC_SUFFIX, consumerRecord.partition()));
 }

 /**
  * Обработчик исключений при получении сообщений из kafka по умолчанию.
  */
 @Bean
 public DefaultErrorHandler errorHandler(
     DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
   final var handler = new DefaultErrorHandler(deadLetterPublishingRecoverer);
   // Обрабатываем любые исключения и отправляем в DLT
   handler.addNotRetryableExceptions(Exception.class);
   return handler;
 }

}

Посмотрим на сущность, которую будем обрабатывать: 

public class ClientData {

 @Id
 @GeneratedValue(strategy = GenerationType.IDENTITY)
 private Long id;
 private String bankBic;
 private String bankName;
 private String fio;
 private Long correlationId;
 private LocalDateTime lastUpdate;

}

Добавили новые поля, относительно отправляемой сущности - fio, correlationId. Предположим, что мы заполняем ФИО, обращаясь в какой-то сторонний сервис.

Обработка сообщений из топика

Теперь перейдём к тому, как будем обрабатывать приходящее сообщение

KafkaConsumerListeners.java
public class KafkaConsumerListeners {

 // Получим сервисы, которые смогут обрабатывать приходящие сообщения
 private final List<ServiceResolver> serviceResolverList;
 // Определяем топики из которых будем читать. Их может быть несколько.
  //Также определим конфигурацию листнера
 @KafkaListener(topics = "#{'${kafka.topics.personal-data}'.split(',')}", containerFactory = "objectKafkaListenerContainerFactory")
 // Принимаем абстрактный класс GenericRecord.
  //Мы сможем обработать только то сообщение, для которого определена 
  //схема в Schema Regestry
 void readOrganizationConnectorMessages(ConsumerRecord<String, GenericRecord> message) {
   readMessage(message);
 }

 private void readMessage(ConsumerRecord<String, GenericRecord> message) {
   String topic = message.topic();
   long offset = message.offset();
   log.info("Сообщение из топика: {} offset: {}", topic, offset);

   // Получаем само сообщение
   GenericRecord value = message.value();

   if (value == null) {
     log.info("Пустое сообщение из топика: {} offset: {}", topic, offset);
     return;
   }

   // Находим сервис, который сможет обработать сообщение
   ServiceResolver resolver = serviceResolverList.stream()
       .filter(it -> it.isSupported(topic))
       .findFirst()
       .orElseThrow(() -> new RuntimeException("Сервис не найден для топика: " + topic));

   // Передаём сообщение на обработку
   resolver.process(value);

   log.info("Сообщение обработано: топик {} offset {}", topic, offset);
 }

}

Комментарии по коду приведены непосредственно в коде. Реализация заточена на то, что мы сможем обрабатывать несколько топиков с различными сообщения. Схема каждого сообщения должна быть приведена в Schema Regestry

Работа с Avro-схемами

Посмотрим на обработку сообщения в сервисе

Обработка сообщения
public void process(GenericRecord value) {
 Long id = Long.valueOf(String.valueOf(value.get("id")));
 String bankBic = String.valueOf(value.get("bank_bic"));
 String bankName = String.valueOf(value.get("bank_name"));
 String lastUpdate = String.valueOf(value.get("last_update"));
 long millis = Long.parseLong(lastUpdate);
 Instant instant = Instant.ofEpochMilli(millis);
 LocalDateTime lastUpdateTime = instant.atZone(ZoneId.systemDefault()).toLocalDateTime();

 // Подготовим новую сущность на основе приходящих данных
 ClientData clientData = new ClientData();
 clientData.setBankBic(bankBic);
 clientData.setFio("Рауль " + id);
 clientData.setBankName(bankName);
 clientData.setCorrelationId(id);
 clientData.setLastUpdate(lastUpdateTime);

 ClientData save = clientDataRepository.save(clientData);
 log.info("Данные клиента сохранены: {}", save);

}

В данном случае мы примитивным способом вытаскиваем все поля из сообщения и на основе них создаём новую сущность, которую модифицируем новыми данными, которые мы можем получить обратившись куда-либо ещё или иным способом и записываем в базу

Полный код получателя доступен на GitHub.

Helm Chart

Теперь перейдём к Helm чарту, который будет разворачивать нашего отправителя и получателя.

Подготовим файл с информацией о чарте:

# Версия API Helm Chart
apiVersion: v2
# Имя чарта
name: marmarks-chart
# Версия чарта
version: 1.0.0
# Описание
description: Some project
# Тип
type: application
sources:
  - https://github.com/marmarks
maintainers:
  - name: Maksim Krylov
    email: kryloumaksim@gmail.com
    url: https://habr.com/ru/users/Marmaksmark/
    # Версия приложения
appVersion: 1.0.0

Конфигурации и шаблоны

Следующим шагом подготовим файл values.yaml, который предоставляет конфигурации для чарта. Помимо того, что настройки будут определены в этом файле мы сможем также их переопределять используя --set при запуске чарта

producer:
  # Конфигурация деплоймента
  deployment:
    # Имя деплоймента
    name: "producer"
    # Число реплик
    replicas: 1
    # Настройки контейнера
  container:
      name: "producer-container"
      resources: { }
    # Настройки сервиса
  service:
      name: "producer-service"
      port: 8080
    # Информация об образе сервиса
  image:
      name: marmarks/producer
      tag: "0.2"
      # Загрузим образ, если локально его нет
      pullPolicy: IfNotPresent


consumer:
  deployment:
    name: "consumer"
    replicas: 1
  container:
    name: "consumer-container"
    resources: { }
  service:
    name: "consumer-service"
    port: 80
    #  тип сервиса: ClusterIP, NodePort, LoadBalancer
    type: LoadBalancer
  image:
    name: marmarks/consumer
    tag: "0.5"
    pullPolicy: IfNotPresent

Также мы определим файл _helpers.tpl содержащий вспомогательные шаблоны, такие как:

_helpers.tpl
  1. Имена меток для связей между объектами

{{- define "marmarks-chart.selectorLabels" -}}
app.kubernetes.io/name: {{ .Chart.Name }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end -}}
  1. Имя и версия чарта

{{- define "marmarks-chart.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 60 | trimSuffix "-" -}}
{{- end -}}
  1. Имена меток в манифестах

{{- define "marmarks-chart.labels" -}}
helm.sh/chart: {{ include "marmarks-chart.chart" . }}
{{ include "marmarks-chart.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end -}}
  1. Функция для вычисления хэш суммы. Для обновления чарта при изменении секретов или конфигураций

{{- define "marmarks-chart.propertiesHash" -}}
{{- $producerSecrets := include (print $.Template.BasePath "/producer-secrets.yaml") . | sha256sum -}}
{{- $consumerSecrets := include (print $.Template.BasePath "/consumer-secrets.yaml") . | sha256sum -}}
{{- $producerConfig := include (print $.Template.BasePath "/producer-config.yaml") . | sha256sum -}}
{{- $consumerConfig := include (print $.Template.BasePath "/consumer-config.yaml") . | sha256sum -}}
{{ print $producerSecrets $producerConfig $consumerSecrets $consumerConfig | sha256sum }}
{{- end -}}
  1. Имена компонентов отправителя

{{- define "marmarks-chart.producer.defaultName" -}}
{{- printf "producer-%s" .Release.Name -}}
{{- end -}}

{{- define "marmarks-chart.producer.deployment.name" -}}
{{- default (include "marmarks-chart.producer.defaultName" .) .Values.producer.deployment.name | trunc 63 | trimSuffix "-" -}}
{{- end -}}

{{- define "marmarks-chart.producer.container.name" -}}
{{- default (include "marmarks-chart.producer.defaultName" .) .Values.producer.container.name | trunc 63 | trimSuffix "-" -}}
{{- end -}}

{{- define "marmarks-chart.producer.service.name" -}}
{{- default (include "marmarks-chart.producer.defaultName" .) .Values.producer.service.name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
  1. Имена компонентов получателя

{{- define "marmarks-chart.consumer.defaultName" -}}
{{- printf "consumer-%s" .Release.Name -}}
{{- end -}}

{{- define "marmarks-chart.consumer.deployment.name" -}}
{{- default (include "marmarks-chart.consumer.defaultName" .) .Values.consumer.deployment.name | trunc 63 | trimSuffix "-" -}}
{{- end -}}

{{- define "marmarks-chart.consumer.container.name" -}}
{{- default (include "marmarks-chart.consumer.defaultName" .) .Values.consumer.container.name | trunc 63 | trimSuffix "-" -}}
{{- end -}}

{{- define "marmarks-chart.consumer.service.name" -}}
{{- default (include "marmarks-chart.consumer.defaultName" .) .Values.consumer.service.name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
  1. Имена прочих компонентов компонентов (секретов, конфигов)

{{- define "marmarks-chart.producerSecrets.defaultName" -}}
{{- printf "producer-secrets-%s" .Release.Name -}}
{{- end -}}

{{- define "marmarks-chart.consumerSecrets.defaultName" -}}
{{- printf "consumer-secrets-%s" .Release.Name -}}
{{- end -}}

{{- define "marmarks-chart.producerConfig.defaultName" -}}
{{- printf "producer-config-%s" .Release.Name -}}
{{- end -}}

{{- define "marmarks-chart.consumerConfig.defaultName" -}}
{{- printf "consumer-config-%s" .Release.Name -}}
{{- end -}}

Получатель

Теперь, когда конфигурации и шаблоны описаны мы можем описать конфигурационный файл consumer-config.yaml для отправителя:

apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ include "marmarks-chart.consumerConfig.defaultName" . }}
  labels:
    {{- include "marmarks-chart.labels" . | nindent 4 }}
data:
  SPRING_PROFILES_ACTIVE: "dev"
  SERVER_PORT: "8080"
  SPRING_KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
  SPRING_DATASOURCE_URL: "jdbc:postgresql://postgresql-dev:5432/postgres"

Здесь мы определили на каком порту будем запускать наше приложение, выбранный профиль, url брокера и url до базы данных. Логин-пароль для аутентификации будем хранить в секретах consumer-secrets.yaml:

# Манифест для создания секретов
#  Версия API
apiVersion: v1
# Тип манифеста
kind: Secret
# Метаданные манифеста
metadata:
  # Имя манифеста
  name: {{ include "marmarks-chart.consumerSecrets.defaultName" . }}
  labels:
    # Метки манифеста
    {{- include "marmarks-chart.labels" . | nindent 4 }}
  # Тип секрета: неструктурированные данные. Храни просто пару "ключ-значение"
  # Без автоматического обновления, управления, ротации и т.д.
type: Opaque
stringData:
  SPRING_DATASOURCE_USERNAME: "postgres"
  SPRING_DATASOURCE_PASSWORD: "pgpass"

Подробнее про секреты и их типы в статье - Безопасное хранение secrets в Kubernetes

Осталось описать лишь дейломент и сервис получателя consumer.yaml. Исчерпывающие комментарии оставлены в коде:

apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "marmarks-chart.consumer.deployment.name" . }}
  labels:
    {{- include "marmarks-chart.labels" . | nindent 4 }}
    tier: consumer
{{/*    Спецификация развёртывания*/}}
spec:
{{/*  Число реплик*/}}
  replicas: {{ .Values.consumer.deployment.replicas }}
  selector:
{{/*    Метки подов*/}}
    matchLabels:
      {{- include "marmarks-chart.selectorLabels" . | nindent 6 }}
      tier: consumer
{{/*      Стратегия обновления развёртывания*/}}
  strategy:
{{/*    Пошаговое обновление*/}}
    rollingUpdate:
{{/*      Максимальное количество реплик для обновления*/}}
      maxSurge: 25%
{{/*      Максимальное число реплик недоступных во время обновления*/}}
      maxUnavailable: 25%
{{/*      Тип стратегии развёртывания*/}}
    type: RollingUpdate
{{/*    Шаблон создания подов*/}}
  template:
    metadata:
{{/*      Аннотации*/}}
      annotations:
{{/*        Проверка чек суммы для обновления секретов и конфигов*/}}
        checksum/config: {{ include "marmarks-chart.propertiesHash" . }}
      labels:
        {{- include "marmarks-chart.selectorLabels" . | nindent 8 }}
        tier: consumer
{{/*        Спецификации пода*/}}
    spec:
{{/*      Контейнер, который будет запущен в поде*/}}
      containers:
        - name: {{ include "marmarks-chart.consumer.container.name" . }}
          image: "{{ .Values.consumer.image.name }}:{{ .Values.consumer.image.tag }}"
          imagePullPolicy: {{ .Values.consumer.image.pullPolicy }}
{{/*          Переменные окружения из конфиг мапы*/}}
          envFrom:
            - configMapRef:
                name: {{ include "marmarks-chart.consumerConfig.defaultName" . }}
{{/*          Переменные окружения секреты*/}}
            - secretRef:
                name: {{ include "marmarks-chart.producerSecrets.defaultName" . }}
          readinessProbe:
{{/*            Проверка доступности пода*/}}
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 3
          ports:
{{/*            Порты пода*/}}
            - containerPort: 8080
{{/*              Протокол*/}}
              protocol: TCP
          resources:
{{/*            Ресурсы выделенные для контейнера*/}}
            {{- toYaml .Values.consumer.container.resources | nindent 12 }}

---

apiVersion: v1
kind: Service
metadata:
  name: {{ include "marmarks-chart.consumer.service.name" . }}
  labels:
    {{- include "marmarks-chart.labels" . | nindent 4 }}
    tier: consumer
spec:
  type: {{ .Values.consumer.service.type }}
  ports:
{{/*    Порт службы*/}}
    - port: {{ .Values.consumer.service.port }}
      protocol: TCP
{{/*      Порт контейнера, на который будет направлен трафик*/}}
      targetPort: 8080
{{/*      Имя порта*/}}
      name: http
{{/*    Селектор для выбора подов, которые будут обслужены сервисом*/}}
  selector:
    {{- include "marmarks-chart.selectorLabels" . | nindent 4 }}
    tier: consumer

Мы определили стратегию развертывания, health checks и прочее. Наш получатель уже полностью готов к приёму трафика из брокера, а брокер уже готов его нам поставлять. Однако мы сделаем аналогичную конфигурацию для отправителя

Отправитель

producer-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
  name: {{ include "marmarks-chart.producerConfig.defaultName" . }}
  labels:
    {{- include "marmarks-chart.labels" . | nindent 4 }}
data:
  SERVER_PORT: "8080"
  SPRING_PROFILES_ACTIVE: "dev"
  SPRING_DATASOURCE_URL: "jdbc:postgresql://postgresql-dev:5432/postgres"

producer-secrets.yaml
# Манифест для создания секретов
#  Версия API
apiVersion: v1
# Тип манифеста
kind: Secret
# Метаданные манифеста
metadata:
  # Имя манифеста
  name: {{ include "marmarks-chart.producerSecrets.defaultName" . }}
  labels:
    # Метки манифеста
    {{- include "marmarks-chart.labels" . | nindent 4 }}
  # Тип секрета: неструктурированные данные. Храни просто пару 
"ключ-значение"
  # Без автоматического обновления, управления, ротации и т.д.
type: Opaque
stringData:
  SPRING_DATASOURCE_USERNAME: "postgres"
  SPRING_DATASOURCE_PASSWORD: "pgpass"

producer.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: {{ include "marmarks-chart.producer.deployment.name" . }}
  labels:
    {{- include "marmarks-chart.labels" . | nindent 4 }}
    tier: producer
spec:
  replicas: {{ .Values.producer.deployment.replicas }}
  selector:
    matchLabels:
      {{- include "marmarks-chart.selectorLabels" . | nindent 6 }}
      tier: producer
  strategy:
    rollingUpdate:
      maxSurge: 25%
      maxUnavailable: 25%
    type: RollingUpdate
  template:
    metadata:
      labels:
        {{- include "marmarks-chart.selectorLabels" . | nindent 8 }}
        tier: producer
    spec:
      containers:
        - name: {{ include "marmarks-chart.producer.container.name" . }}
          image: "{{ .Values.producer.image.name }}:{{ .Values.producer.image.tag }}"
          imagePullPolicy: {{ .Values.producer.image.pullPolicy }}
          envFrom:
            - configMapRef:
                name: {{ include "marmarks-chart.producerConfig.defaultName" . }}
            - secretRef:
                name: {{ include "marmarks-chart.producerSecrets.defaultName" . }}
          ports:
            - containerPort: 8080
              protocol: TCP
          readinessProbe:
            httpGet:
              path: /actuator/health
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 3
          resources:
            {{- toYaml .Values.producer.container.resources | nindent 12 }}

---

apiVersion: v1
kind: Service
metadata:
  name: {{ include "marmarks-chart.producer.service.name" . }}
  labels:
    {{- include "marmarks-chart.labels" . | nindent 4 }}
    tier: producer
spec:
  ports:
    - port: {{ .Values.producer.service.port }}
      protocol: TCP
      targetPort: 8080
      name: http
  selector:
    {{- include "marmarks-chart.selectorLabels" . | nindent 4 }}
    tier: producer

Запуск и тестирование архитектуры

Необходимо иметь полностью развёрнутую среду из первой части статьи.

Теперь когда Helm Chart готов к запуску, находясь в директории чарта, выполним команду:

helm install <имя чарта> .

Наш чарт успешно запустится, если логи не скажут иного, и будет доступен. Зайдя в логи пода отправителя мы увидим, что он записал несколько новых сообщений в свою базу. После этого, зайдя в логи получателя, мы увидим, что он прочитал и обработал все сообщения, что были в топике раньше и появились только что. Это происходит весьма быстро.

Проверка работоспособности получателя и отправител

Если в ходе обработки сообщений произошла ошибка, то сообщения, чья обработка была безуспешна отправятся в DLT топик. Для того, чтобы убедиться в успешной записи сообщений, мы зайдём в под PostgreSQL и выполним несколько команд. Для начала авторизуемся:

psql -h postgresql-dev -p 5432 -U postgres -d postgres

Нас попросят ввести пароль, в моём случае это pgpass. После его ввода и успешной авторизации выполним:

SELECT * FROM client_data;

id | bank_bic | bank_name | correlation_id | fio | last_update
---+-------------------+-----------------+---------------------+-----------+-----------------------------------
1 | 1179896397 | null | 1 | Рауль 1| 2023-08-06 16:08:16.156
2 | -1471064429 | null | 2 | Рауль 2| 2023-08-06 16:08:16.221
3 | -604766198 | null | 3 | Рауль 3| 2023-08-06 16:08:16.228
4 | -771359675 | null | 4 | Рауль 4| 2023-08-06 16:08:16.232
5 | 1082682564 | null | 5 | Рауль 5| 2023-08-06 16:08:16.237
--------------------------------------------------------------------------------------------------------------------

Заключение

Если данные будут успешно обработаны, то записи будут получены при выполнении SQL запроса. Таким образом мы видим, что ивентная модель работает и успешно обрабатывает данные в асинхронном режиме, чем обеспечивает Evential Consistency.
Исходный код получателя и Helm Chart доступен на GitHub.

Комментарии (1)


  1. mikegordan
    21.09.2023 17:30

    Я всеравно не понимаю зачем вся эта прослойка если не решает основной кэйс: автоматическое масштабирование микросервисов при увеличении партиций