Меня зовут Сергей Гребенюк, я лидер разработки Sidec (Росреестр). Расскажу, как решили задачу объединения двух топиков с соотношением один ко многим и почему не устроило решение на Kafka-streams (kafka docs) и RocksDB (github). А также о том, как, опираясь на гарантии доставки exactly-once (EOS) (confluent docs), смогли снизить требования к ресурсам в несколько раз.

На иллюстрации показаны два подхода к объединению топиков: с persistent cache и in-memory cache. Мы перейдём от первой схемы ко второй. 

Постановка задачи

Sidec — это CDC ETL‑инструмент (wiki), расширение debezium (Github) для enterprise. В рамках одной из задач потребовалось доработать PostgreSQL source connector (debezium docs) под формат сообщений заказчика. Контракт предполагал указание информации из коммита транзакции в каждом сообщении.

Устройство WAL Postgres (Хабростатья о WAL) предполагает, что информация о коммите транзакции становится доступна после чтения всех сообщений транзакции. Максимальный размер транзакции больше, чем heap Java‑приложения. Объединить данные до отправки в Kafka не получится, нужно внешнее хранение. Но мы рассматривали хранение только в Kafka, потому что любая новая система хранения — это дополнительная точка отказа и, как следствие, затраты на обеспечение высокой доступности. Решили публиковать данные в промежуточные топики с последующим объединением. PostgresConnector публикует DML‑события в топик data, метаинформацию о транзакции — в топик transaction. При поступлении сообщений в топики два источника данных объединяются по условию с соотношением один ко многим. Один коммит — много DML‑операций.

Решение с kafka-streams и RocksDb

Топики объединяли с помощью kafka‑streams (kafka docs). Поставщик данных отправляет сообщения с гарантией at‑least‑once. Возможны повторные отправки событий, но нет гарантии очередности элементов. Решили использовать RocksDbStateStore. DML‑операции транзакции накапливали в Store до появления информации о коммите. Финальный шаг — сопоставление сообщений транзакции и отправка в топик конечных данных. После передачи всей транзакции в топик потребителя события удалялись из stateStore по ключам. Тут потребовалось доработать processor (kafka docs) для реализации сложной логики. Можно было подобрать и решение с нативным API kafka‑streams join.

