Меня зовут Егор Литвиненко. Я старший разработчик Yandex Observability Platform. Летом 2023 года я рассказывал на Saint Highload в Санкт‑Петербурге о нашем пути внедрения YDB CDC для обновления данных в сервисах, чтобы решить проблему инвалидации кешей.

В этой статье будет вся история внедрения с теорией, вопросами, ответами, ошибками, о которых я говорил на выступлении. Но кроме того, в конце есть обновления: что произошло и изменилось за это время. Мы рассмотрим весь процесс от появления задачи до результата:

  • Какие подходы к доставке изменений мы использовали.

  • Почему выбрали переход на CDC и в чём были сложности в работе с изменениями до этого.

  • Чем YDB CDC отличается от других решений, как настроить правильно и на какие грабли мы наступили в процессе.

  • Какую модель данных выбрать, чтобы решить проблемы с конкурентными изменениями.

  • Как поддерживать решение после внедрения.

Управление изменениями данных: описание задачи и подходы к решению 

Yandex Monitoring используют все сервисы внутри Яндекса и Yandex Cloud для внешних пользователей. Ежесекундно обрабатывается 700 миллионов метрик на запись. Это 8 ГБ данных в секунду. Хранится 9 ПБ данных, обрабатывается 8 миллионов алертов пользователей и выполняются расчёты для 200 тысяч различных алертов в секунду. При этом у каждого клиента есть конфигурация процесса загрузки метрик — это Control‑Plane‑объекты, которые хранятся как таблицы в базе данных. Они связаны между собой и меняются на лету. Чем быстрее Monitoring получает обновления объектов, тем выше доступность для пользователей.

Для хранения мы используем YDB — распределённую опенсорс‑базу данных Яндекса. Команды YDB и Yandex Monitoring проработали сценарий загрузки изменений с помощью YDB Change Data Capture (CDC). В результате получили архитектуру, в которой изменения объектов доставляются в тысячи компонентов — менее чем за 800 миллисекунд.

Наши сервисы запущены на тысячах нод. Они используют конфиги из таблиц, логически связанных между собой и образующих иерархию (приоритеты, настройки потока и так далее). Эти таблицы редко меняются: данные в них вносятся вручную с помощью Control Plane. При этом нодам нужно как‑то получать обновлённые данные.

Каким было решение раньше: таблицы были небольшими, и всё содержимое скачивалось целиком в установленные интервалы времени. Сервисы повторяли эти операции регулярно. Такой процесс вычитки распределённой таблицы происходил на каждой ноде с помощью YDB ReadTable (gRPC stream).

Подход был неудобен:

  • Нодам приходилось загружать и обрабатывать все таблицы целиком независимо от того, были ли изменения.

  • Из‑за увеличения нагрузки регулярно возникали лаги доставки, данные в среднем загружались по 2–4 минуты.

  • Приходилось уменьшать частоту обращения к базе данных, чтобы снижать нагрузку.

А когда данные не загружаются, настроек нет, то пользователи на графиках видят пропуски метрик, алерты перестают нормально считаться, и мы получаем инцидент.

С какими данными имеем дело

Сервисы загружают таблицы, которые логически связаны между собой. В YDB нет foreign keys, поэтому валидность связей проверяется внутри сервисов. Все таблицы меняются в разных транзакциях и постоянно растут, потому что приходят новые пользователи.

Такой процесс нужно было оптимизировать, чтобы ноды обрабатывали в таблицах только изменения. Мы рассмотрели несколько вариантов возможного решения проблемы.

Как работать с изменениями данных

Перечислю подходы к работе с изменениями данных в таблицах:

  • Slowly changing dimensions (SCD). Любая база данных.

  • Triggers on tables. Любая база данных с триггерами.

  • Transaction log scanners. Binlog в MySQL®, Replication log в PostgreSQL.

  • Event source. Не зависит от базы данных.

  • Change Data Capture (CDC). YDB, CockroachDB, RethinkDB, Tarantool. Debezium, Databricks Live Table, Zendesk Daemon.

