Добрый день, Хаброжители! Это моя первая статья на Хабре, любые замечания, предложения, пожелания приветствуются!

Предистория

На предыдущем месте работы (в одной Британской компанни) существовала распиленная MySql DB под три микросевиса, назовем их условно Cusomer, Person, Account (у каждого микросервиса своя DB соответственно). К каждой DB был подвязан свой Address (адрес у Customer, у Person, и под Account тоже мог быть подвязан Address). Раньше когда это была одна большая база данных, поиск по Address не составлял труда, но когда базу распилили, это сделать стало сложнее. Было принято решение написать микросервис, который бы подключался по CDC к этим трем DB (технологии: MySql, Debezium, Spring Cloud Streams) и складывал бы только нужные данные для поиска (по Address, по имени, и т.д.) в одну базу данных (MongoDB). В ходе решения этой задачи, я получил интересный опыт, которым и хотел бы поделиться.

Debezium + MySql

Debezium - написанная на Java платформа, которая подключается к бинарному логу MySql (куда он записывает изменения схемы, данных и т.д.), считывает непрочитанные /новые логи и отправляет их в Kafka topic. Дальше мы можем подключиться к этому Kafka топику и обрабатывать сообщения.

Чтобы разобраться с Debezium мне очень помогло видео Виктора Гамова и официальная документация.

Для того, что бы запустить все локально, начнем с docker-compose.yml файла:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.1
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-server:5.5.1
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "29092:29092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_ENABLE: 'false'
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1

  connect-debezium:
    image: debezium/connect:1.2
    container_name: connect-debezium
    depends_on:
      - broker
      - person-database
    ports:
      - 8083:8083
    environment:
      BOOTSTRAP_SERVERS: broker:9092
      GROUP_ID: connect-debezium
      CONFIG_STORAGE_TOPIC: docker-connect-debezium-configs
      OFFSET_STORAGE_TOPIC: docker-connect-debezium-offsets
      STATUS_STORAGE_TOPIC: docker-connect-debezium-status
      KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
    volumes:
      - ${PWD}/scripts:/scripts
  person-database:
    container_name: person-database
    image: mysql
    hostname: person-database
    ports:
      - 3306:3306
    environment:
      MYSQL_ROOT_PASSWORD: admin
      MYSQL_USER: dev
      MYSQL_PASSWORD: password
      MYSQL_DATABASE: person
#    volumes:
#      - ./mysqlconf/mysqlconf.cnf:/etc/mysql/conf.d/mysql.cnf
#      - ./dump:/docker-entrypoint-initdb.d
  mongodb:
    container_name: search-database
    image: mongo
    hostname: search-database
    ports:
      - 27017-27019:27017-27019
    environment:
      MONGO_INITDB_DATABASE: search
      MONGO_INITDB_ROOT_USERNAME: dev
      MONGO_INITDB_ROOT_PASSWORD: password

Здесь мы поднимаем 4 главных контейнера:

  • zookeeper — необходим для менеджмента kafka брокера.

  • broker — это наша kafka с конфигами поключения zookeeper, брокера, и т. д.

  • connect-debezium — standalong debezium, готовый к подключению к kafka.
    Стоит отметить что в официальной документации можно встретить этот docker-compose файл с конфигурациями key/value конверторов в Avro формате ( и еще один контейнер schem-registry для поддержки этого формата). Но в моей задаче обычного string/json конвертера было больше чем достаточно.
    KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
    VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter

  • MySql контейнер.

  • MongoDB контейнер (для части 2 туториала).

Запускаем докер: docker-compose -up

После того как все контейнеры запустились, нужно включить в Msyql бин-лог и дать права debezium подключаться к нему.

Для этого идем в mysql контейнер
docker exec -it <mysql-container-id> mysql -u root -padmin
и выполняем nano /etc/mysql/my.cnf . Т.к. в первоначально текстового редактора в контейнере нет, его можно установить apt-get update && apt-get install nano.

Добавляем после [mysql] следующее:

server_id=42

log_bin=mysql_bin

binlog_format=row

binlog_row_image=full

expire_logs_days=10

!Важно, после сохранения нужно ре-стартануть контейнер!

