Добрый день, Хаброжители! Это моя первая статья на Хабре, любые замечания, предложения, пожелания приветствуются!
Предистория
На предыдущем месте работы (в одной Британской компанни) существовала распиленная MySql DB под три микросевиса, назовем их условно Cusomer, Person, Account (у каждого микросервиса своя DB соответственно). К каждой DB был подвязан свой Address (адрес у Customer, у Person, и под Account тоже мог быть подвязан Address). Раньше когда это была одна большая база данных, поиск по Address не составлял труда, но когда базу распилили, это сделать стало сложнее. Было принято решение написать микросервис, который бы подключался по CDC к этим трем DB (технологии: MySql, Debezium, Spring Cloud Streams) и складывал бы только нужные данные для поиска (по Address, по имени, и т.д.) в одну базу данных (MongoDB). В ходе решения этой задачи, я получил интересный опыт, которым и хотел бы поделиться.
Debezium + MySql
Debezium - написанная на Java платформа, которая подключается к бинарному логу MySql (куда он записывает изменения схемы, данных и т.д.), считывает непрочитанные /новые логи и отправляет их в Kafka topic. Дальше мы можем подключиться к этому Kafka топику и обрабатывать сообщения.
Чтобы разобраться с Debezium мне очень помогло видео Виктора Гамова и официальная документация.
Для того, что бы запустить все локально, начнем с docker-compose.yml файла:
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.1
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
broker:
image: confluentinc/cp-server:5.5.1
hostname: broker
container_name: broker
depends_on:
- zookeeper
ports:
- "29092:29092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
CONFLUENT_METRICS_ENABLE: 'false'
KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
connect-debezium:
image: debezium/connect:1.2
container_name: connect-debezium
depends_on:
- broker
- person-database
ports:
- 8083:8083
environment:
BOOTSTRAP_SERVERS: broker:9092
GROUP_ID: connect-debezium
CONFIG_STORAGE_TOPIC: docker-connect-debezium-configs
OFFSET_STORAGE_TOPIC: docker-connect-debezium-offsets
STATUS_STORAGE_TOPIC: docker-connect-debezium-status
KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
volumes:
- ${PWD}/scripts:/scripts
person-database:
container_name: person-database
image: mysql
hostname: person-database
ports:
- 3306:3306
environment:
MYSQL_ROOT_PASSWORD: admin
MYSQL_USER: dev
MYSQL_PASSWORD: password
MYSQL_DATABASE: person
# volumes:
# - ./mysqlconf/mysqlconf.cnf:/etc/mysql/conf.d/mysql.cnf
# - ./dump:/docker-entrypoint-initdb.d
mongodb:
container_name: search-database
image: mongo
hostname: search-database
ports:
- 27017-27019:27017-27019
environment:
MONGO_INITDB_DATABASE: search
MONGO_INITDB_ROOT_USERNAME: dev
MONGO_INITDB_ROOT_PASSWORD: password
Здесь мы поднимаем 4 главных контейнера:
zookeeper — необходим для менеджмента kafka брокера.
broker — это наша kafka с конфигами поключения zookeeper, брокера, и т. д.
connect-debezium — standalong debezium, готовый к подключению к kafka.
Стоит отметить что в официальной документации можно встретить этот docker-compose файл с конфигурациями key/value конверторов в Avro формате ( и еще один контейнер schem-registry для поддержки этого формата). Но в моей задаче обычного string/json конвертера было больше чем достаточно.
KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverterMySql контейнер.
MongoDB контейнер (для части 2 туториала).
Запускаем докер: docker-compose -up
После того как все контейнеры запустились, нужно включить в Msyql бин-лог и дать права debezium подключаться к нему.
Для этого идем в mysql контейнер docker exec -it <mysql-container-id> mysql -u root -padmin
и выполняем nano /etc/mysql/my.cnf
. Т.к. в первоначально текстового редактора в контейнере нет, его можно установить apt-get update && apt-get install nano
.
Добавляем после [mysql]
следующее:server_id=42
log_bin=mysql_bin
binlog_format=row
binlog_row_image=full
expire_logs_days=10
!Важно, после сохранения нужно ре-стартануть контейнер!
Также необходимо дать права debezium:
CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
GRANT ALL PRIVILEGES ON *.* TO 'debezium'@'%';
ALTER USER 'debezium'@'%' IDENTIFIED WITH mysql_native_password BY 'dbz';
Когда наша БД настроена, настало время подружить ее с коннектором. Для этого нужно отправить метод POST с конфигами коннектора в теле запроса.
curl -i -X POST -H "Content-Type:application/json" \
http://localhost:8083/connectors
-d '{
"name": "data-connector",
"config": {
"connector.class": "io.debezium.connector.mysql.MySqlConnector",
"tasks.max": "1",
"database.hostname": "person-database",
"database.port": "3306",
"database.user": "debezium",
"database.allowPublicKeyRetrieval":"true",
"database.password": "dbz",
"database.server.id": "42",
"database.server.name": "mysql_server",
"database.include.list": "person",
"database.history.kafka.bootstrap.servers": "broker:9092",
"database.history.kafka.topic": "schema-changes.person",
"include.schema.changes":"false",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.add.headers": "db",
"transforms.unwrap.add.fields": "op,table",
"transforms.unwrap.drop.tombstones": "true",
"transforms.unwrap.delete.handling.mode": "rewrite"
}
}'
Для этого мне удобнее всего использовать Postman.
Здесь используется Kafka-Сonnect, возможно трансформировать сообщение на этапе генерации. Для этого используется «transform.*». Об этом можно детально почитать здесь и здесь. Например важно удалять tomb-stone сообщения.
Т.к. мне не нужно было читать логи об изменениях схемы, я их выключил:"include.schema.changes":"false"
(по дефолту «true»)
И так, оправляем Post на http://localhost:8083/connectors.
Дополнительно проверяем наш коннектор:curl -i -X GET http://localhost:8083/connectors/data-connector
curl -i -X GET http://localhost:8083/connectors/data-connector/status
Ошибок нет, видим статус RUNNING, значит все норм.
Если все-таки есть ошибки, из практики, скорее всего неправильно указан пароль\права debezium или просто не рестартанули контейнер mysql. Вообще прилетает сразу весь список ошибок, можно разобраться.
Итак наш CDC готова, давайте посмотрим топики в kafka:docker exec -i broker /usr/bin/kafka-topics --list --bootstrap-server broker:29092
Коннектор создал служебные топики в kafka, но это еще не топики для сообщений об изменениях.
Зайдем в наш mysql контейнер и попробуем создать простую таблицу:
CREATE TABLE IF NOT EXISTS tasks (
task_id INT AUTO_INCREMENT PRIMARY KEY,
title VARCHAR(255) NOT NULL,
start_date DATE,
status TINYINT NOT NULL,
priority TINYINT NOT NULL,
description TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
) ENGINE=INNODB;
Опять смотрим топики в kafka:docker exec -i broker /usr/bin/kafka-topics --list --bootstrap-server broker:29092
Дело в том, что топики создаются в kafka с «lazy approach» и т.к. мы отключили логи изменения схемы БД, нам ничего не прилетело, топик не создался.
Теперь давайте добавим данные в таблицу:
INSERT INTO tasks (title, start_date, status, priority, description)
VALUES ("title1", "2021-05-28", "1", "1", "description1");
Опять смотрим топики в kafka:
О чудо, у нас появился новый топик mysql_server.person.tasks
!
Тут важно понимать, что коннектор создает в Kafka отдельный топик под каждую таблицу - one topic per table.
Подключаем консольного консьюмера
docker exec -i broker /usr/bin/kafka-console-consumer --bootstrap-server broker:29092 --topic mysql_server.person.tasks --from-beginning
и видим наш лог:
Если эта статья окажется полезной и будет одобрена модератором, в следующей статье я расскажу о новой модной штуке - как создать Spring Cloud Streams приложение (с Spring Function Consumer, без deprecated @StreamListener).
Комментарии (9)
korsetlr473
27.09.2021 14:45+1а как это ты подключился к
JaS4083 Автор
27.09.2021 14:48а как это ты подключился к подключился к кафка через kafka коннект систему, но при этом у тебя в docker compose не установлен сервис kafka connect? "cp-server-connect" ?
В туториале (https://debezium.io/documentation/reference/tutorial.html) нашел такое:Debezium is built on top of Apache Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, from where your application consumes them.Т.е. Дебизиум использует под капотом kafka-connect.
И так мы конфижим kafka-connect что бы он подключался к kafka брокеру.
environment: BOOTSTRAP_SERVERS: broker:9092 GROUP_ID: connect-debezium CONFIG_STORAGE_TOPIC: docker-connect-debezium-configs OFFSET_STORAGE_TOPIC: docker-connect-debezium-offsets STATUS_STORAGE_TOPIC: docker-connect-debezium-status KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
aademchenko
27.09.2021 15:27+1А что это за бин-лог в MySql DB?
JaS4083 Автор
27.09.2021 22:16+1По сути это лог-файлы в сервере MySql, где записываются ивенты (изменение схемы, добавлени/изменение/удаление данных и т.д.) Это логгирование можно "включить" в настойках сервера (по дефолту оно выключенно).
Более детально об этом можно почитать здесь - https://dev.mysql.com/doc/refman/8.0/en/binary-log.html
ololx
27.09.2021 16:35+1Спасибо за статью, очень полезная. Года 2 назад уже рассматривал debezium, с СУБД mysql все работало хорошо по мануалам, но с postgresql не получилось никак реализовать CDC. Поэтому писал реализацию CDC уже сам под каждый конкретный сервис - у сервиса (master data) пишется продюсер для кафки, а у зависимых (slave) сервисов уже консьюмеры.
Но благодаря Вашей статье решил попробовать debezium снова, тем более за два года могло многое измениться)
Недавно наткнулся еще и на embedded реализацию debezium и начал изучать этот вариант. Если интересно начало проекта тут - https://github.com/innopolis-university-java-team/change-data-capture-instances.
JaS4083 Автор
27.09.2021 19:32Интересно, тоже удалось тоже подключиться к логу Postgres, или просто в сервис, который работал с Postgres добавили кафка продьюсера ?
Embedded Debezium пробовал, работает со Spring Boot довольно неплохо. Но в реализацию его не взяли потому что, у нас было 3 микросервиса которые читали из 3х БД (и складывали в одну БД), и так бы получилось, что нужно аж 3 embedded debezium разворачивать. А если делать stand along server, то в одном debezium сервере можно сразу законфижить 3 разных коннектора, и все рабоатет. Плюс модный Spring Cloud Stream ....
(хочу о нем еще написать).ololx
27.09.2021 19:39+1Все куда проще - именно продюсера на spring-cloud-stream который вызывался непосредственно после успешной манипуляции с сущностью; написал ДТО по образу того, что есть в debezium (поля T before, T after + метод, который по соотношению значений полей определяет тип события - delete, update и create). Что хорошего в такой реализации - можно по необходимости Google Protobuff применить.
Получается при возникновении события в сервисе (например создании сущности) создается инстанс ДТО, с before = null, after = объект; ДТО поститься в кафку; ну а далее уже консьюмеры отрабатывают. На тот момент это было быстрее, чем разбираться с debezium до конца.
JaS4083 Автор
27.09.2021 21:26Звучит как рабочий подход. Тем более скорее всего 2 года назад Debezium был еще "сырым".
Слушал выступление его создателя Gunar Morlin, он говорил что главный use case прежде всего если есть БД с клиентом написаном на легаси (диком легаси, куда и лезть не хочется), - тогда CDC точно для этого.
Ну если такого нет, то наверное можно написать самому, или все-таки прикрутить и наконфижить Debezium (что оказалось тоже не сложно).
С Protobuf не работал, не могу ничего сказать.
mack
Полезно, спасибо.