Суть CDC в том, что он "захватывает" все изменения в базе данных.

Существует два вида CDC:

  • На основании запросов

  • На основании журнала

Если говорить про захват изменений на основании запросов, то есть следующие минусы:

  • Не самый лучший вариант, потому что CDC будет по крону обращаться к БД и получать изменения; большая нагрузка на БД.

  • Нельзя увидеть удаленные и/или изменённые строки в таблице.

  • Не реализуемо, если в таблице нет вообще никаких дат и поэтому мы не сможем захватить изменения.

И следующие плюсы:

  • Быстрая реализация.

  • Обычно не требует специфических знаний и поэтому реализация не будет сложной.

И при сравнении с захватом изменений через журнал транзакций у нас следующая картина по минусам:

  • Сложно реализуемо

  • Необходима бОльшая инфраструктура для реализации данного вида CDC

  • Сложнее в поддерживании

Но в тоже время мы получаем следующие плюсы:

  • Мы можем отслеживать Все изменения в нашей БД: добавление, удаление и изменение записей

  • Это не нагружает БД, потому что мы читаем WAL-журнал

  • Можно реализовать real-time аналитику

  • Можно реализовать event-driven подход

Ниже будет рассказано о том, как реализовать CDC при использовании Debezium. Если вы не хотите повторять все операции самостоятельно, то можете воспользоваться моим репозиторием, в котором есть всё что используется в статье. А статью можете использовать как справочник.


Для начала создадим docker-compose.yaml при помощи которого сделаем всю инфраструктуру.

docker-compose.yaml
version: '2'

services:

  zookeeper:
    image: quay.io/debezium/zookeeper:2.5
    ports:
      - "2181:2181"
      - "2888:2888"
      - "3888:3888"

  kafka:
    image: quay.io/debezium/kafka:2.5
    ports:
      - "9092:9092"
    links:
      - zookeeper
    environment:
      - ZOOKEEPER_CONNECT=zookeeper:2181

  postgres:
    image: quay.io/debezium/example-postgres:2.5
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres

  connect:
    image: quay.io/debezium/connect:2.5
    ports:
      - "8083:8083"
    links:
      - kafka
      - postgres
    environment:
      - BOOTSTRAP_SERVERS=kafka:9092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_connect_statuses

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.6.1
    hostname: control-center
    ports:
      - "9021:9021"
    depends_on:
      - kafka
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: "kafka:9092"
      CONTROL_CENTER_REPLICATION_FACTOR: "1"
  debezium-ui:
    image: quay.io/debezium/debezium-ui:2.5
    platform: linux/x86_64
    depends_on:
      - kafka
      - connect
      - zookeeper
    ports:
      - "8081:8080"
    environment:
      - KAFKA_CONNECT_URIS=http://connect:8083

  ch_server:
    image: clickhouse/clickhouse-server:24.4.1
    ports:
      - "8123:8123"
    environment:
      CLICKHOUSE_USER: click
      CLICKHOUSE_PASSWORD: click

Важно. Используемые образы в docker-compose.yaml – это образы от самого debezium. Поэтому часть шагов и инструкций уже были выполнены за вас. Но если вы хотите собрать всё самостоятельно, то воспользуйтесь документацией.

Затем создадим config register-postgres.json

register-postgres.json
{
  "name": "inventory-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "tasks.max": "1",
    "database.hostname": "postgres",
    "database.port": "5432",
    "database.user": "postgres",
    "database.password": "postgres",
    "database.dbname": "postgres",
    "topic.prefix": "dbserver1",
    "schema.include.list": "inventory"
  }
}

После этого выполним запрос:

curl -i -X POST -H "Accept:application/json" -H  "Content-Type:application/json" http://localhost:8083/connectors/ -d @register-postgres.json

Если перейти по адресу http://localhost:9021/ и зайти в Topics, то можно увидеть топики с указанием наших таблиц из БД.

Топики в Kafka
Топики в Kafka

Начиная с этого момента работает CDC. Все изменения, которые будут происходить в БД будут отправляться в Kafka.


Давайте для примера возьмем таблицу с пользователями и сэмулируем "рабочую" нагрузку.

Для этого я предлагаю использовать небольшой скрипт, который будет генерировать новые записи и изменять старые.

Но перед тем как написать скрипт нужно подготовить окружение; для этого выполните следующую команду:

python3.11 -m venv venv && \
source venv/bin/activate && \
pip install --upgrade pip && \
pip install -r requirements.txt

Создайте файл table_simulation.py и в него поместите следующий код:

table_simulation.py
import uuid
import json
import time
import random
import requests

import pandas as pd
from connectors_to_databases import PostgreSQL

pg = PostgreSQL(
    host='localhost',
    port=5432,
    login='postgres',
    password='postgres',
)


def insert_user():
    r = requests.get(url='https://randomuser.me/api/')

    d = json.loads(r.text)
    dict_user = d['results'][0]

    d_ = {
        'first_name': [dict_user['name']['first']],
        'last_name': [dict_user['name']['last']],
        'email': [dict_user['email']],
    }

    df = pd.DataFrame(d_)

    try:
        pg.insert_df(
            df=df,
            pg_table_schema='inventory',
            pg_table_name='customers',
        )
    except Exception as ex:
        pass

    time.sleep(2)


def update_user():
    df_id = pg.execute_to_df(
        '''
        SELECT id FROM inventory.customers
        '''
    )

    len_list = len(df_id.id)

    random_user = random.randint(0, len_list)

    try:
        pg.execute_script(
            f'''
            UPDATE
                inventory.customers
            SET
                first_name = '{str(uuid.uuid4())}',
                last_name = '{str(uuid.uuid4())}'
            WHERE
                id = {df_id.id[random_user]}
            '''
        )
    except Exception as ex:
        pass

    time.sleep(2)


while True:
    if random.randint(1, 1000) % 2 == 0:
        print('INSERT new USER')
        insert_user()
    else:
        print('UPDATE current USER')
        update_user()

Запускаем наше виртуальное окружение

source venv/bin/activate

Запускам скрипт:

python table_simulation.py

После запуска скрипта необходимо перейти по адресу http://localhost:9021/ и зайти в топик dbserver1.inventory.customers

Там мы будем видеть все сообщения, которые поступают в Kafka и это как раз все наши изменения из WAL-журнала.

Ниже формат сообщения, которое приходит к нам в Kafka

Message
{
  "schema": {
    "type": "struct",
    "fields": [
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "before"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "int32",
            "optional": false,
            "default": 0,
            "field": "id"
          },
          {
            "type": "string",
            "optional": false,
            "field": "first_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "last_name"
          },
          {
            "type": "string",
            "optional": false,
            "field": "email"
          }
        ],
        "optional": true,
        "name": "dbserver1.inventory.customers.Value",
        "field": "after"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "version"
          },
          {
            "type": "string",
            "optional": false,
            "field": "connector"
          },
          {
            "type": "string",
            "optional": false,
            "field": "name"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "ts_ms"
          },
          {
            "type": "string",
            "optional": true,
            "name": "io.debezium.data.Enum",
            "version": 1,
            "parameters": {
              "allowed": "true,last,false,incremental"
            },
            "default": "false",
            "field": "snapshot"
          },
          {
            "type": "string",
            "optional": false,
            "field": "db"
          },
          {
            "type": "string",
            "optional": true,
            "field": "sequence"
          },
          {
            "type": "string",
            "optional": false,
            "field": "schema"
          },
          {
            "type": "string",
            "optional": false,
            "field": "table"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "txId"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "lsn"
          },
          {
            "type": "int64",
            "optional": true,
            "field": "xmin"
          }
        ],
        "optional": false,
        "name": "io.debezium.connector.postgresql.Source",
        "field": "source"
      },
      {
        "type": "string",
        "optional": false,
        "field": "op"
      },
      {
        "type": "int64",
        "optional": true,
        "field": "ts_ms"
      },
      {
        "type": "struct",
        "fields": [
          {
            "type": "string",
            "optional": false,
            "field": "id"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "total_order"
          },
          {
            "type": "int64",
            "optional": false,
            "field": "data_collection_order"
          }
        ],
        "optional": true,
        "name": "event.block",
        "version": 1,
        "field": "transaction"
      }
    ],
    "optional": false,
    "name": "dbserver1.inventory.customers.Envelope",
    "version": 1
  },
  "payload": {
    "before": {
      "id": 1003,
      "first_name": "Edward",
      "last_name": "Walker",
      "email": "ed@walker.com"
    },
    "after": {
      "id": 1003,
      "first_name": "2df7fb35-4c33-4742-9e56-10073c54937e",
      "last_name": "07f6a1e6-6a0a-4d51-98c3-9e67ea49b3b5",
      "email": "ed@walker.com"
    },
    "source": {
      "version": "2.5.4.Final",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1715153932452,
      "snapshot": "false",
      "db": "postgres",
      "sequence": "[\"34498256\",\"34498256\"]",
      "schema": "inventory",
      "table": "customers",
      "txId": 779,
      "lsn": 34498256,
      "xmin": null
    },
    "op": "u",
    "ts_ms": 1715153932842,
    "transaction": null
  }
}