Также необходимо дать права debezium:

CREATE USER 'debezium'@'%' IDENTIFIED BY 'dbz';
GRANT ALL PRIVILEGES ON *.* TO 'debezium'@'%';
ALTER USER 'debezium'@'%' IDENTIFIED WITH mysql_native_password BY 'dbz';

Когда наша БД настроена, настало время подружить ее с коннектором. Для этого нужно отправить метод POST с конфигами коннектора в теле запроса.

curl -i -X POST -H "Content-Type:application/json" \
http://localhost:8083/connectors
-d '{

  "name": "data-connector",  
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",  
    "database.hostname": "person-database",  
    "database.port": "3306",
    "database.user": "debezium",
    "database.allowPublicKeyRetrieval":"true",
    "database.password": "dbz",
    "database.server.id": "42",  
    "database.server.name": "mysql_server",  
    "database.include.list": "person",  
    "database.history.kafka.bootstrap.servers": "broker:9092", 
    "database.history.kafka.topic": "schema-changes.person",
    "include.schema.changes":"false",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.add.headers": "db",
    "transforms.unwrap.add.fields": "op,table",
    "transforms.unwrap.drop.tombstones": "true",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}'

Для этого мне удобнее всего использовать Postman.

Здесь используется Kafka-Сonnect, возможно трансформировать сообщение на этапе генерации. Для этого используется «transform.*». Об этом можно детально почитать здесь и здесь. Например важно удалять tomb-stone сообщения.

Т.к. мне не нужно было читать логи об изменениях схемы, я их выключил:
"include.schema.changes":"false"
(по дефолту «true»)

И так, оправляем Post на http://localhost:8083/connectors.

Дополнительно проверяем наш коннектор:
curl -i -X GET http://localhost:8083/connectors/data-connector
curl -i -X GET http://localhost:8083/connectors/data-connector/status
Ошибок нет, видим статус RUNNING, значит все норм.

Если все-таки есть ошибки, из практики, скорее всего неправильно указан пароль\права debezium или просто не рестартанули контейнер mysql. Вообще прилетает сразу весь список ошибок, можно разобраться.

Итак наш CDC готова, давайте посмотрим топики в kafka:
docker exec -i broker /usr/bin/kafka-topics --list --bootstrap-server broker:29092

Коннектор создал служебные топики в kafka, но это еще не топики для сообщений об изменениях.

Зайдем в наш mysql контейнер и попробуем создать простую таблицу:

CREATE TABLE IF NOT EXISTS tasks (     
  task_id INT AUTO_INCREMENT PRIMARY KEY,     
  title VARCHAR(255) NOT NULL,     
  start_date DATE,     
  status TINYINT NOT NULL,     
  priority TINYINT NOT NULL,     
  description TEXT,     
  created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP 
)  ENGINE=INNODB;

Опять смотрим топики в kafka:

docker exec -i broker /usr/bin/kafka-topics --list --bootstrap-server broker:29092

Ничего не поменялось
Ничего не поменялось

Дело в том, что топики создаются в kafka с «lazy approach» и т.к. мы отключили логи изменения схемы БД, нам ничего не прилетело, топик не создался.

Теперь давайте добавим данные в таблицу:

INSERT INTO tasks (title, start_date, status, priority, description) 
VALUES ("title1", "2021-05-28", "1", "1", "description1");

Опять смотрим топики в kafka:

О чудо, у нас появился новый топик mysql_server.person.tasks!
Тут важно понимать, что коннектор создает в Kafka отдельный топик под каждую таблицу - one topic per table.

Подключаем консольного консьюмера
docker exec -i broker /usr/bin/kafka-console-consumer --bootstrap-server broker:29092 --topic mysql_server.person.tasks --from-beginning

и видим наш лог:

Если эта статья окажется полезной и будет одобрена модератором, в следующей статье я расскажу о новой модной штуке - как создать Spring Cloud Streams приложение (с Spring Function Consumer, без deprecated @StreamListener).