Slowly Changing Dimensions

В базах данных есть семейство подходов Slowly changing dimensions (SCD), которые позволяют управлять изменениями данных. Допустим, у нас есть какая‑то таблица, мы добавляем в неё данные, которые будут хранить изменения в этой таблице. Эти изменения могут быть нескольких типов — от Type0 (неизменяемые данные) до Type6 («комбинированный подход»). Чем больше цифра в типе, тем больше дополнительных данных хранится.

Для SCD Type4 (добавляется таблица с историей) это может выглядеть следующим образом:

Есть таблица истории, сервис из неё читает по определённому интервалу с заданного timestamp, что изменилось. Соответственно для ведения изменений и их быстрого поиска понадобятся дополнительные индексы, триггеры, чтобы обрабатывать операции insert, update, delete. Триггеров в YDB нет, поэтому рассмотрим другой подход — логирование транзакций в БД.

Transaction log scanners

Возможно, вы слышали, что в MySQL® есть Binlog, в PostgreSQL — Replication log, которые регистрируют все изменения данных. С помощью логов можно синхронизировать данные между разными системами или обеспечивать их восстановление.

Для MySQL® Binlog‑файлы хранятся на диске, их можно найти с помощью команды:

Вы можете смотреть лог‑файлы и что в них лежит, но работать с ними напрямую не очень удобно, для этого есть готовые опенсорс‑инструменты, например один из самых популярных сейчас Debezium.

Debezium предполагает наличие MySQL connector, который пересылает данные из Binlog в систему Apache Kafka®, предоставляя вам доступ к потоку изменений данных.

В этом процессе потребуется настроить Kafka®, обеспечить взаимодействие процессов и сконфигурировать Debezium. И, возможно, написать дополнительный код на стороне клиентов для обработки получаемых данных.

Event source

Если отправлять данные в очередь прямо из компонентов, можно назвать такой подход event sourcing. Нода одновременно записывает в базу и генерирует событие в очередь. Каждое изменение в базе данных сопровождается записью события в очередь, и затем мы можем читать и обрабатывать эти события по мере поступления.

В этом случае придётся самим отвечать за целостность между БД и очередью и написать код для операций чтения и записи.

Change Data Capture (CDC)

С одной стороны, CDC — это семейство подходов, которые реализуются через то, что перечислено выше. С другой стороны, иногда эта технология встроена в БД. Например, в YDB.

Чтобы было легче представить, посмотрим на схему:

Вам не надо писать код для продюсера. Вы пишете в БД, а база данных сама перекладывает в очередь, предоставляя вам гарантии. Из очереди вы читаете сообщения об изменениях.

Если сравнить основные подходы к обработке данных, то CDC — наиболее простой вариант, особенно если ваша база данных его поддерживает.

Change Data Capture для YDB

YDB — распределённая база данных, и она поддерживает топики, что для нас важно. Топики YDB — это по сути очереди. Базой данных можно пользоваться отдельно как брокером.

Технология YDB CDC создаёт связку между таблицами в базе данных и очередью сообщений. Это позволяет автоматически отслеживать изменения в таблицах и передавать их в виде событий или сообщений.

Важные свойства:

  • Таблицы и очереди партиционированы одинаково и, соответственно, масштабируются.

  • Все записи в очередь идут асинхронно, поэтому записи в таблицу не тормозят.

  • Есть гарантия, что порядок изменений в очереди такой же, как в базе для primary key.

  • Внутри базы есть Exactly Once между очередью и таблицей.

  • TTL по умолчанию — 24 часа, максимум — 30 дней.

Пример сообщения YDB CDC

Ниже — JSON‑структура, которая позволяет определить, что именно изменилось в базе данных, какие данные были изменены и какой записи это касается:

  • Update — признак, что в базе произошли изменения.

  • NewImage — раздел, содержащий данные об изменениях, например все колонки и их значения.

  • Key — ключ, идентифицирующий конкретную запись в базе, к которой относится изменение.

