Третья статья цикла о построении CDC-пайплайна с нуля. Сегодня — самое интересное: захватываем изменения из PostgreSQL и отправляем в Kafka. И разбираемся, почему WAL может съесть весь диск, даже если данные не меняются.
Зачем я это пишу
Честно: в первую очередь это мои заметки. На работе я поддерживаю CDC-пайплайны — 7 баз данных, сотни миллионов записей, Debezium, Kafka. Но там всё настроено до меня, и разбираться в чужой конфигурации — не то же самое, что понимать почему оно работает именно так.
Во вторую очередь — портфолио. Если буду менять работу, эти статьи можно приложить к резюме.
И в третью — мотивация не бросить. Если хотя бы десять человек прочитают и кому-то пригодится — значит не зря потратил выходные.
Что уже есть
В предыдущих статьях я поднял:
Budget Parser — Telegram-бот, который через Claude Vision API парсит скриншоты банковских транзакций. Помните YM*DEEPHOST -313.95₽ и Красное&Белое -440.83₽? Сейчас увидим их в Kafka.
PostgreSQL с logical replication — wal_level=logical, публикация, пользователь для репликации.
Сейчас будем ловить изменения из PostgreSQL и отправлять в Kafka.
Целевая архитектура

Четыре LXC-контейнера в Proxmox:
PostgreSQL: 192.168.0.151
Kafka + ZooKeeper: 192.168.0.153
Debezium (Kafka Connect): 192.168.0.154
Kafka UI: 192.168.0.160
Часть 1: Kafka + ZooKeeper
Kafka уже умеет работать без ZooKeeper (режим KRaft), но я выбрал классическую связку — больше документации и примеров troubleshooting.
Контейнер и установка
# CT 303, 4GB RAM, 20GB disk, IP 192.168.0.153
apt update && apt install -y openjdk-17-jre-headless wget
wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar -xzf kafka_2.13-3.9.0.tgz -C /opt/kafka/
4 ГБ RAM — минимум для Kafka + ZooKeeper. В production обычно 8-16 ГБ на брокер.
Конфигурация ZooKeeper
# /opt/kafka/config/zookeeper.properties
dataDir=/var/lib/zookeeper
clientPort=2181
maxClientCnxns=60
admin.enableServer=false
Конфигурация Kafka
# /opt/kafka/config/server.properties
broker.id=0
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://192.168.0.153:9092
zookeeper.connect=localhost:2181
log.dirs=/var/lib/kafka
log.retention.hours=1
Грабля #1: advertised.listeners
Тут была моя первая ошибка, поскольку ранее я уже поднимал Kafka на другом адресе.
Как работает подключение:
Клиент подключается к bootstrap.servers
Kafka отвечает: «Вот список брокеров, подключайся по адресам из advertised.listeners»
Клиент использует эти адреса
Что было у меня:
advertised.listeners=PLAINTEXT://192.168.0.158:9092 # Старый IP!
Kafka слушала на правильном IP, но говорила клиентам подключаться к несуществующему адресу. Результат — Timed out waiting for a node assignment.
Правило: advertised.listeners = IP, доступный клиентам извне.
Systemd и проверка
# /etc/systemd/system/kafka.service
[Service]
Type=simple
User=kafka
Environment="KAFKA_HEAP_OPTS=-Xmx1G -Xms1G"
ExecStart=/opt/kafka/bin/kafka-server-start.sh /opt/kafka/config/server.properties
Requires=zookeeper.service
After=zookeeper.service
# Проверка
echo ruok | nc localhost 2181 # imok
/opt/kafka/bin/kafka-topics.sh --list --bootstrap-server localhost:9092
Часть 2: Debezium и Kafka Connect
Что такое Kafka Connect?
Debezium — это не standalone-приложение, а плагин для Kafka Connect.
Kafka Connect — фреймворк для интеграций:
Source Connectors — читают откуда-то → пишут в Kafka
Sink Connectors — читают из Kafka → пишут куда-то
Debezium — Source Connector, который читает WAL базы данных.
Отдельный контейнер
# CT 304, 2GB RAM, 10GB disk, IP 192.168.0.154
apt install -y openjdk-17-jre-headless wget curl jq
# Kafka (нужен для Connect)
wget https://downloads.apache.org/kafka/3.9.0/kafka_2.13-3.9.0.tgz
tar -xzf kafka_2.13-3.9.0.tgz -C /opt/kafka/
# Debezium PostgreSQL Connector
wget https://repo1.maven.org/maven2/io/debezium/debezium-connector-postgres/3.0.0.Final/debezium-connector-postgres-3.0.0.Final-plugin.tar.gz
mkdir -p /opt/kafka/plugins
tar -xzf debezium-connector-postgres-*.tar.gz -C /opt/kafka/plugins/
Почему отдельно от Kafka? Изоляция. Можно рестартить коннектор, не трогая брокер.
Конфигурация Kafka Connect
# /opt/kafka/config/connect-distributed.properties
bootstrap.servers=192.168.0.153:9092
group.id=debezium-connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.host.name=0.0.0.0
rest.port=8083
rest.advertised.host.name=192.168.0.154
plugin.path=/opt/kafka/plugins
Проверка
systemctl start kafka-connect
curl -s http://localhost:8083/ | jq .version
# "3.9.0"
curl -s http://localhost:8083/connector-plugins | jq '.[].class'
# "io.debezium.connector.postgresql.PostgresConnector"
Часть 3: Подключаем Debezium к PostgreSQL
Конфигурация коннектора
{
"name": "budget-postgres-connector",
"config": {
"connector.class": "io.debezium.connector.postgresql.PostgresConnector",
"tasks.max": "1",
"database.hostname": "192.168.0.151",
"database.port": "5432",
"database.user": "debezium",
"database.password": "debezium_pass",
"database.dbname": "budget_db",
"topic.prefix": "budget",
"schema.include.list": "finance",
"table.include.list": "finance.parsed_transactions",
"publication.name": "finance_publication",
"slot.name": "debezium_slot",
"plugin.name": "pgoutput",
"snapshot.mode": "initial",
"decimal.handling.mode": "string",
"heartbeat.interval.ms": "10000",
"heartbeat.action.query": "UPDATE finance.debezium_heartbeat SET last_heartbeat = NOW() WHERE id = 1"
}
}
Ключевые параметры
plugin.name — плагин декодирования WAL:
Плагин |
Описание |
pgoutput |
Встроен в PostgreSQL 10+, рекомендуется |
wal2json |
Нужно устанавливать отдельно, выдаёт JSON |
decoderbufs |
Protobuf, компактнее, но сложнее в отладке |
Выбирайте pgoutput, если PostgreSQL ≥ 10.
snapshot.mode — что делать при первом запуске:
Режим |
Поведение |
initial |
Снять snapshot всех данных, потом слушать изменения |
never |
Только новые изменения (данные уже есть в DWH) |
schema_only |
Только схема, без данных |
Для большой таблицы snapshot может занять часы. Планируйте окно обслуживания.
decimal.handling.mode: string — как представлять DECIMAL/NUMERIC:
Режим |
Результат |
Комментарий |
precise (default) |
"8Yw=" |
Base64, точно но нечитаемо |
string |
"-37.00" |
Строка, читаемо |
double |
-37.0 |
Число с плавающей точкой (потеря точности!) |
По умолчанию Debezium кодирует в Base64 для максимальной точности. Consumer может декодировать эти данные, но string нагляднее для отладки и большинства задач. Я выбрал string.
Heartbeat — защита от переполнения диска
Это была неочевидная для меня вещь, но одна из самых важных.
Проблема: Если в таблице нет изменений, Debezium не продвигает offset. Replication slot держит старую позицию в WAL. PostgreSQL не может удалить старые WAL-файлы. Диск заканчивается.
Коннектор показывает RUNNING, всё вроде хорошо, а директория /var/lib/postgresql/16/main/pg_wal/ растёт и растёт.
Решение: Heartbeat. Каждые 10 секунд Debezium делает UPDATE в служебную таблицу → получает событие из WAL → продвигает offset.
-- На PostgreSQL создаём таблицу для heartbeat
CREATE TABLE finance.debezium_heartbeat (
id INTEGER PRIMARY KEY,
last_heartbeat TIMESTAMP
);
INSERT INTO finance.debezium_heartbeat VALUES (1, NOW());
GRANT SELECT, UPDATE ON finance.debezium_heartbeat TO debezium;
-- Добавляем в publication
ALTER PUBLICATION finance_publication ADD TABLE finance.debezium_heartbeat;
Регистрация коннектора
curl -X POST \
-H "Content-Type: application/json" \
-d @postgres-connector.json \
http://localhost:8083/connectors
# Проверяем статус
curl -s http://localhost:8083/connectors/budget-postgres-connector/status | jq .
{
"name": "budget-postgres-connector",
"connector": { "state": "RUNNING" },
"tasks": [{ "id": 0, "state": "RUNNING" }]
}
Часть 4: Смотрим результат
Kafka UI

