Привет, Хабр! На связи СберТех — мы создаём Platform V, цифровую платформу Сбера для разработки бизнес-приложений.

В платформу входит более 60 продуктов на базе собственных сборок open source, доработанных до уровня enterprise по функциональности, безопасности, производительности и отказоустойчивости.

В этой статье расскажем про реализацию паттерна Change Data Capture и межкластерной репликации данных в продукте Platform V DataGrid, распределённой in-memory базе данных для высокопроизводительных вычислений. А также об особенностях внедрения функции и вариантах репликации. Написать материал помог наш коллега Николай Ижиков из команды по развитию баз данных на стеке open source.

Что такое Change Data Capture

Представим, что у вас есть база данных с критичными для бизнеса данными и жёсткими SLA по чтению и записи. 

В то же время есть компоненты системы, которым необходимо реагировать на изменения в данных, например на уведомления о поступлении нового заказа, изменение данных пользователя и т. д. Таких компонентов много, и со временем их количество только растёт.

Если база уже нагруженная, навешивать на микросервис, создающий заказ, синхронную обработку или писать хранимую процедуру нерационально — увеличим тайминги по ответам, вероятен риск нарушения SLA.

И тут на помощь приходит паттерн Change Data Capture или CDC.

Когда в базу данных вносятся изменения, для ускорения записи и оптимизации операций она пишет журнал изменений (UNDO, REDO логи). В него база данных последовательно записывает дельту, которую вы изменяете.

CDC — это приложение, которое умеет обрабатывать логи изменений, выделять из них события об изменении данных и уведомлять об этом потребителя изменений, реализующего бизнес-логику.

В результате получаем линейные, упорядоченные по времени события изменений данных: что было, что есть, какая операция, над какой таблицей или кэшем была выполнена. Обработка асинхронная, не триггер и не хранимая процедура. Данные закоммитились, приложение, которое их отправляет, продолжило работать дальше, а потребитель через какое-то время получает уведомление о произошедшем изменении.

Как и когда использовать CDC

Стриминг изменений в хранилище данных. У вас есть DWH. Обычно в режиме реального времени данные в неё поступать не должны. Для перекладывания данных можно написать процедуры, которые будут раз в час или в сутки определять дельту и отправлять её в хранилище. С помощью CDC эти же данные можно перекладывать с меньшими задержками.

  • Постобработка событий. В системе произошло событие — пользователь зарегистрировался, создал заказ, загрузил новый файл, — и, согласно бизнес-процессу, по новой записи нужно инициировать модерацию или другие действия.

  • Аналитика. По поступающим в CDC событиям можно считать аналитику в режиме, близком к реальному времени.

  • Логическая репликация. В CDC у нас на руках есть ВСЕ изменения, которые происходят в базе. Для реализации репликации нужно всего лишь надёжно исполнить их на реплике.

CDC в open source database

Дизайн

При любой доработке сложной системы, к которой, очевидно, относится распределённая СУБД, всегда есть риск что-то сломать. Лучший выход — делать новую фичу, вообще не трогая существующие.

Поэтому, проектируя CDC на базе Ignite, команда решила, что ignite-cdc должен выступать как отдельный java-процесс, не влияющий на ноду Ignite.

Ignite в persistence-режиме, как и любая классическая СУБД, записывает изменения в WAL (Write-Ahead Log). WAL — бинарный файл, содержащий изменения, дельты, которые мы периодически пишем в основную память (page memory).

Время от времени WAL-сегмент переходит в архив. Ignite-cdc видит, что появился архивный WAL-сегмент, и обрабатывает его.

Обработка — уведомление потребителя об изменениях. Есть public API для потребителя, но можно написать и свой. 

Важно, что при этом нет перерасхода места на диске: WAL-архив — это существующая функциональность, которая нужна для восстановления после сбоев. Ignite-cdc обрабатывает ровно те же сегменты, никаких новых данных на диске не появляется.

Следующий важный момент — возможность сохранять состояние чтения. Ignite-cdc — отдельный процесс, который может падать. Нужно реализовать возможность фиксации состояния потребителя каждый раз, когда он решил, что данные обработаны и можно сохраниться. При падении обработка будет продолжена с места последнего commit-а. К счастью, поддерживать это довольно просто: нужно всего лишь сохранять указатель на том месте в сегменте, на котором чтение остановилось.

Из возможности сохранять состояние следует возможность сделать fail-fast-приложение. При любых проблемах Ignite-cdc падает. Предполагается, что поднимать его будут с помощью ОС-механизмов.

На уровне ноды всё выглядит вот так:

Есть небольшая тонкость: WAL-архив не бесконечный, Ignite складывает в архив столько сегментов, сколько было указано в настройках. При архивации n+1 сегмента самый старый удаляется.

Чтобы избежать ситуаций, когда CDC затормозил и не обработал уже удалённый сегмент, архивный сегмент hard-link’ом переносится в папку, с которой работает только Ignite-cdc.

Если удалим данные из архива, файл останется в папке для СDC, и данные будут доступны.

Если Ignite-cdc обработал сегмент, его можно будет сразу же удалить. Данные исчезнут с диска, когда оба hard-link’а будут удалены.

Приложению понадобятся метрики. API уже есть в Ignite, и его нужно переиспользовать.

API и настройки

Для настройки CDC есть три параметра, которые нужно настроить на уровне ноды.

public class DataStorageConfiguration {
    long walForceArchiveTimeout;
    String cdcWalPath;
}

public class DataRegionConfiguration implements Serializable {
    boolean cdcEnabled;
}