{
  "update": {},
  "newImage": {
    "createdAt": 1654947225226,
    "updatedAt": 1654947225226,
    "maxResponseSizeBytes": 10485760,
    "state": "RW",
    "maxFileSensors": 1000000,
    "protoQuotas": "KGQwsQI=",
      ...,
    "maxMemSensors": 500000,
  }
  "key": [
    "composite_key_part1",
    "composite_key_part2"
  ]
}

В результате, используя YDB CDC, имеем простую схему получения изменений:

Практика использования YDB CDC

Главное — это то, что в нашей очереди есть только изменения, но нет самой таблицы. Чтобы изменения к чему‑то применить, нужно сначала загрузить таблицу.

Первая загрузка таблицы

Для первой загрузки таблицы в других БД можно использовать select. В YDB есть более специализированный механизм для этой цели, который уже упоминался, — это ReadTable.

В результате схема меняется: нужно на старте сделать ReadTable, а потом накатывать изменения.

Очевидно, может возникнуть ситуация, когда пользователь вносит изменения, а нода выполняет ReadTable. Тем временем пользователи продолжают что‑то писать.

На картинке пользователь делает Update0 в Transaction0, ReadTable1 начинается после — на момент Transaction1. Потом база выполняет Update2 в Transaction2. Считывая информацию из таблицы, ReadTable1 увидит те данные, которые были в таблице на момент начала операции чтения (Transaction1). Все последующие изменения, которые пользователь внёс после начала операции, будут невидимы. Это происходит потому, что операция ReadTable читает данные из снапшота на основе определённой транзакции.

Чтобы учесть изменения, которые произошли после начала операции чтения, надо прочитать их через CDC. Для этого воспользуемся virtual timestamp, которая разворачивается вместе с ReadTable. Её можно получить вместе с сообщениями, если заранее включить соответствующую настройку.

Virtual timestamp — это искусственная временная метка. Пара чисел: coordinator time (время на координаторе транзакций YDB) и transaction id (идентификатор транзакции, в которой произошла операция) — задаёт порядок всех операций в YDB. Это время позволяет отфильтровать и отсортировать данные, чтобы понять, что было последним или самым первым.

Virtual timestamp в сообщении выглядит так:

{
  "update": {},
  "newImage": {
    "createdAt": 1654947225226,
    "updatedAt": 1654947225226,
    "maxResponseSizeBytes": 10485760,
    "state": "RW",
    "maxFileSensors": 1000000,
    "protoQuotas": "KGQwsQI=",
      ...,
    "maxMemSensors": 500000,
  }
  "key": [
    "composite_key_part1",
    "composite_key_part2"
  ]
  "ts": [1670792400, 562949953607163]
}

Добавляется поле «ts» из двух чисел внизу. У операции ReadTable тоже есть один virtual timestamp на все строки таблицы. Таким образом, нам остаётся сравнить virtual timestamp на сообщении и virtual timestamp ReadTable, чтобы определить какая версия строки таблицы новее.

Получение CDC после ReadTable

Нода не может через CDC всё время вычитывать все сообщения. Поэтому чтобы начать читать изменения в очереди, надо указать, с какого момента времени ноде нужны сообщения. Из‑за ограничений клиента у очереди нельзя запросить сообщения по virtual timestamp. Можно использовать обычный unix timestamp. Выбирать его надо с учётом операции ReadTable. Мы берём unix timestamp ноды — тот же, когда нода вызвала ReadTable, — и отнимаем от него 5 минут для запаса.

В результате получается, что CDC будто бы подключился в прошлом — все изменения читаются с запасом. ReadTable скачивает таблицу. А по virtual timestamp нода выбирает, какие изменения были последними.

Ссылочная целостность

