Привет, Хабр! На связи — технические лидеры направления разработки Apache Spark в составе платформы Data Ocean Андрей Первушин и Дмитрий Паршин из Data Sapience. Мы занимаемся решением нетривиальных задач в области Spark-вычислений, некоторые из которых становятся частью конечного продукта.

Сегодня мы расскажем, с какими проблемами можно столкнуться при реализации Upsert Streaming в Iceberg, что такое equality delete, почему они создают нагрузку при чтении таблиц в Apache Iceberg и как мы оптимизировали Apache Spark, чтобы снизить потребление памяти и ускорить чтение данных.

Контекст

В последние годы архитектуры потоковой обработки данных стремительно набирают обороты, особенно в сценариях, где критически важны свежесть и оперативность данных. В нашем продукте мы решили пойти по этому пути: чтобы сократить время доставки данных в хранилище, мы внедрили Flink Upseart Streaming поверх Apache Iceberg — популярный формат для устройства Data Lakehouse, который поддерживает транзакции и эффективную работу с большими объёмами данных и фактически стал стандартом, выиграв «битву открытых форматов».

Одно из ключевых преимуществ Apache Iceberg — поддержка операций UPDATE, DELETE и MERGE без необходимости полного переписывания таблицы (в режиме Merge-on-Read), начиная со второй версии спецификации формата.

На начальных этапах всё выглядело многообещающе: стриминговые задания на Apache Flink стабильно писали данные в Iceberg-таблицы, а процессы обслуживания данных, которые мы реализовали через Spark, используя встроенные процедуры Iceberg, работали. На тестовых средах, с искусственной нагрузкой, система демонстрировала устойчивость и предсказуемое поведение.

Однако реальность в Prod-окружении заказчика оказалась не такой радужной. Спустя некоторое время после запуска мы столкнулись с критической проблемой: таблицы стали практически непригодны для чтения. Spark-процессы обслуживания начали массово падать с ошибками OutOfMemory (OOM), а попытки экспериментально подобрать «достаточное» количество ресурсов для выполнения процедур оказались тщетными — требуемый объем памяти и время выполнения росли нелинейно и непредсказуемо. Более того, в процессе выполнения обслуживания состояние таблицы продолжало ухудшаться: новые commit от Flink накапливали всё больше equality delete-файлов, что усугубляло фрагментацию и замедляло чтение.

Эта статья рассказывает о том, как мы столкнулись с ограничениями реализации Equality Delete в Apache Iceberg и почему критически важно вовремя обслуживать Iceberg-таблицы. Объясняем проблему и рассказываем о нашем решении.

Почему выбрали Spark для запуска процессов обслуживания

Мы выполняем обслуживание Iceberg на Spark по следующим причинам:

  • Реализация Iceberg на Spark включает полный набор процедур для обслуживания и позволяет выполнять все необходимые задачи в отличие от других альтернативных движков нашей платформы;

  • Только Spark имеет функциональность по точечной настройке процедур обслуживания;

  • Spark позволяет гарантированно ограничивать процессы обслуживания заданными ресурсами, не оказывая влияния на другие процессы. 

Процесс обслуживания файлов данных состоит из следующих этапов: 

  • чтение;

  • применение удалений;

  • запись. 

Основные проблемы в Spark (впрочем в других движках так же) возникают именно на этапах чтения и применения удалений. Весь последующий рассказ пойдет именно об этих процессах. 

Что такое equality delete, и почему оно возникает

Iceberg поддерживает два способа построчного удаления:

  • Positional delete — удаление конкретной строки по положению (файл + индекс в файле). Подходит, когда приложение знает физическое расположение строки;

  • Equality delete — удаление по совпадению значений одной или нескольких колонок (например, customer_id = 123). Equality delete удобно использовать при стриминговой записи: мы не знаем, в каком файле находятся старые версии строк, поэтому просто фиксируем значения, которые нужно исключить.

