Тема преимуществ открытых табличных форматов при работе с озерами данных всё чаще поднимается в среде дата-инженеров. Предполагается, что их использование способно устранить недостатки популярного Apache Hive. Но так ли это на практике?
Меня зовут Иван Биленко, я инженер данных в команде дата-платформы Циан. В этой статье я хочу немного познакомить вас с процессами и стеком внутри нашей платформы, рассказать, почему мы решили попробовать Iceberg, с какими проблемами столкнулись при тестировании и какие преимущества Iceberg может дать тем, кто еще только задумывается о переходе. Дисклеймер: статья носит обзорный характер.
Описание стека и процессов с данными
Циан с точки зрения обработки данных прошел долгий путь. В 2001 году мы были обычной Excel-таблицей.
Сегодня же наша инфраструктура включает в себя DWH на базе Greenplum и Data Lake на основе Yandex S3.
Основные источники данных — это сайт и мобильные приложения, откуда информация поступает в Data Lake через Kafka — наш основной канал передачи данных. После этого данные распределяются по различным потребителям:
Аналитики получают данные через DWH для проведения исследований и анализа.
Дата-сайентисты используют данные из фича-стора для построения моделей машинного обучения.
Есть и другие потребители данных: например, разработчики, а также прочие системы, участвующие в процессе передачи данных. Но на этом этапе мы не будем углубляться в их роль. Остановимся на процессах вокруг Data Lake (на схеме A и B) и связанных задачах.
Боли в процессах и потребности
Процесс A: работа с данными через Kafka
На этом этапе мы обрабатываем более 400 таблиц, загружая и парся данные из Kafka по заранее заданным слоям. У себя внутри мы называем этот процесс раскладкой. Каждую неделю добавляются 2-3 новые таблицы.
Основная задача нашей команды — обеспечить доступность и актуальность данных к назначенному сроку, а также предоставить бизнесу инструмент для самостоятельного управления раскладкой без необходимости привлекать дата-инженеров.
Какие проблемы мы здесь встречаем:
Ограниченные DDL-операции в Apache Hive: Например, у нас нет доступа к операциям с колонками, таким как ALTER TABLE RENAME, REPLACE, или DROP. Это создает сложности, поскольку источники данных меняют свои форматы, и нам приходится перезаписывать таблицы целиком, что усложняет автоматизацию раскладки.
Отсутствие транзакций: Если на этапе перезаписи происходит ошибка, мы можем потерять данные. Чтобы избежать этого, мы используем костыли с временными таблицами, записывая данные сначала в них. Но эти костыли не решают проблему временной недоступности таблиц. Кроме того, нам не удается проводить операции с таблицами одновременно, что сильно замедляет процесс.
Процесс B: работа с фича-стором
Мы собираем около 40 фича-таблиц для нашего фича-стора. Возникает потребность в хранении исторических версий данных. Поскольку речь идет о таблицах, которые могут весить сотни гигабайт, нам нужен эффективный механизм Time Travel, чтобы избежать создания копий данных для разных версий.
Какое решение
В процессе поиска решения на горизонте сразу появились несколько популярных табличных форматов: Delta Lake, Apache Iceberg и Apache Hudi. Все они нацелены на расширение возможностей реляционных баз данных для Data Lake, что должно было полностью покрыть наши запросы. Что касается минусов перехода — на первый взгляд, кроме возможных сложностей с настройкой, их будто бы и нет. Если вы наткнулись на другую информацию, буду рад ссылке в комментариях к статье или в личном сообщении.
При выборе между табличными форматами Delta Lake сразу отпал, поскольку поддерживает только формат Parquet, а полноценное production ready решение с его полным функционалом доступно лишь в коммерческой версии Databricks. Нам же было нужно опенсорсное решение.
Из решений от Apache выбрали Iceberg благодаря его простой настройке — Yandex Cloud предлагает готовый набор файлов jar и подробный мануал, — и активному сообществу.
Сравнение форматов Iceberg и Hive
А теперь возьмём основные фишки Apache Iceberg, изучим, как они должны работать, и проверим их на практике.
Снэпшоты
После каждой операции над таблицей Iceberg фиксирует ее состояние в виде снимка и ведёт лог. Это даёт возможность перемещаться между снэпшотами, делать Time Travel запросы не только по id снимка, но и по заданному тегу.
На практике это оказалось полезным: так можно фиксировать продовые версии датасетов. А вот функция разделения снэпшотов по веткам (например, для экспериментов, чтобы не трогать продовые данные) пока не пригодилась.
Производительность
Netflix, как создатель формата Apache Iceberg, делится впечатляющим кейсом его использования.
Данные
2,7 млн файлов в 2688 партициях
Запрос
SELECT distinct tags['type'] as type FROM iceberg.atlas WHERE name = 'metric-name' AND date > 20180222 AND date <= 20180228 ORDER BY type; |
Результат
EXPLAIN — 9 мин., query — не отрабатывает.
query — 42 сек. при условии min/max filtering.
Почему такой скачок в производительности?
Основной ответ — Iceberg оптимизирует запросы за счёт метаданных, которые позволяют фильтровать файлы, не соответствующие условиям запроса, исключая их из сканирования.
Также Iceberg избавляет от необходимости листинга директорий по каждой партиции, ведь все метаданные хранятся отдельно.
Уровни метаданных
1. Для каждого дата-файла: статистики по колонкам хранятся в manifest files.
2. На уровне снэпшота таблицы: агрегированные статистики в manifest list.
Однако, на практике всё оказалось не так гладко.
Данные
400 Гб, 31 тысяча файлов в 820 партициях, формат Parquet.
Условия
Движок Spark 3.1.2, Iceberg 0.13.1 и фиксированное количество ресурсов.
Запрос
SELECT count(*) FROM iceberg_catalog.feature_store.user_search_history_features |
Результат
EXPLAIN — 5 сек., query — 1 мин. 7 сек.
EXPLAIN — 58 сек., query — 4 мин. 32 сек.
Ожидалось, что подсчёт количества строк займет десятки секунд, но реальность оказалась другой. Возник вопрос: почему так?
Первое предположение: Iceberg не использует метаданные для оптимизации. Логично было бы взять информацию из summary с указанием total-records, но, глядя на план выполнения, видно, что таблица сначала полностью сканируется.
== Physical Plan == * HashAggregate (5) +- Exchange (4) +- * HashAggregate (3) +- * Project (2) +- BatchScan (1) |
Нет отдельного шага, где бы происходило чтение метаданных.
Второе предположение: слишком много метаданных, так как на каждую операцию создается новый файл.
В документации действительно есть раздел Maintenance, где рекомендуется регулярно чистить метаданные. Но какое количество является допустимым для сохранения производительности — вопрос остаётся открытым.
Наличие транзакций
Транзакционная согласованность в Iceberg дает возможность параллельно выполнять операции записи без влияния на операции чтения. Проблема потери данных при перезаписи больше не актуальна. Также ушла ошибка Cannot overwrite a path that is also being read from — теперь не нужно сбрасывать план запроса или создавать временные таблицы, если пайплайн одновременно читает и записывает в одно и то же место.
Эволюция данных
Наша первая боль — ограниченные операции с колонками — решена благодаря возможности эволюции схемы данных без перезаписи. Теперь доступны операции Add, Drop, Rename, Update, Reorder. Возможность менять схему партиционирования (на текущий момент стабильно по дням) и порядка сортировки таблиц пока не использовали, но опция есть.
Расширенный SQL
Операцию DELETE пока не применяли, зато UPDATE используем регулярно в связке с INSERT через MERGE INTO. Важно помнить, что архитектурно в S3 сами файлы не изменяются и не удаляются — создаются дополнительные файлы, которые объединяются при чтении. Со временем это может сказаться на производительности.
Результаты и выводы
Мы начали частичную миграцию на Iceberg при создании нового процесса для сохранения исторических фичей в фича-сторе (процесс B). Нам удалось решить многие из упомянутых болей, но возникло снижение производительности на «жирных» таблицах, которое нужно проработать для полноценного перехода (процесс A). А полноценная миграция — тема для следующей статьи.
Комментарии (9)
eigrad
20.11.2024 20:03Для онлайн запросов по Iceberg-данным у меня часто используется Clickhouse, правда у меня объемы данных в таблицах куда ходит Clickhouse - совсем небольшие, витрины с агрегатами, единицы миллионов записей. Пробовали ли поставить Clickhouse (или вообще какую-нибудь Impala/Presto/Trino) как фронтенд для feature-store?
Про Maintainance - речь же в первую очередь должна быть про "Compact data files", должен быть какой-то регулярный процесс, как бэкграунд мержи в clickhouse. А удаление старых файлов метаданных не должно повлиять на выполнение запросов примерно никак.
de_linch Автор
20.11.2024 20:03В части Maintainance, я бы ещё выделил Expire Snapshots на случай интенсивных операций UPDATE или DELETE, так как со временем может накопиться множество неактуальных файлов, влияющих на производительность, а эта настройка позволяет от них избавиться
eigrad
20.11.2024 20:03Ещё стоит спросить яндексоидов не надо ли вам включить эту штуку при работе с их S3 - https://iceberg.apache.org/docs/1.7.0/aws/?h=write.object+storage.enabled#object-store-file-layout.
eigrad
Вау! Респект что вышли из тени, но работать с Iceberg из Spark 3.1? Да и сам Iceberg кучу родовых травм успел починить по пути к актуальной 1.7.
Пробовали ли
выйти из заморозкиработать на свежей версии?)de_linch Автор
Мы начинали работу с Iceberg в 2022, на тот момент актуальные версии Spark и Iceberg. Возвращая фокус на возможную полноценную миграцию шаг с тестированием на обновлённых версиях впереди)