Следующая проблема, о которой я уже упоминал: таблицы логически связаны между собой, foreign keys нет, вся валидация связей — на нодах. С точки зрения логики entity relationship‑диаграмма таблиц могла бы выглядеть так:

Но с точки зрения очередей она выглядит так:

Есть четыре параллельные очереди. Поэтому может возникнуть ситуация, когда приходят шард и проект, логически связанные между собой. Только шард приехал на ноду первый, а проекта ещё не существует. И нужно как‑то гарантировать целостность.

Чтобы решить задачу с целостностью, можно поступить следующим образом:

  • Считываем изменения из CDC.

  • На лету валидируем все связи (существует ли проект для шарда в кеше).

  • Если состояние не валидно, не используем сущность и ждём, когда придёт следующее сообщение и сущность поменяет состояние.

  • Если не дождались — перечитываем запросом.

Такой подход относительно прост в реализации, потому что всю логику по валидации можно скрыть в кеше и отдавать наружу только валидные сущности.

Было это ещё в конце 2022-го — начале 2023 года. На этом этапе были разрешены все практичные вопросы и я пошёл имплементить. Спустя примерно три месяца деплоится новая версия наших сервисов c CDC, идём смотреть тайминги, и видим, что всё стало ещё хуже.

Проблемы внедрения и настройки CDC 

Почему так произошло? Вспомним схему: один раз записал, какая-то нода читает один раз. 

А если нод, например, 1000, то в 1000 раз умножается трафик.

Наш сценарий:

  • Редко обновляем данные.

  • Подписываем на очередь 1500 подписчиков.

У брокеров сообщений есть свои лимиты и настройки. Это одна из причин, почему часто в подобных архитектурах используются промежуточные звенья: распределённые кеши, configuration services, или зеркалируется трафик. Посмотрим на лимиты YDB.

Лимиты YDB CDC по умолчанию

В YDB существуют стандартные ограничения для CDC:

  • Максимальная скорость для одного подписчика — 2 МБ данных в секунду.

  • Максимальное количество подписчиков для одной очереди — 10 (а нам надо 1500 ?).

  • Максимальное количество очередей для одной таблицы — 5.

Суммарно максимальная пропускная способность для одной таблицы составляет 100 МБ в секунду.

Лимиты можно менять. Но когда мы внедряли CDC, команда YDB только разрабатывала эту функциональность. Никто не ожидал, что мы сделаем 1500 подписчиков на очередь...

Лаг доставки — 4 минуты 

Представьте, что вы потратили пару месяцев на то, чтобы всё сделать, запустили на продакшне и получили вместо 2–3 минут, которые уже были чересчур, — 4,5 минуты.

Сначала я подумал, что проблема в ReadTable, на нём такие тайминги могут быть, когда таблица перегружена, как у нас. Поэтому нормально было ожидать большой сотый персентиль. Но здесь все персентили были «в полку» .

В поисках решения мы смотрели на лаги доставки в очереди YDB и загрузку. Когда мы подписали 1500 подписчиков на очередь, где ожидалось не больше 10, весь CPU тратился на подсчёт метрик подписчиков. К счастью, это быстро поправили в Upstream, и тайминги значительно улучшились.

Другая причина плохих таймингов, которая встречается на практике, — тяжёлые SQL‑запросы. Так как ресурсы для пользовательских запросов и CDC сообщений нужны одни и те же, всё может начать тормозить, если не хватает UserPool — трэд‑пула, в котором выполняются пользовательские запросы и отправка CDC‑сообщений. Эта проблема решается следующими способами:

  1. Оптимизировать запросы.

  2. Партиционировать таблицу и добавлять ресурсы.

  3. Балансировка таблеток.

Последний пункт — это о балансировке таблеток YDB. У нас своя инсталляция YDB в железе. И часто таблетки YDB живут по соседству с другими нашими сервисами. Релизы YDB приводят к тому, что таблетки перебалансируются неоптимальным образом, потому что балансировщик ничего не знает про overcommit со стороны других сервисов. Это портит тайминги, но это отдельный вопрос, который мы будем решать расселением контейнеров.