При чтении таблицы Spark обязан объединить data-файлы и delete-файлы, чтобы вернуть актуальный срез данных. Для этого Iceberg читает equality delete files, строит наборы (фильтры) удаленных значений и применяет их к файлам данных по партициям.

Семантики обновления таблицы: CoW vs MoR

  • Copy-on-Write (CoW): при изменении таблицы создаётся новый data‑файл со всеми актуальными данными — старые файлы отпадают.

  • Merge-on-Read (MoR): при добавлении создаётся новый data‑файл; при удалении/обновлении создаются delete-файлы (positional или equality). Актуальное состояние получается при «склеивании» data-файлов с delete-файлами в процессе чтения.

Именно в режиме MoR количество и размер файлов удалений растут, а значит, увеличивается и нагрузка на механизмы их обработки.

Особенности работы кэширования в Iceberg на Spark

Применение удалений в Iceberg на Spark реализовано в момент чтения каждого файла. Для оптимизации чтения таблиц в режиме MoR в Iceberg реализован механизм кеширования файлов удалений, который позволяет снизить нагрузку на S3 и не запрашивать повторно одни и те же файлы.

Формирование фильтра для чтения

Процесс применения equality-delete файлов имеет 4 уровня абстракций:

  1. Сжатые equality-delete файлы на S3;

  2. Сжатые equality-delete файлы в памяти cache;

  3. Множество уникальных значений из актуальных equality-delete файлов для чтения конкретного data-файла;

  4. Фильтры для чтения data-файлов на основе сформированного множества уникальных значений. 

Текущее архитектурное решение требует хранить в памяти и сырые данные, и подготовленные к применению в виде фильтров. Результат: чем больше мы имеем CPU и spark task’ов, тем больше памяти нам необходимо. Т.к. каждый поток формирует свое собственное множество значений и фильтр. 

Например, если мы читаем один большой data-file.parquet, в котором несколько row group. Spark способен читать параллельно в каждом task свою row group. У таких потоков будут вычитаны одинаковые equality-delete файлы и сформированы примерно одинаковые множества значений и фильтров. То есть данные будут дублироваться в памяти.

Процесс создания equality-delete и их применения

Когда мы производим какие-то действия с таблицей с семантикой Merge-on-Read, каждое действие создает свой data-file и equality-delete-file с данными из этого действия. Каждое действие берет для себя следующее значение SequenceNumber. Для получения итогового состояния происходит склеивание всех файлов по определенной логике.

Чтобы получить итоговое значение, необходимо прочитать каждый data-file и применить к нему equality-delete-files со значением Sequence Number большим, чем у него. 

Data-files и equality-delete-files хранятся в формате parquet, который хранит в себе meta-информацию из файла. Это позволяет применять к каждому data-file не все equality-delete файлы с Sequence Number большим, а ограничиться теми, что содержат возможные актуальные значения для чтения этого data-file.

Пример. Чтобы получить текущее состояние:

  • Для чтения первого data-file мы берем все equality-delete файлы, создаем из него Set уникальных значений, из которого формируем фильтр для чтения;

  • Для чтения второго data-file мы формируем фильтр только из последнего equality-delete файла. Предпоследний мы пропускаем, т.к. минимальное и максимальное значение в файле не пересекаются с максимальным и минимальным значением в data-file;

  • Последний data-file мы читаем без фильтров, т.к. нет equality-delete файлов с Sequence Number выше, чем у него.

Жизненный цикл cache

Если рассматривать структуру хранения таблиц в Apache Iceberg формате, то директория с данными о таблице поделена на независимые между собой директории с партициями. При чтении каждую партицию можно воспринимать как независимую от других партиций сущность. Equality-delete файлы в конкретной партиции актуальны только для нее.