Здесь:

  • cdcWalPath — путь к папке, где складываются WAL-сегменты для CDC;

  • cdcEnabled — включает CDC для DataRegion’а;

  • walForceArchiveTimeout — таймаут принудительной архивации сегмента: даже если сегмент заполнен не полностью, по таймауту он будет архивирован и станет доступным для CDC.

С walForceArchiveTimeout есть тонкость. WAL-архив работает быстро за счёт того, что он является memory-mapped file. Это позволяет фактически писать не на диск, а в память для того, чтобы операционная система сбросила файл или мы могли сделать это вручную, когда сегмент будет заполнен.

Запись на диск — дорогая операция, в момент которой производительность ноды снижается, поэтому, с одной стороны, запись нужно делать как можно реже. С другой — CDC узнаёт об изменениях после архивации сегмента, поэтому запись нужно делать как можно чаще. Противоречие :)

Решить его можно, выбирая таймаут согласно требованиям приложения.

Теперь самое интересное — сonsumer, слушатель, который позволяет узнать и обработать изменения:

public interface CdcConsumer {
   public void start(MetricRegistry mreg);
   public boolean onEvents(Iterator<CdcEvent> events);
   public void onTypes(Iterator<BinaryType> types);
   public void onMappings(Iterator<TypeMapping> mappings);
   public void stop();
}
  • start, stop — для инициализации и остановки;

  • onEvents — callback для обработки изменений: вернули true — состояние коммитнулось;

  • onTypes, onMappings — callback’и для обработки изменений метаинформации о типах.  

Что доступно в событии:

public interface CdcEvent extends Serializable {
   public Object key();
   @Nullable public Object value();
   public boolean primary();
   public int partition();
   public CacheEntryVersion version();
   public int cacheId();
}
  • key, value — данные: value может быть null, если событие по remove’у;

  • primary — событие произошло на primary или backup;

  • partition — номер партиции, необходим для распределения нагрузки в соответствии с существующими в Ignite партициями;

  • version — версия entry;

  • cacheId — идентификатор кэша.

Таким образом, у нас есть приложение, которое в асинхронном виде получает уведомления обо всех изменениях всех данных внутри кластера Ignite. Теперь на основе этой функциональности мы можем реализовать как необходимые бизнес-функции, так и логическую репликацию.

Логическая репликация с помощью CDC

Под физической репликацией в данной статье я понимаю перенос между экземплярами БД внутреннего представления данных: страниц памяти и т. д.

Под логической — выделение потока изменений из базы-источника и его воспроизведение в базе-приёмнике.

CDC позволяет реализовать именно логическую репликацию.

В Ignite есть поддержка двух схем: Ignite to Ignite и Ignite to Kafka.

Ignite to Ignite

Внутри Ignite-cdc работает IgniteToIgniteCdcStreamer, кстати, доступный из коробки. Это consumer, который внутри себя поднимает клиентскую ноду Ignite, коннектится к кластеру-приёмнику и, получая изменения, отправляет почти обычную операцию put в кластер-приёмник.

Если кластер-источник недоступен, например из-за упавшей ноды, Ignite-cdc будет вечно ждать, пока нода не запустится. Новые данные не поступят, и процесс обработает те, которые были сгенерированы ещё живой нодой.

Если упал Ignite-cdc, то, во-первых, на всех остальных нодах он будет жив. Во-вторых, через некоторое время операционная система его перезапустит, CDC посмотрит, какие изменения он обработал, и продолжит отправлять их в соседний кластер.

Если потерялся соседний кластер или сетевая связность, Ignite-cdc также упадёт, а после перезапуска снова пойдёт в кластер-приёмник. Если кластер недоступен — падение. Если доступен — отлично, CDC начнёт отправлять в него изменения, которые были накоплены в WAL на диске. Диск является буфером изменений, которые будут копиться до тех пор, пока не получится их обработать и отправить в нужную точку.

Ignite to Kafka

Это вариант репликации для ситуаций, когда кластеры Ignite не видят друг друга, нужно использовать Kafka в качестве транспорта, или если есть несколько читателей событий.

Схема практически такая же: для обработки событий используется стример IgniteToKafkaCdcStreamer. Он раскладывает данные по партициям Kafka в соответствии с партициями Ignite.

На стороне приёмника есть приложение kafka-to-ignite — оно читает данные из Kafka и кладёт их в принимающий кластер Ignite.

Conflict resolver

Подошли к самому интересному: что произойдёт, если один ключ будет изменён на обоих кластерах?

Ответ — сработает conflict resolver. Это интерфейс, который определяет, какие именно данные должны попасть в кластер. Он может взять «старое», «новое» значение или выполнить merge.

СDC-extension предоставляет дефолтную имплементацию, но можно реализовать и свою. При этом стоит отметить, что правильного решения при конфликтах изменений не существует. Не зная ничего о данных, невозможно точно определить, какое изменение правильное, а какое нет.

Ключевые свойства дефолтной имплементации:

  1. Если изменение произошло на «локальном» кластере, оно выигрывает.

  2. Изменения с одного и того же кластера сравниваются по версии. Изменение с большей версией выигрывает.

  3. Если указано поле для сравнения, записи сравниваются по нему.

  4. Если всё предыдущее не сработало, новая запись отбрасывается. Данные разъезжаются, в логах warning, а вам нужно думать, что делать дальше.

Заключение

Внедрение паттерна CDC позволило добавить востребованную функциональность для реализации событийных подписок и создания реплик без влияния на производительность ядра самой базы данных.

Комментарии (0)