Перейдём к следующей проблеме.

Массовый рестарт

Неважно, YDB ли у вас или другая БД, вы проектируете CDC. Представьте, что вы впервые запустили кластер и начинаете первую загрузку таблиц на нодах.

Операция долгая, если кластер перегружен, начнутся таймауты, и кластер не сможет подняться. Поэтому нужно проектировать систему так, чтобы было понятно, как работать с CDC в случае возникновения проблем.

Первое, что, очевидно, можно добавить, — это механизм повторных попыток (retry с backoff). Если ваша база данных справляется с нагрузкой, то в конечном итоге кластер успешно поднимется после нескольких попыток. Если это не помогает, нужно рассмотреть ограничение количества нод, загружающих таблицы одновременно. Это можно сделать с помощью распределённого семафора или с использованием Continuous Deployment ограничить количество выкатываемых нод.

Рестарт сервиса 

Представьте, что вы перезапустили ноду, и всё висит. Ошибок нет, но и сообщений тоже нет.

Чтобы понять, что происходит, важно разобраться, как работает балансировщик чтений данных (Read load balancer, RLB) из очередей в YDB.

Когда нода хочет прочитать очередь, она идёт в балансировщик, записывается, что есть такой‑то подписчик с таким‑то именем, который хочет читать определённые партиции.

Балансировщик запоминает подписчика по имени и говорит: «Хорошо, читай». После этого начинается чтение.

Если по какой‑то причине вы принудительно завершили работу сервиса, например, с помощью kill -9, или нода аварийно упала, сессия не закроется, а информация о ней останется на балансировщике. Когда нода восстановится, она пойдёт регистрироваться на балансировщик снова. У балансировщика появится два одинаковых подписчика с одним и тем же именем.

Для балансировщика топиков в YDB это нормальный сценарий. Он должен распределить, какие партиции назначить каждой сессии. Если обе сессии хотят прочитать одни и те же партиции, балансировщик случайным образом выбирает, кому что достанется. Если повезёт, то это ваша новая сессия, и вы получите данные. Если не повезёт, то можете «зависнуть» на несколько минут, пока старая запись о сессии не будет удалена из балансировщика. И только после этого получите свои данные.

Мы не можем себе позволить висеть несколько минут, поэтому в качестве решения важно настроить параметры keep alive и idle timeout для клиента GRPC. Это поможет активно управлять сессиями, избавляться от старых и предотвращать долгие задержки.

Методы чтения 

В результате у нас оказалось три разных метода чтения данных из базы: Select, ReadTable и CDC.

Проблема в том, что форматы данных, которые получаем, тоже разные. Для Select/ReadTable — это Protobuf, а для CDC — JSON. Нет универсальной библиотеки, которая позволяла бы всё это прочитать. Кроме того, иногда кто‑то может забыть добавить необходимые колонки, что приведёт к расхождению в данных. В качестве решения можно использовать Round‑trip tests. У нас это реализовано следующим образом:

  • YDB локально поднимается в тесте.

  • Создаются таблицы и идёт запись сущностей.

  • Выполняется чтение данных через CDC и другими способами.

  • Проверяется, что все поля заполнились корректно.

Также нужно стараться:

  • Унифицировать код, читающий данные.

  • Иметь флаг (рубильник), включающий или выключающий какой‑то из методов чтения.

Когда всё плохо, нужен механизм, который синхронизирует все ваши ноды и дата‑центры. Расхождения данных могут возникнуть из‑за различных факторов, таких как ошибки в коде при выпуске новых версий, потеря сетевого соединения и другие непредвиденные события, которые могут повредить данные. У нас для этого есть флаг, который в любой момент включает ReadTable потаблично.

Чтобы находить расхождения в кластере, его нужно мониторить.

Как мониторить расхождения