public Topology init() {
       StreamsBuilder builder = new StreamsBuilder();
       StoreBuilder<KeyValueStore<String, CommitMetadataDTO>> commitMetadataStore = Stores.keyValueStoreBuilder(
                       Stores.persistentKeyValueStore(COMMIT_METADATA_STORE),
                       STRING_SERDE, COMMIT_METADATA_SERDE)
               .withCachingEnabled();
       StoreBuilder<KeyValueStore<String, DataDTO>> dataStore = Stores.keyValueStoreBuilder(
               Stores.persistentKeyValueStore(DATA_SERDE),
               STRING_SERDE, DATA_DTO_SERDE
       ).withCachingEnabled();
       builder.addStateStore(commitMetadataStore);
       builder.addStateStore(dataStore);
       builder.stream(recordTopics, Consumed.with(keyGenericAvroSerde, valueByteAvroSerde))
               .process(new CustomProcessor(commitMetadataStore.name(), dataStore.name(), commitMetadataStore.name(), dataStore.name())
               .to(outputTopicName, Produced.with(STRING_SERDE, DATA_DTO_SERDE));
       return builder.build();

Преимущества:

  • простая архитектура;

  • не важен порядок поступления сообщений;

  • повторные отправки в семантике at‑least‑once обрабатывались корректно благодаря key‑value Store.

Недостатки:

  • высокие требования к ресурсам.

Красным цветом на схеме выше обозначены дополнительные элементы хранения при использовании join. Kafka‑streams записывает StateStore на диск в RocksDB и сохраняет changelog в топиках Kafka. При падении узла с хранилищем RocksDB состояние восстановится из changelog топика на другом узле или на текущем после перезапуска.

Сколько сообщений может быть записано в StateStore? Ответ зависит от специфики потока данных и гарантий доставки. Для экспорта CDC‑потока максимальное количество сообщений в StateStore должно вместить наибольшую транзакцию Postgres (StackOverflow). Но даже если не стремиться поддержать теоретический предел, а предоставить пользователям самим определять наибольший размер транзакций, проблема остаётся. Обычно приходилось монтировать двухсотгигабайтные диски на серверах приложений для RocksDb и такие же для kafka_data на каждом брокере. Инсталляция в пять брокеров Kafka и три приложения в кластере требовала дополнительно 1,5–2 терабайта. Помимо дисков активно расходуются ресурсы процессора. Обращение к RocksDb и топикам changelog выполняется через Serealize/Deserialize. Каждый cdc‑event записывается в реплики топиков cdc_event_dml_topic, cdc_even_dml_topic_changelog, result_output_topic и на диск приложения в rocksDb.

В итоге получили значительное потребление ресурсов при решении типовой задачи.

Алгоритм объединения топиков без persistent store

Поставили цель: объединять сразу после чтения из топиков Kafka без дополнительного хранения.

Алгоритм: есть две очереди, data‑queue и transaction‑queue, нужно сравнить их начальные элементы. Если критерий сопоставления выполняется, то объединить и перейти к следующему элементу в data‑queue. В противном случае перейти к следующему элементу в transaction‑queue. Если одна из очередей не содержит элементов, то ожидать поступления.

Схематично алгоритм объединения сообщений можно отразить так:

Топик data содержит n партиций, топик transaction — одну. Это позволяет разделить задачу на несколько consumer и объединять параллельно.

[data.p0; transaction.p0]
[data.p1; transaction.p0]
[data.p2; transaction.p0]
[data.p3; transaction.p0]

Обработка одной пары топиков data.pn: transaction.pn — одна единица исполнения (Task). Но логика не сработает, если нарушается очерёдность событий в партициях, или если позиции consumer offset различаются слишком сильно.

Обеспечение гарантии очерёдности

Слот логической репликации Postgres (postgres docs) предоставляет события транзакции последовательно. События разных транзакций не пересекаются. Коннектор Debezium Postgres также не нарушает очерёдность сообщений при обработке. Она может нарушаться при разрывах соединения с базой данных, падения приложения и ошибок отправки в Kafka (Debezium things go wrong). После падения коннектор возобновляет чтение слота репликации с последнего сохранённого LSN. Это означает, что все события после этого LSN, которые были вычитаны и отправлены ранее, будут обработаны повторно.

Необходимость использования exactly-once

Переотправка сообщений в топик data не всегда приводит к повторной публикации в топике transaction в том же порядке. На схеме ниже представлен пример с повторной публикацией после отправки сообщения B_1 и до отправки в топик transaction B_C.

  1. отправка DML A_1;

  2. отправка коммита A_C;

  3. отправка DML B_1;

  4. сбой задачи;

  5. отправка DML A_1;

  6. отправка коммита A_C;

  7. отправка DML B_1;

  8. отправка коммита B_C.

Согласно алгоритму объединения, сообщения A_1 и B_1 попадут в топик result. Но дубли останутся в очереди и заблокируют дальнейшее объединение, так как соответствующие им данные в топике transaction уже исключены из рассмотрения.

При включении механизма exactly‑once переотправка не приведёт к появлению дублей в consumer.

Вывод: очерёдность сохраняется, если гарантирована последовательность цепочки операций.

  1. чтение из базы данных;

  2. обработка сообщения в приложении;

  3. публикация в Kafka.

Контроль смещения consumer

EOS‑публикация гарантирует последовательность сообщений в партициях, но не избавляет от гонки consumer при чтении топиков data и transaction. Расхождение можно устранить кешированием. Сразу после poll сообщения помещаются в блокирующую очередь, которую разбирает поток объединения сообщений. Последовательная публикация событий транзакции означает, что сначала будут отправлены все CDC‑события в топик data, потом commit‑сообщения в топик transaction. Если сообщений в транзакции больше кеша или произошло отставание одной из партиций в Kafka, то переполнение памяти произойдёт раньше, чем поток join успеет очистить кеш. Однако, это не стало причиной вернуться к persistentStore RocksDb.

Kafka‑streams для балансировки сообщений между StreamTask тоже использует блокирующую очередь. Consumer на pollPhase (Github) передаёт в StreamTask сообщения через ArrayDeque (Github). Если количество сообщений в очереди превышает buffered.records.per.partition (Github), то активируется механизм back pressure с приостановкой потребления.

/**
* Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped
* and not added to the queue for processing
*
* @param partition the partition
* @param records   the records
*/
@Override
public void addRecords(final TopicPartition partition, final Iterable<ConsumerRecord<byte[], byte[]>> records) {
    final int newQueueSize = partitionGroup.addRawRecords(partition, records);

    if (log.isTraceEnabled()) {
        log.trace("Added records into the buffered queue of partition {}, new queue size is {}", partition, newQueueSize);
    }

    // if after adding these records, its partition queue's buffered size has been
    // increased beyond the threshold, we can then pause the consumption for this partition
    if (newQueueSize > maxBufferedSize) {
        mainConsumer.pause(singleton(partition));
    }
}

При этом consumer продолжает отправлять heartbeat брокеру Kafka, что не приводит к перебалансировке. Когда очередь освобождается, потребление возобновляется методом resume.

// after processing this record, if its partition queue's buffered size has been
// decreased to the threshold, we can then resume the consumption on this partition
if (recordInfo.queue().size() == maxBufferedSize) {
    partitionsToResume.add(partition);
}
@Override
  public void resumePollingForPartitionsWithAvailableSpace() {
      if (!partitionsToResume.isEmpty()) {
          mainConsumer.resume(partitionsToResume);
          partitionsToResume.clear();
      }
  }

Эту логику использовали для организации in‑memory кеша. Добавление в data queue и transaction queue выполнялось сразу после consumer.poll. Удаление элемента из data queue выполняется после успешного объединения, из transaction queue — если критерий соответствия не выполняется. Если одна из очередей не содержит элементы, такт цикла пропускается, поток ждёт сообщение.

Выбор kafka-connect

Этапы реализации:

  • подписка на топики, назначение партиций;

  • управление жизненным циклом задачи;

  • восстановление состояния задачи после перезапуска;

  • вызов pause/resume по партиции топика в зависимости от наполнения очереди;

  • чтение сообщений Kafka с уровнем изоляции read_commited;

  • проверка критериев объединения.

Решение в виде kafka‑connect source connector предоставляет лучший баланс свободы контракта использования фреймворка и функций «из коробки». Определяющими для нас стали управление подпиской на партиции в каждой задаче и возможность хранения и восстановления контекста (offset) задачи.

Мы определились с логикой, гарантиями и инструментами. Опишу теперь ключевые аспекты реализации.

Обеспечение гарантий

  1. producer.max.in.flight.requests.per.connection=5, enable.idempotence=true в kafka producer Переотправка сообщений Kafka producer в случае разрывов сети не приводит к нарушению очерёдности batch-сообщений (StackOverflow).

  2. Партиционирование по первичному ключу при публикации в топик data. Сохраняется очерёдность CDC‑событий по каждой записи из каждой таблицы БД.

  3. Использование exactly‑once в postgres source connector. Повторное подключение к слоту репликации с последним известным LSN не приводит к дублям в топике Kafka.

  4. Использование read_commited в kafka‑consumer в коннекторе source, который объединяет топики. Опубликованное сообщение в Kafka недоступно для consumer до тех пор, пока не будет получено подтверждение приложением.

Kafka поддерживает отправку exactly‑once с версии 3.3.0 (Jira). Активация EOS в Debezium выполняется включением параметров (debezium.blog)

exactly.once.source.support=enabled

Ответная конфигурация consumer

consumer.override.isolation.level=read_commited

private void join(AtomicBoolean isExecutorRunning,
                      String outputDataPackageTopicName,
                      int processingEmptyQueuesDelayMs,
                      Map<String, ?> sourcePartition) {
        try {
            while (isExecutorRunning.get()) {
                if (dataPackageQueue.isEmpty() || txMetadataQueue.isEmpty()) {
                    TimeUnit.MILLISECONDS.sleep(processingEmptyQueuesDelayMs);
                    continue;
                }
                var dataPackageItem = dataPackageQueue.peek();
                var txMetadataItem = txMetadataQueue.peek();
                if (match(txMetadataItem.getRecord(), dataPackageItem.getRecord())) {
                    var sourceOffset = sourceOffset(dataPackageItem.getOffset(), txMetadataItem.getOffset());
                    var record = new SourceRecord(
                            sourcePartition,
                            sourceOffset,
                            outputDataPackageTopicName,
                            dataPackageItem.getPartition(),
                            Schema.BYTES_SCHEMA,
                            dataPackageItem.getRecord().getPackageId().getBytes(StandardCharsets.UTF_8),
                            Schema.BYTES_SCHEMA,
                            ObjectMapperUtil.writeValueAsBytes(
                                    mapDataPackage(dataPackageItem.getRecord(), txMetadataItem.getRecord())
                            ),
                            null,
                            convertHeaders(dataPackageItem.getTopic(), dataPackageItem.getHeaders())
                    );
                    outputDataPackageQueue.enqueue(new DataChangeEvent(record));
                } else {
                   txMetadataQueue.remove();
                }
            }
        } catch (Exception e) {
            log.error("{} data package and tx-metadata join error occurred", taskIndex, e);
            setProducerException("Error occurred during data package and tx-metadata join", e);
        } finally {
            log.info("{} stop data package and tx-metadata join", taskIndex);
        }
    }
private <T extends KafkaRecord> void consume(AtomicBoolean executorRunning,
                                                   Consumer<byte[], byte[]> consumer,
                                                   ConsumerParameters<T> consumerParameters) {
       try {
           var topicPartitions = List.of(consumerParameters.topicPartition);
           consumer.assign(topicPartitions);
           var initialOffset = (consumerParameters.offset < 0) ? 0 : consumerвParameters.offset;
           consumer.seek(consumerParameters.topicPartition, initialOffset);
           var consumerPaused = false;
           while (executorRunning.get()) {
               for (var record : consumer.poll(consumerParameters.consumerPollTimeout)) {
                   if (!consumerPaused && consumerParameters.queue.size() > consumerParameters.queuePauseBarrier) {
                       consumer.pause(topicPartitions);
                       consumerParameters.onPauseConsumer.run();
                       consumerPaused = true;
                   }
                   consumerParameters.recordHandler.accept(record, consumerParameters.queue);
               }
               if (consumerPaused && consumerParameters.queue.size() < consumerParameters.queueResumeBarrier) {
                   consumer.resume(topicPartitions);
                   consumerParameters.onResumeConsumer.run();
                   consumerPaused = false;
               }
           }
       } catch (Exception e) {
           setProducerException("Error occurred during " + consumerParameters.sourceName + " consumption", e);
       } finally {
           consumerParameters.queue.clear();
           Utils.closeQuietly(consumer, consumerParameters.sourceName + " consumer");
       }
   }

Производительность

Измерим скорость работы PostgresConnector и KafkaSourceConnector с использованием exactly‑once и at‑least‑once. Чтобы лучше понимать, как exactly‑once может повлиять на производительность, рассмотрим механизм выставления commit в kafka‑connect.

Source‑коннектор kafka‑connect позволяет хранить контекст обработки в специальном объекте Offset. Offset postgresConnector содержит стартовый LSN слота репликации — это позиция, с которой начинается чтение при запуске стриминга CDC. В ходе коммита PostgresConnector фиксируется позиция LSN в Postgres и записывается контрольная точка в топике connect‑offset.

При публикации с гарантиями at‑least‑once отдельный поток по расписанию выполняет коммит. Частота определяется параметром offset.flush.interval.ms (kafka docs). Чем сохраняются офсеты, тем меньше затрат. С другой стороны, редкие коммиты увеличивают количество повторно обработанных сообщений в случае перезапуска. Коммит выполнялся каждые 60 секунд. Для PostgresConnector это один раз в 250 000 сообщений.

Использование exactly‑once сопровождается публикацией в режиме транзакции, что добавляет к сохранению офсетов коммит транзакции в Kafka. Kafka‑connect EOS также позволяет настраивать частоту коммитов. Однако увеличение интервала напрямую влияет на задержку поступления сообщения в consumer с включённым read_commited.

Мы воспользовались стандартной стратегией: коммит выставляется после успешной обработки пакета сообщений из очереди, а количество не превышает max.batch.size (debezium docs) — 20 000. Обычно коммит выполнялся каждые 10 000 сообщений. То есть при at‑least‑once коммит выполнялся в 25 раз реже.

Однако замеры производительности PostgresConnector показали, что скорость не изменилась. Почему?

На схеме отображены этапы обработки сообщений в коннекторах PostgresConnector и KafkaSourceConnector. Фиолетовым цветом отмечены этапы, ограничивающие скорость публикации.

Коннектор kafka‑connect source работает с блокирующей очередью (event queue на схеме). Один поток работает с источником, обрабатывает сообщения и формирует ProducerRecord. Другой поток разбирает очередь и публикует записи в Kafka. Замедление публикации активирует механизм back pressure и, как следствие, замедляет чтение слота репликации. Это, в свою очередь, приводит к удержанию большего количества WAL‑сообщений в Postgres.

Коммит выполняется на стороне отправки сообщения в Kafka, что замедляет процедуру публикации сообщений. Общая производительность коннектора не снижается, если чтение очереди выполняется быстрее записи. Именно эту ситуацию наблюдали во время тестирования PostgresConnector. Очередь event queue не успевает наполняться, коннектор публикует трафик со скоростью 120 мегабит/сек., это примерно 15 000 TPS для средней таблицы из 10 колонок, что меньше предельной скорости публикации EOS.

Для KafkaSourceConnector ситуация оказалась другой. При включении exactly‑once скорость снизилась с 24 000 до 19 000 сообщений/сек., трафик составил 300 мегабит/сек. Скорость объединения топиков кратно больше скорости публикации.

То есть использование EOS не влияет на скорость обработки сообщений в выбранной архитектуре.

Выводы

Главная идея статьи: не обязательно хранить тяжёлые кеши kafka‑streams в дорогих rocksDb и changelog при объединении топиков. Вместо этого можно использовать локальный кеш с механизмом back pressure. Для Sidec эта идея позволила сэкономить много ресурсов. Реализация логики стала возможной благодаря гарантиям уникальности и очерёдности событий и в топиках Kafka.

Комментарии (0)