Важная здесь информация, которая позволяет нам отслеживать изменения – это вот этот кусок сообщения:

...
"payload": {
    "before": {
      "id": 1003,
      "first_name": "Edward",
      "last_name": "Walker",
      "email": "ed@walker.com"
    },
    "after": {
      "id": 1003,
      "first_name": "2df7fb35-4c33-4742-9e56-10073c54937e",
      "last_name": "07f6a1e6-6a0a-4d51-98c3-9e67ea49b3b5",
      "email": "ed@walker.com"
    },
    "source": {
      "version": "2.5.4.Final",
      "connector": "postgresql",
      "name": "dbserver1",
      "ts_ms": 1715153932452,
      "snapshot": "false",
      "db": "postgres",
      "sequence": "[\"34498256\",\"34498256\"]",
      "schema": "inventory",
      "table": "customers",
      "txId": 779,
      "lsn": 34498256,
      "xmin": null
    },
...

Данное сообщение нам показывает что было в БД до и что стало после изменения. Формат сообщения позволяет нам создавать SCD2 и отслеживать все изменения по ключу.

Всю обработку потока можно реализовать разными способами: Spark, Flink, etc

Но я покажу как просто и быстро можно прочитать топики Kafka через ClickHouse.

Для этого я ранее уже добавил ClickHouse в наш docker-compose.yaml.

После этого необходимо подключиться к ClickHouse через любой менеджер подключений или через cli, я буду подключаться через DBeaver.

Если вы захотите углубиться в тему kafka-engine в ClickHouse, то можно воспользоваться официальной документацией.


Ниже я покажу, как подключить топик dbserver1.inventory.customers к ClickHouse

Для начала нам необходимо создать consumer для Kafka:

DROP TABLE IF EXISTS customers_kafka;

CREATE TABLE customers_kafka
(
  payload String,
  "schema" String
)
  ENGINE = Kafka 
  SETTINGS
    kafka_broker_list = 'kafka',
    kafka_topic_list = 'dbserver1.inventory.customers',
    kafka_group_name = 'foo',
    kafka_format = 'JSON';

Затем создаем таблицу, в которую будут записываться данные из топика:

DROP TABLE IF EXISTS customers_kafka_mv;     
    
CREATE TABLE customers_kafka_mv
(
  payload String,
  "schema" String
)
  ENGINE = MergeTree()
  ORDER BY (payload);

В конце создаем мат.представление, которое будет уже записывать наши записи из топика в таблицу:

DROP TABLE IF EXISTS mv_customers_kafka;

CREATE MATERIALIZED VIEW mv_customers_kafka
TO customers_kafka_mv AS
SELECT * FROM customers_kafkasql

И если запустить следующий скрипт:

SELECT count(*) FROM customers_kafka_mv

То мы увидим количество сообщений, которое было записано в данный топик Kafka. И также мы можем прочитать любое из сообщений.

Так как данные сохранятся в формате JSON, то для "разбора" полученных JSON нужно пользоваться встроенными методами в ClickHouse.


На этом всё. Реализация в статье – это просто один из примеров того, как это может выглядеть. При выборе технологий смотрите на потребности бизнеса, где есть компетенции и пр. В целом, вы можете и не использовать ClickHouse для чтения топиков, а написать свой сервис на Python или реализовать микро-батчинг посредством Apache Airflow.

Резюме: Захват изменений можно использовать для разных целей и под каждую цель – свой инструмент.

Комментарии (6)


  1. Mikl_m13
    14.05.2024 00:01

    Репозиторий на гитхабе недоступен или отсутствует на 16:15 MSK.


    1. k0rsakov Автор
      14.05.2024 00:01

      Изменил настройки. Теперь всё доступно.


  1. BigD
    14.05.2024 00:01

    Из Kafka хорошо читает Apache NiFi с дальнейшей трансформацией


    1. k0rsakov Автор
      14.05.2024 00:01

      Согласен. О чем я вкратце написал в статье.


      1. BigD
        14.05.2024 00:01

        не увидел в тексте упоминания NiFi


  1. dude_sam
    14.05.2024 00:01

    Не помню подробностей (сам сидел со стороны Vertica, читая из Kafka), но есть нюанс, что WAL может рости до бесконечности, если в Kafka неправильно удалить толи topic, толи producer.