С другой стороны жизненный цикл Spark cache ориентирован не по партициям, а по целой таблице. Ключ в cache формируется конкатенацией названия таблицы и пути до equality-delete файла, а в значении — содержимое файла в сыром виде. По пути к файлу можно понять, к какой партиции он относится, но в логике это никак не применяется. Очистка кэша происходит только когда мы закончили работу со всей таблицей. Тогда вызывается invalidate, который по названию таблицы находит все принадлежащие ей ключи и удаляет их из cache.

Другая особенность жизненного цикла — процесс от начала чтения файла до его попадания в cache. Файл попадает в cache только когда вычитан полностью из хранилища S3. Следовательно, возможны случаи, когда несколько Spark task запрашивают один и тот же файл в моменте и происходит несколько загрузок в память. 

Проблемы стандартного кэширования equality delete

При анализе работы с большой партиционированной таблицей нами выявлены три основные проблемы стандартной реализации чтения:

  1. Избыточное потребление памяти при параллельном чтении. Когда множество задач одновременно запрашивает один и тот же delete-файл, каждая задача может загрузить свою копию в память. Одновременно каждая задача формирует свое множество значений для создания фильтра для удаления. Это приводит к кратному увеличению занимаемой памяти на исполнителе и не позволяет делать executor с большим количеством CPU. 

  2. Дублирование данных и лишняя CPU-работа. В кэше содержатся сами equality delete файлы, в которых — устаревшие и дублирующиеся значения. Так как файлы хранятся в памяти в первоначальном виде без сжатия, то занимают больше места. При каждом обращении к этим equality delete файлам их приходится заново преобразовывать в необходимую для Iceberg структуру, что даёт лишнюю нагрузку на CPU.

  3. Неоптимальный жизненный цикл кэша. Equality delete файлы относятся к конкретным партициям. Тем не менее стандартный кэш хранит их на протяжении всего времени обработки таблицы, даже если конкретная партиция уже полностью обработана. В результате память удерживает неактуальные данные.

Разобравшись с проблемами, мы переписали стандартный механизм чтения и применения equality-delete файлов в платформе данных Data Ocean Nova, чтобы устранить описанные выше недостатки.

Наша реализация функциональности чтения и применения файлов equality-delete не публикуется в открытом доступе и поставляется как функционал Managed Iceberg Tables Lakehouse-платформы Data Ocean Nova, поэтому мы не можем поделиться исходным кодом реализации, но сформулировали проблемы и подходы к их решению.

Экспериментальные данные на тестовом стенде

Параметры тестовой таблицы и окружения:

  • Размер файлов таблицы — ~22 ГБ;

  • Сжатие — zstd, lvl 3;

  • Количество партиций — 6;

  • Каждая последующая секция содержит больше удалений, чем предыдущая;

  • Spark 3.5.4, Iceberg 1.8.1.

Параметры executor (варианты): CPU: от 12 до 32; память (ORM): от 140 до 197 GB. 

План эксперимента. Зная оптимальные значения для нашего оптимизированного Data Ocean Nova Cache, попытаться определить, с какими параметрами можно будет запустить ту же команду на обслуживание той же таблицы, но уже с оригинальным Open Source Apache Spark Cache.

Результаты (сравнение):

Кэш

CPU

Память

Время

Статус

1

Data Ocean Nova Cache

32

140 GB

8 мин

Success ✅

2

Apache Spark Cache 

32

197 GB

~2 часа 

Error (OOM) ❌

3

Apache Spark Cache 

20

197 GB

~2 часа 

Error (OOM) ❌

4

Apache Spark Cache 

12

197 GB

~1 час

Success ✅

5

Apache Spark Cache

12

140 GB

~3 часа 

Error (OOM) ❌

Метрики Open Source Apache Spark Cache

Метрики Data Ocean Nova Cache