Комментарии (9)


  1. mack
    26.09.2021 16:37
    +1

    Полезно, спасибо.


  1. korsetlr473
    27.09.2021 14:45
    +1

    а как это ты подключился к


    1. JaS4083 Автор
      27.09.2021 14:48

      а как это ты подключился к подключился к кафка через kafka коннект систему, но при этом у тебя в docker compose не установлен сервис kafka connect? "cp-server-connect" ?

      В туториале (https://debezium.io/documentation/reference/tutorial.html) нашел такое:Debezium is built on top of Apache Kafka and provides Kafka Connect compatible connectors that monitor specific database management systems. Debezium records the history of data changes in Kafka logs, from where your application consumes them.Т.е. Дебизиум использует под капотом kafka-connect.

      И так мы конфижим kafka-connect что бы он подключался к kafka брокеру.

      environment:
        BOOTSTRAP_SERVERS: broker:9092
        GROUP_ID: connect-debezium
        CONFIG_STORAGE_TOPIC: docker-connect-debezium-configs
        OFFSET_STORAGE_TOPIC: docker-connect-debezium-offsets
        STATUS_STORAGE_TOPIC: docker-connect-debezium-status
        KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
        VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter


  1. aademchenko
    27.09.2021 15:27
    +1

    А что это за бин-лог в MySql DB?


    1. JaS4083 Автор
      27.09.2021 22:16
      +1

      По сути это лог-файлы в сервере MySql, где записываются ивенты (изменение схемы, добавлени/изменение/удаление данных и т.д.) Это логгирование можно "включить" в настойках сервера (по дефолту оно выключенно).
      Более детально об этом можно почитать здесь - https://dev.mysql.com/doc/refman/8.0/en/binary-log.html


  1. ololx
    27.09.2021 16:35
    +1

    Спасибо за статью, очень полезная. Года 2 назад уже рассматривал debezium, с СУБД mysql все работало хорошо по мануалам, но с postgresql не получилось никак реализовать CDC. Поэтому писал реализацию CDC уже сам под каждый конкретный сервис - у сервиса (master data) пишется продюсер для кафки, а у зависимых (slave) сервисов уже консьюмеры.

    Но благодаря Вашей статье решил попробовать debezium снова, тем более за два года могло многое измениться)

    Недавно наткнулся еще и на embedded реализацию debezium и начал изучать этот вариант. Если интересно начало проекта тут - https://github.com/innopolis-university-java-team/change-data-capture-instances.


    1. JaS4083 Автор
      27.09.2021 19:32

      Интересно, тоже удалось тоже подключиться к логу Postgres, или просто в сервис, который работал с Postgres добавили кафка продьюсера ?

      Embedded Debezium пробовал, работает со Spring Boot довольно неплохо. Но в реализацию его не взяли потому что, у нас было 3 микросервиса которые читали из 3х БД (и складывали в одну БД), и так бы получилось, что нужно аж 3 embedded debezium разворачивать. А если делать stand along server, то в одном debezium сервере можно сразу законфижить 3 разных коннектора, и все рабоатет. Плюс модный Spring Cloud Stream ....
      (хочу о нем еще написать).


      1. ololx
        27.09.2021 19:39
        +1

        Все куда проще - именно продюсера на spring-cloud-stream который вызывался непосредственно после успешной манипуляции с сущностью; написал ДТО по образу того, что есть в debezium (поля T before, T after + метод, который по соотношению значений полей определяет тип события - delete, update и create). Что хорошего в такой реализации - можно по необходимости Google Protobuff применить.

        Получается при возникновении события в сервисе (например создании сущности) создается инстанс ДТО, с before = null, after = объект; ДТО поститься в кафку; ну а далее уже консьюмеры отрабатывают. На тот момент это было быстрее, чем разбираться с debezium до конца.


        1. JaS4083 Автор
          27.09.2021 21:26

          Звучит как рабочий подход. Тем более скорее всего 2 года назад Debezium был еще "сырым".
          Слушал выступление его создателя Gunar Morlin, он говорил что главный use case прежде всего если есть БД с клиентом написаном на легаси (диком легаси, куда и лезть не хочется), - тогда CDC точно для этого.
          Ну если такого нет, то наверное можно написать самому, или все-таки прикрутить и наконфижить Debezium (что оказалось тоже не сложно).
          С Protobuf не работал, не могу ничего сказать.