Смотрим метрики сервиса:

  • Задержка доставки.

  • Время последнего события (cross dc, cross nodes) — сравниваем время последних событий на разных нодах, чтобы находить отставшие ноды.

  • Объёмы чтения и спайки — следим за пропускной способностью кластера.

  • Ошибки обработки.

Ещё полезно отслеживать стандартные метрики YDB:

Вот как выглядит дашборд нашего мониторинга:

Апдейты

На выступлении я обозначил такие планы:

  1. Перенести ещё больше таблиц на использование CDC.

  2. Протестировать CDC initial table scan вместо ReadTable. Это означает, что можно получить все данные полностью через CDC.

  3. Использовать YDB SDK v2 вместо самописного клиента.

  4. В самом CDC планировали: обеспечить консистентность между разными очередями, что поможет сохранить целостность транзакций и внедрить эфемерных подписчиков, которые помогут при перезапусках нод.

По четвёртому пункту пока нет апдейта, а про остальные расскажу по порядку.

Перевозим таблицы на использование CDC

Тут апдейт короткий: перевезли и перевозим таблицы на YDB CDC, потому что это схематично экономит ресурсы.

Протестировали CDC initial table scan

Нам не подошло. Initial scan включается один раз в настройках очереди. Если включен, то скачивается вся таблица целиком при каждом подключении к очереди. Этот сценарий подходит для репликации. А если вы просто потеряли соединение, то перечитывать все изменения слишком дорого.

Миграция на Java YDB SDK v2

На выступлении рассказывал, что мы живем на Java, и используем самописный клиент для YDB поверх GRPC‑протокола. Приходилось его поддерживать самостоятельно, вносить правки. Зимой 2024 переехали на новый клиент Java YDB SDK v2, и это улучшило тайминги. Также Java YDB SDK v2 позволяет тестировать сценарии в Docker‑контейнерах. Джависты, кто знает test‑containers, поймут!

Результаты и выводы

В результате работы над сценарием загрузки изменений с помощью YDB CDC, мы получили архитектуру, которая позволила отслеживать изменения в данных, обеспечила быструю доставку конфигураций и улучшила доступность мониторинга для пользователей.

Подведём итоги того, что получилось.

  1. Изменился лаг доставки

    На данный момент 50 перцентилей изменений доставляются всего за 50–70 миллисекунд, вместо 2–4 минут. В целом, все изменения доходят до нод в течение сотен миллисекунд.

  2. Снизилась нагрузка на CPU

    Дополнительным бонусом CPU на базе данных упал в два раза, потому что она использует меньше ресурсов процессора.

  3. Результаты внедрения CDC в цифрах (нагрузка на одну очередь)

На момент доклада

Сейчас

Подписчиков на очередь

1530

2416

Сообщений в секунду

6 тысяч

6–12 тысяч, в пике до 15 тысяч

Средний трафик

80 МБ, в пиках — до 200 МБ

180 МБ, в пиках — до 350 МБ

99-й перцентиль задержки в доставке данных

800 мс

500 мс

При этом в два раза упала загрузка ядер CPU, а суммарный трафик по всем очередям и подписчикам в пиках достигает больше 2 ГБ в секунду.

Что в планах дальше:

  1. Из‑за того что в нашем кластере контейнеры YDB и других сервисов живут на одних хостах и позволяют overcommit, часто тайминги просаживаются после релизов YDB или неудачных балансировок. Планируется расселить YDB и наши сервисы, и убрать перерасход ресурсов в контейнерах.

  2. Изменения накатываются на кеш по ключу — меняется значение. А после этого мы перестраиваем в Java целиком снапшот кеша. Эта история больше про Java, чем про YDB CDC. Нам необходимо было так делать из‑за ограничений, как мы используем конфиги. Но теперь в ближайших планах остаться на одном кеше и убрать построение снапшотов, которое потребляет CPU.

Комментарии (1)


  1. vinni
    23.03.2024 12:12
    +1

    Статья огонь, спасибо за детали и за труд!