Выводы по результатам:

  • Наш оптимизированный кэш позволил выполнить задачу при высокой параллельности (32 ядра) и уменьшенной памяти (140 GB) за 8 минут, в то время как стандартный кэш либо падал по OOM, либо требовал сильного снижения параллелизма для успешного завершения;

  • Это подтверждает, что оптимизация по дедупликации и управлению жизненным циклом партиций даёт существенные выигрыши как по памяти, так и по времени выполнения;

  • По метрикам заметно, что оптимизированный кэш способен утилизировать больше CPU за счет большего количества threads. Использование процессорного времени также изменилось в лучшую сторону, разница между самым времязатратным методом и другими уменьшилась. 

Применение в промышленном окружении

Краткое резюме

Для проверки функционала в Prod-окружении были созданы дубликаты 3 потоков данных, выполнена полная инициализирующая загрузка в целевые таблицы для осуществления сравнения производительности и времени обслуживания на идентичных данных.

Параллельно воспроизвели регламент, покрывающий типичные режимы обслуживания (ночные и периодические запуски с разной нагрузкой), и показали устойчивое снижение потребления памяти и ускорение обработки при сохранении/повышении параллелизма в ряде сценариев.

Описание тестовых таблиц

  1. Партицированная по дате

    Ежедневная новая партиция; объём партиции растёт в течение дня. Запуски: ночной (полный цикл), почасовой.

    Параметр

    Apache Spark Cache

    Data Ocean Nova Cache

    Исполнителей

    5

    1

    Ядер (в сумме на кластере)

    10

    9

    Память (executor)

    100 ГБ

    71 ГБ

    Максимальное фактическое потребление

    100 ГБ

    65 ГБ

    Параллельность обработки

    10

    3

    Среднее время выполнения

    ~60 мин

    ~45 мин

    Суммарно (в кластере)

    50 ядер, 500 ГБ

    9 ядер, 71 ГБ

  2. Партицированная по Bucket

    Equality-delete применяется к файлам в каждом bucket — при обслуживании требуется перезапись большого числа файлов.

    Параметр

    Apache Spark Cache

    Data Ocean Nova Cache

    Исполнителей

    5

    1

    Ядер

    5

    27

    Память (executor)

    100 ГБ

    81 ГБ

    Максимальное потребление

    90 ГБ

    70 ГБ

    Параллельность

    10

    7

    Среднее время

    ~21 мин

    ~23 мин

    Суммарно

    25 ядер, 500 ГБ

    27 ядер, 81 ГБ

  3. Без партицирования

    Equality-delete потенциально воздействуют на все data-файлы, что увеличивает объём перезаписи.

    Параметр

    Apache Spark Cache 

    Data Ocean Nova Cache

    Исполнителей

    5

    1

    Ядер

    5

    32

    Память (executor)

    100 ГБ

    188 ГБ

    Максимальное потребление

    95 ГБ

    170 ГБ

    Параллельность

    10

    8

    Среднее время

    ~20 мин

    ~30 мин

    Суммарно

    25 ядер, 500 ГБ

    32 ядер, 188 ГБ

Суммарный эффект на кластере (по трём таблицам)

Потенциально высвобождаемая память — ~1222 ГБ.

Снижение количества pod'ов — ~12 шт.

Освобождаемые CPU-ядра — ~38 шт.

Заключение

Equality-delete — практичный и жизнеспособный вариант реализации удаления/обновления в Apache Iceberg для стриминговых сценариев. Оптимизация (дедупликация, партиционный жизненный цикл, ускоренные проверки принадлежности) решает ключевые проблемы и в боевых проверках показала практическую выгоду: 

  • значительное уменьшение суммарной выделяемой памяти (~1222 ГБ потенциально высвобождается по трём продуктовым таблицам);

  • сокращение количества pod'ов в кластере Kubernetes;

  • возможность увеличивать плотность CPU на executor’ах;

  • сохранение или улучшение времени выполнения в большинстве сценариев. 

На текущий момент наше решение используется в клиентских prod-окружениях для обслуживания всех Iceberg-таблиц как часть функционала данных. Поставляется с  Lakehouse-платформой данных Data Ocean Nova.

Комментарии (0)