Привет! С вами Кабанов Олег — ведущий ML-инженер Flocktory.

В этой статье расскажу об опыте внедрения YDB в качестве хранилища для ML Online Feature Store. А также о том, как нам удалось ускорить загрузку данных в 40 раз и убрать влияние на скорость чтения данных при обновлении.

Предисловие

Наша первоначальная задача была такой: реализовать сервис, возвращающий фичи текущего пользователя по id, latency < 80ms, объём данных ~1TB, возможность гибко масштабировать на чтение и объём хранилища.

На конференции HighLoad++ мы подробно рассказали, почему решили выбрать в качестве хранилища YDB, а в этой статье хочется поделиться опытом оптимизации загрузки, так как это стало проблемой при внедрении.

Структура данных выглядела следующим образом:

CREATE TABLE features
(
    `customer_id` Int32 NOT NULL,
    `tag` Utf8 NOT NULL, -- название фичи: feature_a, feature_b...
    `valid_to` Datetime NOT NULL, -- данные должны пропадать при достижении valid_to
    INDEX idx_customer_id GLOBAL ON (customer_id), -- в запросе получаем все строки по customer_id
    PRIMARY KEY (`customer_id`, `tag`) -- дедупликация данных по двум полям
)
WITH (
    AUTO_PARTITIONING_BY_SIZE = ENABLED,
    AUTO_PARTITIONING_BY_LOAD = ENABLED,
    AUTO_PARTITIONING_PARTITION_SIZE_MB = 512, -- объём партиции при котором она будет разбита на несколько
    TTL = Interval("PT0S") ON `valid_to`
);

Запрос на получение данных упрощенно выглядел так:

DECLARE $customer_id AS int64;
SELECT AGGREGATE_LIST(tc.tag) AS tags 
    FROM features VIEW idx_customer_id as tc -- явно указываем индекс, иначе не будет использоваться
    WHERE tc.customer_id = $customer_id

С появлением все новых источников данных для ML Feature Store объем обновлений рос, время ответа сервиса перестало укладываться в 80ms, а загрузка длилась часами, что было неприемлемо.

Оптимизация 1

Количество партиций достигло 500, при загрузке данных требовались синхронизации между значительным количеством из них. Это вызывало рост издержек на CPU, latency росло, горизонтальное масштабирование не помогало. Мы попробовали снизить количество партиций сначала в два, потом в четыре раза. Вместо типичной загрузки в 1000 записей в секунду получили 4000! Нагрузка на CPU снизилась, но latency продолжал быть нестабильным.

Оптимизация 2

Оптимизация записи в четыре раза вдохновила нас на анализ query explain, и мы заметили, что скорость чтения с индексом idx_customer_id и без него практически не отличается, а данных на тот момент уже было около ~ 300GB. Оказалось, что SSTable хорошо ищет по первой части первичного ключа. Индекс занимал место, и мы решили удалить его. Также отказ от индекса должен был ускорить вставку. Запрос стал выглядеть так:

DECLARE $customer_id AS int64;
SELECT AGGREGATE_LIST(tc.tag) AS tags 
    FROM features as tc -- убрали индекс
    WHERE tc.customer_id = $customer_id

Удаление индекса:

ALTER TABLE features DROP INDEX idx_customer_id;

Каково было наше удивление, когда скорость загрузки выросла до 12 000 записей в секунду! Но скорость чтения во время заливки продолжала показывать спайки на графиках, нарушая SLA.

Оптимизация 3

Возникает вопрос: почему при загрузке дампов скорость записи достигает 50 000 строк в секунду, а в нашей базе данных при заметно меньшей скорости все шарды уже нагружены на 100% по CPU? В чем разница? Почему сокращение шардов так помогло? Мы пишем с помощью YDB Connector for Apache Spark, он пишет батчами по 500 строк, при этом в каждом batch запросе по ключу customer_id мы задеваем все шарды. Структура данных SSTable и шардирование в YDB навело на мысль, что лучше бы писать не во все шарды по 1-2 записи, а в одну все 500, а это проще всего сделать сортировкой данных перед вставкой.

Придумали, реализовали и через час получили скорость записи 40 000 строк в секунду, при этом никакого влияния на скорость чтения, latency стал стабильно < 80ms на 99.9 перцентиле.

Почему так? Дело в том, как данные уложены в партиции SSTable:

image-20250909-075509.png
image-20250909-075509.png

Данные упорядочены по первичному ключу, и если вставляемые строки уже отсортированы, то при большом объёме вставки один батч будет попадать в одну или две партиции. В таком случае транзакция потребует синхронизацию только нескольких партиций, а не всех в таблице. А одной партиции принять 500 строк, да ещё и отсортированных, не составляет труда. Вот почему дампы, снятые с БД, так хорошо загружаются — они уже отсортированы (брались из сортированной таблицы).

Выводы

С тех пор, как YDB была внедрена в Flocktory и мы провели эти оптимизации, прошло уже больше года, скорости загрузки и чтения нам достаточно по сей день, и к проблеме производительности мы более не возвращались. Шаги, которые мы прошли, могут показаться очевидными, но удовольствия от этого мы получили не меньше! Знание внутренней работы БД очень помогает, даже при типичном паттерне применения.

Комментарии (0)