Данная статья является продолжением статьи - Ивентная модель данных с использованием Kafka и Kafka Connect: Построение гибкой и распределенной архитектуры.
Добро пожаловать во вторую часть статьи о построении гибкой и распределенной архитектуры с использованием Apache Kafka и Kafka Connect! В первой части мы ознакомились с ивентной моделью данных, разработали сервис отправителя и настроили интеграцию с Kafka, чтобы асинхронно отправлять сообщения. Теперь настало время рассмотреть вторую часть этого увлекательного проекта.
В этой части статьи мы сфокусируемся на реализации получателя на Java, который будет получать Avro-схемы из Schema Registry и читать сообщения из Kafka, отправленные сервисом отправителя через Kafka Connect. Получатель будет обрабатывать сообщения из топика, обогащать их своими данными и сохранять результаты в своей базе данных.
Цели статьи:
Разработка Java-приложения в качестве получателя данных из Kafka.
Изучение работы с Avro-схемами и Schema Registry для обеспечения совместимости сообщений.
Обработка сообщений из Kafka с использованием асинхронного подхода.
Интеграция с PostgreSQL для сохранения обработанных данных.
Подготовка Helm Chart для развертывания отправителя и получателя в Kubernetes.
Конфигурирование Kubernetes для обеспечения масштабируемости и отказоустойчивости архитектуры.
Подготовка Java-приложения в качестве получателя
Для начала подготовим наш POM файл
pom.xml
<dependencies>
<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>32.1.1-jre</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>7.4.0</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>7.4.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
Здесь мы добавим библиотеки необходимые для работы со Schema Regestry, PostgreSQL, Kafka и проч. Чтобы успешно выгрузить библиотеки io.confluent необходимо подключиться к их репозиторию
<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
Теперь напишем yaml конфигурацию для проекта:
# Порт сервера
server:
port: 8081
# Настройки Spring
spring:
main:
allow-bean-definition-overriding: true
application:
name: kafka-example-consumer
jpa:
# Диалект базы данных
database-platform: org.hibernate.dialect.PostgreSQLDialect
generate-ddl: false
hibernate:
ddl-auto: update
show-sql: true
properties:
hibernate:
generate_statistics: false
datasource:
# URL для подключения к базе данных PostgreSQL
url: jdbc:postgresql://localhost:5432/public
username: postgres
password: postgres
kafka:
schema:
registry:
# Пути до хранилищ схем если у нас их несколько (дополнительная конфигурация)
urls: http://schema-registry-cp-schema-registry:8081
bootstrap-servers: localhost:29092
listener:
# Как подтверждаем получение сообщений
ack-mode: record
producer:
client-id: ${spring.application.name}
# Сериализатор ключа
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Сериализатор значения в DLT очередь
value-serializer: ru.marmarks.consumer.config.DltMessageSerializer
retries: 3
consumer:
group-id: ${spring.application.name}
autoOffsetReset: earliest
# Сериализаторы для ключа и значения с обработкой ошибок
keyDeserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
valueDeserializer: org.springframework.kafka.support.serializer.ErrorHandlingDeserializer
properties:
# Тип значения в JSON по умолчанию
spring.json.value.default.type: org.apache.avro.generic.GenericRecord
spring.deserializer.key.delegate.class: org.apache.kafka.common.serialization.StringDeserializer
# Сериализатор значения
spring.deserializer.value.delegate.class: io.confluent.kafka.serializers.KafkaAvroDeserializer
specific:
avro:
reader: true
# Хранилище схем по умолчанию
schema:
registry:
url: http://schema-registry-cp-schema-registry:8081
# Настройки Kafka
kafka:
topics:
# Топик из которого будем получать записи
personal-data: kafka-connect-personal_data
services:
# Для определения того из каких топиков мы поддерживаем чтение
personal-data: kafka-connect-personal_data
Подключение к Schema Registry и Kafka брокеру
Теперь перейдём к конфигурированию Kafka. Комментарии приведены в коде. Здесь мы конфигурируем подключение к Schema Regestry, слушателя и отправку сообщений, которые не вышло обработать в dlt очередь
KafkaConfiguration.java
@Configuration
@EnableKafka
@RequiredArgsConstructor
@EnableConfigurationProperties(SchemaRegistryProperties.class)
public class KafkaConfiguration {
private static final int DEFAULT_CACHE_CAPACITY = 200;
private static final String DLT_TOPIC_SUFFIX = ".dlt";
private final ConsumerFactory<String, GenericRecord> consumerFactory;
private final ProducerFactory<Object, Object> producerFactory;
@Bean
public ConcurrentKafkaListenerContainerFactory<String, GenericRecord> objectKafkaListenerContainerFactory(
DefaultErrorHandler errorHandler
) {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory = new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setCommonErrorHandler(errorHandler);
return factory;
}
/**
* Клиент для кеширования схем
*/
@Bean
public CachedSchemaRegistryClient cachedSchemaRegistryClient(
SchemaRegistryProperties schemaRegistryProperties) {
// Сервис для коммуникацией со Schema Registry
RestService restService = new RestService(schemaRegistryProperties.urls());
//Также передаём размер кэша схем
return new CachedSchemaRegistryClient(restService, DEFAULT_CACHE_CAPACITY);
}
@Bean
public KafkaTemplate<Object, Object> kafkaTemplate() {
return new KafkaTemplate<>(producerFactory);
}
/**
* Публикатор в dead-letter topic.
*/
@Bean
public DeadLetterPublishingRecoverer publisher(KafkaTemplate<Object, Object> bytesTemplate) {
// Определяем логику выбора партиции для отправки сообщения в DLT.
// В данном случае, создаём новый объект TopicPartition, используя имя
//топика (consumerRecord.topic()) и добавляя суффикс DLT_TOPIC_SUFFIX,
// а также номер партиции (consumerRecord.partition()).
// Следовательно в DLT топике должно быть столько партиций
//сколько и в топике откуда читаем
return new DeadLetterPublishingRecoverer(bytesTemplate, (consumerRecord, exception) ->
new TopicPartition(consumerRecord.topic() + DLT_TOPIC_SUFFIX, consumerRecord.partition()));
}
/**
* Обработчик исключений при получении сообщений из kafka по умолчанию.
*/
@Bean
public DefaultErrorHandler errorHandler(
DeadLetterPublishingRecoverer deadLetterPublishingRecoverer) {
final var handler = new DefaultErrorHandler(deadLetterPublishingRecoverer);
// Обрабатываем любые исключения и отправляем в DLT
handler.addNotRetryableExceptions(Exception.class);
return handler;
}
}
Посмотрим на сущность, которую будем обрабатывать:
public class ClientData {
@Id
@GeneratedValue(strategy = GenerationType.IDENTITY)
private Long id;
private String bankBic;
private String bankName;
private String fio;
private Long correlationId;
private LocalDateTime lastUpdate;
}
Добавили новые поля, относительно отправляемой сущности - fio, correlationId. Предположим, что мы заполняем ФИО, обращаясь в какой-то сторонний сервис.
Обработка сообщений из топика
Теперь перейдём к тому, как будем обрабатывать приходящее сообщение
KafkaConsumerListeners.java
public class KafkaConsumerListeners {
// Получим сервисы, которые смогут обрабатывать приходящие сообщения
private final List<ServiceResolver> serviceResolverList;
// Определяем топики из которых будем читать. Их может быть несколько.
//Также определим конфигурацию листнера
@KafkaListener(topics = "#{'${kafka.topics.personal-data}'.split(',')}", containerFactory = "objectKafkaListenerContainerFactory")
// Принимаем абстрактный класс GenericRecord.
//Мы сможем обработать только то сообщение, для которого определена
//схема в Schema Regestry
void readOrganizationConnectorMessages(ConsumerRecord<String, GenericRecord> message) {
readMessage(message);
}
private void readMessage(ConsumerRecord<String, GenericRecord> message) {
String topic = message.topic();
long offset = message.offset();
log.info("Сообщение из топика: {} offset: {}", topic, offset);
// Получаем само сообщение
GenericRecord value = message.value();
if (value == null) {
log.info("Пустое сообщение из топика: {} offset: {}", topic, offset);
return;
}
// Находим сервис, который сможет обработать сообщение
ServiceResolver resolver = serviceResolverList.stream()
.filter(it -> it.isSupported(topic))
.findFirst()
.orElseThrow(() -> new RuntimeException("Сервис не найден для топика: " + topic));
// Передаём сообщение на обработку
resolver.process(value);
log.info("Сообщение обработано: топик {} offset {}", topic, offset);
}
}
Комментарии по коду приведены непосредственно в коде. Реализация заточена на то, что мы сможем обрабатывать несколько топиков с различными сообщения. Схема каждого сообщения должна быть приведена в Schema Regestry
Работа с Avro-схемами
Посмотрим на обработку сообщения в сервисе
Обработка сообщения
public void process(GenericRecord value) {
Long id = Long.valueOf(String.valueOf(value.get("id")));
String bankBic = String.valueOf(value.get("bank_bic"));
String bankName = String.valueOf(value.get("bank_name"));
String lastUpdate = String.valueOf(value.get("last_update"));
long millis = Long.parseLong(lastUpdate);
Instant instant = Instant.ofEpochMilli(millis);
LocalDateTime lastUpdateTime = instant.atZone(ZoneId.systemDefault()).toLocalDateTime();
// Подготовим новую сущность на основе приходящих данных
ClientData clientData = new ClientData();
clientData.setBankBic(bankBic);
clientData.setFio("Рауль " + id);
clientData.setBankName(bankName);
clientData.setCorrelationId(id);
clientData.setLastUpdate(lastUpdateTime);
ClientData save = clientDataRepository.save(clientData);
log.info("Данные клиента сохранены: {}", save);
}
В данном случае мы примитивным способом вытаскиваем все поля из сообщения и на основе них создаём новую сущность, которую модифицируем новыми данными, которые мы можем получить обратившись куда-либо ещё или иным способом и записываем в базу
Полный код получателя доступен на GitHub.
Helm Chart
Теперь перейдём к Helm чарту, который будет разворачивать нашего отправителя и получателя.
Подготовим файл с информацией о чарте:
# Версия API Helm Chart
apiVersion: v2
# Имя чарта
name: marmarks-chart
# Версия чарта
version: 1.0.0
# Описание
description: Some project
# Тип
type: application
sources:
- https://github.com/marmarks
maintainers:
- name: Maksim Krylov
email: kryloumaksim@gmail.com
url: https://habr.com/ru/users/Marmaksmark/
# Версия приложения
appVersion: 1.0.0
Конфигурации и шаблоны
Следующим шагом подготовим файл values.yaml, который предоставляет конфигурации для чарта. Помимо того, что настройки будут определены в этом файле мы сможем также их переопределять используя --set
при запуске чарта
producer:
# Конфигурация деплоймента
deployment:
# Имя деплоймента
name: "producer"
# Число реплик
replicas: 1
# Настройки контейнера
container:
name: "producer-container"
resources: { }
# Настройки сервиса
service:
name: "producer-service"
port: 8080
# Информация об образе сервиса
image:
name: marmarks/producer
tag: "0.2"
# Загрузим образ, если локально его нет
pullPolicy: IfNotPresent
consumer:
deployment:
name: "consumer"
replicas: 1
container:
name: "consumer-container"
resources: { }
service:
name: "consumer-service"
port: 80
# тип сервиса: ClusterIP, NodePort, LoadBalancer
type: LoadBalancer
image:
name: marmarks/consumer
tag: "0.5"
pullPolicy: IfNotPresent
Также мы определим файл _helpers.tpl содержащий вспомогательные шаблоны, такие как:
_helpers.tpl
Имена меток для связей между объектами
{{- define "marmarks-chart.selectorLabels" -}}
app.kubernetes.io/name: {{ .Chart.Name }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end -}}
Имя и версия чарта
{{- define "marmarks-chart.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 60 | trimSuffix "-" -}}
{{- end -}}
Имена меток в манифестах
{{- define "marmarks-chart.labels" -}}
helm.sh/chart: {{ include "marmarks-chart.chart" . }}
{{ include "marmarks-chart.selectorLabels" . }}
{{- if .Chart.AppVersion }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
{{- end }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end -}}
Функция для вычисления хэш суммы. Для обновления чарта при изменении секретов или конфигураций
{{- define "marmarks-chart.propertiesHash" -}}
{{- $producerSecrets := include (print $.Template.BasePath "/producer-secrets.yaml") . | sha256sum -}}
{{- $consumerSecrets := include (print $.Template.BasePath "/consumer-secrets.yaml") . | sha256sum -}}
{{- $producerConfig := include (print $.Template.BasePath "/producer-config.yaml") . | sha256sum -}}
{{- $consumerConfig := include (print $.Template.BasePath "/consumer-config.yaml") . | sha256sum -}}
{{ print $producerSecrets $producerConfig $consumerSecrets $consumerConfig | sha256sum }}
{{- end -}}
Имена компонентов отправителя
{{- define "marmarks-chart.producer.defaultName" -}}
{{- printf "producer-%s" .Release.Name -}}
{{- end -}}
{{- define "marmarks-chart.producer.deployment.name" -}}
{{- default (include "marmarks-chart.producer.defaultName" .) .Values.producer.deployment.name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{- define "marmarks-chart.producer.container.name" -}}
{{- default (include "marmarks-chart.producer.defaultName" .) .Values.producer.container.name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{- define "marmarks-chart.producer.service.name" -}}
{{- default (include "marmarks-chart.producer.defaultName" .) .Values.producer.service.name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
Имена компонентов получателя
{{- define "marmarks-chart.consumer.defaultName" -}}
{{- printf "consumer-%s" .Release.Name -}}
{{- end -}}
{{- define "marmarks-chart.consumer.deployment.name" -}}
{{- default (include "marmarks-chart.consumer.defaultName" .) .Values.consumer.deployment.name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{- define "marmarks-chart.consumer.container.name" -}}
{{- default (include "marmarks-chart.consumer.defaultName" .) .Values.consumer.container.name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{- define "marmarks-chart.consumer.service.name" -}}
{{- default (include "marmarks-chart.consumer.defaultName" .) .Values.consumer.service.name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
Имена прочих компонентов компонентов (секретов, конфигов)
{{- define "marmarks-chart.producerSecrets.defaultName" -}}
{{- printf "producer-secrets-%s" .Release.Name -}}
{{- end -}}
{{- define "marmarks-chart.consumerSecrets.defaultName" -}}
{{- printf "consumer-secrets-%s" .Release.Name -}}
{{- end -}}
{{- define "marmarks-chart.producerConfig.defaultName" -}}
{{- printf "producer-config-%s" .Release.Name -}}
{{- end -}}
{{- define "marmarks-chart.consumerConfig.defaultName" -}}
{{- printf "consumer-config-%s" .Release.Name -}}
{{- end -}}
Получатель
Теперь, когда конфигурации и шаблоны описаны мы можем описать конфигурационный файл consumer-config.yaml для отправителя:
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "marmarks-chart.consumerConfig.defaultName" . }}
labels:
{{- include "marmarks-chart.labels" . | nindent 4 }}
data:
SPRING_PROFILES_ACTIVE: "dev"
SERVER_PORT: "8080"
SPRING_KAFKA_BOOTSTRAP_SERVERS: "kafka:9092"
SPRING_DATASOURCE_URL: "jdbc:postgresql://postgresql-dev:5432/postgres"
Здесь мы определили на каком порту будем запускать наше приложение, выбранный профиль, url брокера и url до базы данных. Логин-пароль для аутентификации будем хранить в секретах consumer-secrets.yaml:
# Манифест для создания секретов
# Версия API
apiVersion: v1
# Тип манифеста
kind: Secret
# Метаданные манифеста
metadata:
# Имя манифеста
name: {{ include "marmarks-chart.consumerSecrets.defaultName" . }}
labels:
# Метки манифеста
{{- include "marmarks-chart.labels" . | nindent 4 }}
# Тип секрета: неструктурированные данные. Храни просто пару "ключ-значение"
# Без автоматического обновления, управления, ротации и т.д.
type: Opaque
stringData:
SPRING_DATASOURCE_USERNAME: "postgres"
SPRING_DATASOURCE_PASSWORD: "pgpass"
Подробнее про секреты и их типы в статье - Безопасное хранение secrets в Kubernetes
Осталось описать лишь дейломент и сервис получателя consumer.yaml. Исчерпывающие комментарии оставлены в коде:
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "marmarks-chart.consumer.deployment.name" . }}
labels:
{{- include "marmarks-chart.labels" . | nindent 4 }}
tier: consumer
{{/* Спецификация развёртывания*/}}
spec:
{{/* Число реплик*/}}
replicas: {{ .Values.consumer.deployment.replicas }}
selector:
{{/* Метки подов*/}}
matchLabels:
{{- include "marmarks-chart.selectorLabels" . | nindent 6 }}
tier: consumer
{{/* Стратегия обновления развёртывания*/}}
strategy:
{{/* Пошаговое обновление*/}}
rollingUpdate:
{{/* Максимальное количество реплик для обновления*/}}
maxSurge: 25%
{{/* Максимальное число реплик недоступных во время обновления*/}}
maxUnavailable: 25%
{{/* Тип стратегии развёртывания*/}}
type: RollingUpdate
{{/* Шаблон создания подов*/}}
template:
metadata:
{{/* Аннотации*/}}
annotations:
{{/* Проверка чек суммы для обновления секретов и конфигов*/}}
checksum/config: {{ include "marmarks-chart.propertiesHash" . }}
labels:
{{- include "marmarks-chart.selectorLabels" . | nindent 8 }}
tier: consumer
{{/* Спецификации пода*/}}
spec:
{{/* Контейнер, который будет запущен в поде*/}}
containers:
- name: {{ include "marmarks-chart.consumer.container.name" . }}
image: "{{ .Values.consumer.image.name }}:{{ .Values.consumer.image.tag }}"
imagePullPolicy: {{ .Values.consumer.image.pullPolicy }}
{{/* Переменные окружения из конфиг мапы*/}}
envFrom:
- configMapRef:
name: {{ include "marmarks-chart.consumerConfig.defaultName" . }}
{{/* Переменные окружения секреты*/}}
- secretRef:
name: {{ include "marmarks-chart.producerSecrets.defaultName" . }}
readinessProbe:
{{/* Проверка доступности пода*/}}
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 5
periodSeconds: 3
ports:
{{/* Порты пода*/}}
- containerPort: 8080
{{/* Протокол*/}}
protocol: TCP
resources:
{{/* Ресурсы выделенные для контейнера*/}}
{{- toYaml .Values.consumer.container.resources | nindent 12 }}
---
apiVersion: v1
kind: Service
metadata:
name: {{ include "marmarks-chart.consumer.service.name" . }}
labels:
{{- include "marmarks-chart.labels" . | nindent 4 }}
tier: consumer
spec:
type: {{ .Values.consumer.service.type }}
ports:
{{/* Порт службы*/}}
- port: {{ .Values.consumer.service.port }}
protocol: TCP
{{/* Порт контейнера, на который будет направлен трафик*/}}
targetPort: 8080
{{/* Имя порта*/}}
name: http
{{/* Селектор для выбора подов, которые будут обслужены сервисом*/}}
selector:
{{- include "marmarks-chart.selectorLabels" . | nindent 4 }}
tier: consumer
Мы определили стратегию развертывания, health checks и прочее. Наш получатель уже полностью готов к приёму трафика из брокера, а брокер уже готов его нам поставлять. Однако мы сделаем аналогичную конфигурацию для отправителя
Отправитель
producer-config.yaml
apiVersion: v1
kind: ConfigMap
metadata:
name: {{ include "marmarks-chart.producerConfig.defaultName" . }}
labels:
{{- include "marmarks-chart.labels" . | nindent 4 }}
data:
SERVER_PORT: "8080"
SPRING_PROFILES_ACTIVE: "dev"
SPRING_DATASOURCE_URL: "jdbc:postgresql://postgresql-dev:5432/postgres"
producer-secrets.yaml
# Манифест для создания секретов
# Версия API
apiVersion: v1
# Тип манифеста
kind: Secret
# Метаданные манифеста
metadata:
# Имя манифеста
name: {{ include "marmarks-chart.producerSecrets.defaultName" . }}
labels:
# Метки манифеста
{{- include "marmarks-chart.labels" . | nindent 4 }}
# Тип секрета: неструктурированные данные. Храни просто пару
"ключ-значение"
# Без автоматического обновления, управления, ротации и т.д.
type: Opaque
stringData:
SPRING_DATASOURCE_USERNAME: "postgres"
SPRING_DATASOURCE_PASSWORD: "pgpass"
producer.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "marmarks-chart.producer.deployment.name" . }}
labels:
{{- include "marmarks-chart.labels" . | nindent 4 }}
tier: producer
spec:
replicas: {{ .Values.producer.deployment.replicas }}
selector:
matchLabels:
{{- include "marmarks-chart.selectorLabels" . | nindent 6 }}
tier: producer
strategy:
rollingUpdate:
maxSurge: 25%
maxUnavailable: 25%
type: RollingUpdate
template:
metadata:
labels:
{{- include "marmarks-chart.selectorLabels" . | nindent 8 }}
tier: producer
spec:
containers:
- name: {{ include "marmarks-chart.producer.container.name" . }}
image: "{{ .Values.producer.image.name }}:{{ .Values.producer.image.tag }}"
imagePullPolicy: {{ .Values.producer.image.pullPolicy }}
envFrom:
- configMapRef:
name: {{ include "marmarks-chart.producerConfig.defaultName" . }}
- secretRef:
name: {{ include "marmarks-chart.producerSecrets.defaultName" . }}
ports:
- containerPort: 8080
protocol: TCP
readinessProbe:
httpGet:
path: /actuator/health
port: 8080
initialDelaySeconds: 5
periodSeconds: 3
resources:
{{- toYaml .Values.producer.container.resources | nindent 12 }}
---
apiVersion: v1
kind: Service
metadata:
name: {{ include "marmarks-chart.producer.service.name" . }}
labels:
{{- include "marmarks-chart.labels" . | nindent 4 }}
tier: producer
spec:
ports:
- port: {{ .Values.producer.service.port }}
protocol: TCP
targetPort: 8080
name: http
selector:
{{- include "marmarks-chart.selectorLabels" . | nindent 4 }}
tier: producer
Запуск и тестирование архитектуры
Необходимо иметь полностью развёрнутую среду из первой части статьи.
Теперь когда Helm Chart готов к запуску, находясь в директории чарта, выполним команду:
helm install <имя чарта> .
Наш чарт успешно запустится, если логи не скажут иного, и будет доступен. Зайдя в логи пода отправителя мы увидим, что он записал несколько новых сообщений в свою базу. После этого, зайдя в логи получателя, мы увидим, что он прочитал и обработал все сообщения, что были в топике раньше и появились только что. Это происходит весьма быстро.
Проверка работоспособности получателя и отправител
Если в ходе обработки сообщений произошла ошибка, то сообщения, чья обработка была безуспешна отправятся в DLT топик. Для того, чтобы убедиться в успешной записи сообщений, мы зайдём в под PostgreSQL и выполним несколько команд. Для начала авторизуемся:
psql -h postgresql-dev -p 5432 -U postgres -d postgres
Нас попросят ввести пароль, в моём случае это pgpass. После его ввода и успешной авторизации выполним:
SELECT * FROM client_data;
id | bank_bic | bank_name | correlation_id | fio | last_update
---+-------------------+-----------------+---------------------+-----------+-----------------------------------
1 | 1179896397 | null | 1 | Рауль 1| 2023-08-06 16:08:16.156
2 | -1471064429 | null | 2 | Рауль 2| 2023-08-06 16:08:16.221
3 | -604766198 | null | 3 | Рауль 3| 2023-08-06 16:08:16.228
4 | -771359675 | null | 4 | Рауль 4| 2023-08-06 16:08:16.232
5 | 1082682564 | null | 5 | Рауль 5| 2023-08-06 16:08:16.237
--------------------------------------------------------------------------------------------------------------------
Заключение
Если данные будут успешно обработаны, то записи будут получены при выполнении SQL запроса. Таким образом мы видим, что ивентная модель работает и успешно обрабатывает данные в асинхронном режиме, чем обеспечивает Evential Consistency.
Исходный код получателя и Helm Chart доступен на GitHub.
mikegordan
Я всеравно не понимаю зачем вся эта прослойка если не решает основной кэйс: автоматическое масштабирование микросервисов при увеличении партиций