Топики: CDC-события, heartbeat, служебные топики Kafka Connect

Видно события с таймстемпами — каждая транзакция из Budget Parser

budget-postgres-connector: RUNNING, 1 of 1 tasks
Реальные данные в Kafka
Вот что пришло при первом запуске (snapshot существующих данных, op: "r"):
{
"before": null,
"after": {
"id": 4,
"transaction_date": 20461,
"merchant": "YM*DEEPHOST",
"amount": "-313.95",
"transaction_type": "expense",
"category": "Прочие списания",
"is_verified": true
},
"op": "r",
"source": {
"connector": "postgresql",
"db": "budget_db",
"schema": "finance",
"table": "parsed_transactions",
"lsn": 27105320
}
}
Та самая транзакция YM*DEEPHOST из Budget Parser! Теперь она в Kafka.
Типы операций
op |
Значение |
Когда |
r |
read (snapshot) |
Первоначальное чтение данных |
c |
create |
INSERT |
u |
update |
UPDATE |
d |
delete |
DELETE |
Важно: формат дат
В JSON вы увидите transaction_date: 20461 — это не ошибка. Debezium представляет DATE как количество дней с 1970-01-01 (epoch days). Consumer должен конвертировать:
from datetime import date, timedelta
transaction_date = date(1970, 1, 1) + timedelta(days=20461)
# 2026-01-08
Аналогично для TIMESTAMP — микросекунды с epoch.
Часть 5: Kafka UI
Смотреть события в консоли неудобно. Kafka UI даёт веб-интерфейс:
# CT 306, IP 192.168.0.160
wget https://github.com/provectus/kafka-ui/releases/download/v0.7.2/kafka-ui-api-v0.7.2.jar \
-O /opt/kafka-ui/kafka-ui.jar
cat > /opt/kafka-ui/application.yml << 'EOF'
server:
port: 8080
kafka:
clusters:
- name: homelab
bootstrapServers: 192.168.0.153:9092
kafkaConnect:
- name: debezium
address: http://192.168.0.154:8083
EOF
Открываем http://192.168.0.160:8080 — видим топики, сообщения, статус коннекторов.
Грабли, на которые можно наступить
1. advertised.listeners с неправильным IP
Симптом: Timed out waiting for a node assignment
Причина: Kafka говорит клиентам подключаться к несуществующему IP.
Решение: advertised.listeners = реальный IP, доступный клиентам.
2. Base64 вместо чисел в amount
Симптом: "amount": "8Yw=" вместо "amount": "-37.00"
Причина: Debezium по умолчанию кодирует DECIMAL в Base64.
Решение: "decimal.handling.mode": "string" в конфиге коннектора.
3. WAL растёт без изменений в таблице
Симптом: Директория /var/lib/postgresql/16/main/pg_wal/ занимает всё больше места, коннектор показывает RUNNING.
Причина: Нет heartbeat → Debezium не продвигает offset → replication slot держит WAL.
Решение: Настроить heartbeat.interval.ms и heartbeat.action.query.
4. Kafka Connect не видит плагин Debezium
Симптом: connector-plugins возвращает пустой список или нет PostgresConnector.
Причина: Неправильный plugin.path или плагин не распакован.
Решение: Проверить путь в connect-distributed.properties, убедиться что JAR-файлы на месте.
5. Kafka UI требует авторизацию
Симптом: Окно логина при открытии UI.
Причина: Дефолтная конфигурация с включённой security.
Решение:
spring:
security:
enabled: false
auth:
type: DISABLED
6. Java OutOfMemoryError
Симптом: Kafka или Connect падает с OOM.
Причина: Недостаточно heap памяти.
Решение: KAFKA_HEAP_OPTS=-Xmx1G -Xms1G для Kafka, аналогично для Connect. Не давайте JVM больше 50% RAM контейнера.
Итоговая архитектура
Контейнер |
IP |
RAM |
Назначение |
budget-parser |
192.168.0.150 |
1GB |
Telegram-бот |
pg-source |
192.168.0.151 |
2GB |
PostgreSQL |
kafka |
192.168.0.153 |
4GB |
Kafka + ZooKeeper |
debezium |
192.168.0.154 |
2GB |
Kafka Connect |
monitoring |
192.168.0.160 |
2GB |
Kafka UI |
Итого: 11 ГБ RAM на весь CDC-пайплайн.
Полезные команды
# Статус коннектора
curl -s http://192.168.0.154:8083/connectors/budget-postgres-connector/status | jq .
# Рестарт коннектора
curl -X POST http://192.168.0.154:8083/connectors/budget-postgres-connector/restart
# Пауза
curl -X PUT http://192.168.0.154:8083/connectors/budget-postgres-connector/pause
# Удаление
curl -X DELETE http://192.168.0.154:8083/connectors/budget-postgres-connector
# Replication slot в PostgreSQL
sudo -u postgres psql -d budget_db -c "SELECT slot_name, active FROM pg_replication_slots;"
# Читать события из топика
/opt/kafka/bin/kafka-console-consumer.sh \
--topic budget.finance.parsed_transactions \
--bootstrap-server 192.168.0.153:9092 \
--from-beginning
Что дальше
Данные текут из PostgreSQL в Kafka. Пока они просто накапливаются в топике с retention 1 час и исчезают.
Следующий этап — написать Consumer и построить слои хранения:
Kafka → Consumer → Avro (сырой слой) → Parquet (промежуточный слой) → Hive
Там будет много интересного: вычитка из Kafka, преобразование форматов, инкрементальная загрузка, сравнение с тем что уже есть в хранилище.
# |
Статья |
Статус |
1 |
✅ |
|
2 |
✅ |
|
3 |
Kafka + Debezium |
✅ Эта статья |
4 |
Consumer + HDFS/Hive |
⏳ Следующая |
5 |
Мониторинг в Grafana |
? Планируется |
Ссылки
Вопросы — в комментарии. И расскажите про свой опыт с CDC: кто ещё настраивает это для себя или в production? Какие грабли встречались? Интересно узнать, что я не один такой.
Комментарии (5)

whoisking
31.01.2026 14:26Года полтора назад пытался втащить этот дебезиум, сразу наткнулись на съедание диска wal'ом, перечитал кучу ишью, решений, в том числе, с хартбитом, ничего не помогало, плюнули, написали за несколько дней свой cdc и чувствовали себя гораздо лучше, да ещё и с полным пониманием происходящего.
Nikudator
Можно поставить Debezium UI и просмотр состояния коннекторов и их рестарт делать через web-интерфейс.
igorat
Не знал про такое, точнее, даже не подумал посмотреть.
Статус просто проверял либо руками, либо через скрипты, чтобы траблшутить.
Настраивал с нуля CDC доя постгреса, мускла и монги в Azure, только средствами Azure.
Надо рассказать про такую возможность команде, которая это сопровождает сейчас.