Таблицы — это фундаментальная часть заданий Spark, и при изучении документации кажется, что работать с ними нетрудно. На самом же деле опасности поджидают на каждом повороте. Команда VK Cloud перевела статью о том, с какими трудностями вы можете столкнуться и как их преодолеть.

Если вы делаете новый проект на основе Spark, то лучше использовать версию 3.2, а если готовы слушать чужие советы в интернете, рекомендую Delta 2.0. Хотя эта версия вышла недавно, это надежное решение, которое позволит на 90 % быстрее достигнуть цели. Если у вас сложный проект или вы интересуетесь подробностями, эта статья для вас. Обсудим, как безопасно перезаписать готовую таблицу, в том числе партиционированную, как использовать Magic committer, чтобы не делать дорогие копии объектов, а еще посмотрим, какие проблемы решают табличные форматы нового поколения.

Анатомия таблицы и опасности за углом


Таблица — это удобный и стабильный идентификатор для громоздкого, потенциально нестабильного пути, по которому расположено множество файлов данных. Вместо s3://acme-data-mart/fact-tables/orders/v3 можно использовать  mart.orders. Эти идентификаторы обычно хранятся в метахранилище Hive, даже если это единственная часть Hive, которую вы используете. Кроме того, таблицы содержат схемы — и совместимые с Hive, и более подробные схемы Spark.

Для наших целей очень важно, чтобы Spark считал все файлы, расположенные по указанному пути, частью данных таблицы. В партиционированной таблице каждое сочетание значений разделов имеет связанный с ней путь, например, раздел с заданной датой может быть путем   s3://acme-data-mart/fact-tables/orders/v3/date=2022-01-05/. Путь раздела можно изменить, даже используя другой бакет. Но это все же путь, и все файлы, расположенные в нем, безоговорочно остаются частью данных таблицы.

В первый раз создать таблицу легко: записываем данные и регистрируем идентификатор в метахранилище. Так как мы еще никому не рассказали об этой таблице, ни один модуль чтения не наткнется на нее. Проблемы начинают расти, как снежный ком, когда таблица применяется во множестве действий и используется интерактивными пользователями, а нам в это время нужно ее обновить — скажем, добавить новые строки или исправить строки с ошибками. 

Что может пойти не так? Если вспомнить основные принципы работы этой системы, можно представить несколько вариантов:

  • средства чтения видят частично записанные новые данные;
  • средства чтения видят дубликаты данных, когда в таблице есть и новые, исправленные, и старые строки с ошибками;
  • в худшем случае модуль записи может дать сбой в разгар работы, так что указанные выше несоответствия исчезнут только после перезапуска задания.

В соответствии с терминологией ACID, облачные хранилища (Warehouses) обычно предлагают устойчивость автоматически, не заботятся о согласованности и изначально имеют серьезные проблемы с атомарностью и изоляцией. Вот об этом мы и поговорим.

Давайте вкратце обсудим, в чем разница между дополнением и перезаписью. 

Когда мы дополняем таблицу, мы добавляем в нее новые файлы с новыми данными, не трогая уже имеющиеся строки. Разобраться в содержимом предстоит тем, кто все это будет читать. 

При перезаписи мы определяем новое содержимое таблицы или некоторых ее разделов. Мы можем сочетать новые данные с имеющимися, убирать дубликаты и применять любые семантические изменения. В некоторых ситуациях дополнение оказывается целесообразным. Все же операции чтения встречаются чаще, чем операции записи, так что обеспечить согласованность данных, пусть даже потратив дополнительные усилия при записи, — это разумно. В связи с этим мы будем больше говорить о сценариях перезаписи.

Перезаписываем таблицу


Самый простой способ — это полностью перезаписать таблицу, что можно сделать с помощью следующего кода:

df.write.mode("overwrite")
  .option("path", "s3://bucket/dim_user")
  .saveAsTable("mart.dim_user")

Вот и первые подводные камни:

  • сначала таблица полностью удаляется;
  • потом данные записываются в S3;
  • наконец, таблица создается заново, указывая на эти данные.

Любое задание или интерактивный запрос, который выполняется в этот момент, скорее всего, выполнить не удастся, потому что таблица не существует. А если по какой-то причине записать таблицу не удалось, мы останемся вообще без нее.

Spark не предлагает никакого нативного решения, но можно разобраться самостоятельно:

  1. Запишите данные в новую таблицу с новым путем. Например, добавьте случайный суффикс к имени и пути имеющейся таблицы.
  2. Когда таблица записана, используйте команду SQL alter table, разместив имеющуюся таблицу по новому пути к данным.
  3. Если все удалось, удалите старую таблицу или запланируйте ее удаление на потом. Если что-то пошло не так, удалите временную таблицу и ее данные.

Поскольку alter table — это сугубо операция с метаданными, то время, в течение которого невозможно завершить параллельные с ней операции, сокращается до нескольких миллисекунд. Как правило, это приемлемая продолжительность.

Перезаписываем разделы


Большие таблицы часто делят на разделы по столбцам, например по дате или стране, чтобы в них можно было заменять только один раздел. Скажем, можно собрать данные за один день и записать их следующим образом:

df.write.mode("overwrite")
   .option("path", "s3://bucket/table")
   .partitionBy("date")
   .saveAsTable("mart.orders")

К сожалению, этот код работает так же, как и в примере с таблицей без разделов: для начала он удалит всю таблицу со всеми историческими разделами. Чтобы этого избежать, нужно использовать параметр конфигурации

.config("spark.sql.sources.partitionOverwriteMode", "dynamic").

Таким образом, Spark удалит раздел, только если в нашем датафрейме есть его данные. Но нам не известно, какие значения присутствуют в датафрейме. Следовательно, Spark сначала запишет данные в область промежуточного хранения, разделяя их на разделы. Потом он определит, какие значения разделов присутствуют, удалит в них имеющиеся данные и скопирует новые.

Фундаментальная проблема подхода в том, что это не атомарная операция. Если параллельно выполняется операция чтения, может возникнуть ситуация, когда данные раздела уже удалены, а новые еще не записаны или записаны не полностью.

Во всех версиях Spark до 3.1.0 был еще один серьезный баг: механизм динамической перезаписи не умел восстанавливаться после любого сбоя при выполнении задачи. Обычно если один исполнитель прекращает работать по временным причинам, Spark пытается повторно выполнить соответствующие задачи и завершить задание. Из-за бага все эти попытки заканчиваются ничем, пока Spark не сдастся и не прекратит операцию записи. Баг в конечном счете исправили, но он остается напоминанием о том, что динамическая перезапись — вещь очень хрупкая.

К счастью, мы можем прибегнуть к тому же трюку, которым спасались, когда перезаписывали таблицу целиком:

  1. Записываем новые данные во временную таблицу, обязательно выбираем режим перезаписи раздела Static.
  2. Перечисляем записанные разделы и дополнительно копируем данные в расположение таблицы. Если скопировать данные в шаблон партиционированной таблицы по умолчанию, Spark сможет восстановить метаданные в случае их потери.
  3. Для существующих разделов модифицируем метаданные таблицы, указывая новые расположения, а для новых разделов создаем новую запись в метаданных.
  4. Удаляем данные старого раздела или планируем удаление на попозже.

В нашем конкретном случае полная реализация занимает примерно 300 строк кода и фактически снимает все проблемы надежности, характерные для стандартных методов записи таблиц Spark. Но тут встает вопрос производительности.

Выполняем коммит данных


Представьте, что вы вернулись в январь 2008 года, когда iPhone 1 версии 2G и HDFS были крутыми новинками. Вы используете MapReduce для обработки и записи данных, но вам все равно нужно, чтобы эти данные появились в папке назначения атомарным образом. Для этого придется действовать в два этапа:

  1. Каждая задача записывает данные во временную папку, созданную специально для этого. После завершения задачи ее временная папка атомарным образом перемещается во временную папку задания с помощью операции переименования.
  2. После выполнения всех задач временная папка задания переименовывается в окончательную выходную папку.

Поскольку в HDFS операция переименования — известная под названием Hadoop Output Committer version 1 — выполняется атомарно, такой подход можно считать достаточно надежным. 

Теперь вернемся в современность. Использование таблиц Spark с динамической перезаписью разделов добавляет дополнительный раунд копирования. При использовании S3 атомарное переименование папок не происходит: приходится перемещать каждый объект, а это, по сути, копирование каждого байта в пункт назначения и удаление оригинала. При использовании GCS перемещение объекта, в теории, представляет собой операцию только с метаданными, байты содержимого при этом не копируются. На практике же никакого прироста скорости не наблюдается, все это до сих пор работает очень медленно. Так что у нас получается три раунда копирования каждого байта данных.

Притом что последовательное копирование объектов не дает нам атомарную семантику, сначала оптимизация заключалась в объединении временной папки задачи с временной папкой задания. В результате получился Hadoop Output Committer version 2, который является стандартной рекомендацией для Spark с облачным хранилищем. Со стороны Spark точкой входа для механизма фиксации является класс HadoopMapReduceCommitProtocol, у которого есть довольно длинный комментарий, описывающий эти перемещения данных.

Потом сообщество Hadoop придумало долгосрочное решение получше, создав два коммитера нового поколения — Staging и Magic.

Staging committer


Staging committer записывает результаты задачи на локальный диск исполнителей и некоторые метаданные в общее расположение HDFS. После завершения всех задач данные передаются из локальных дисков в облачное хранилище, непосредственно в окончательное расположение.

Поскольку общее расположение HDFS содержит список всех путей, которые мы планируем записать, Staging committer может сам внедрить логику динамической перезаписи разделов. Он может перечислить все разделы, в которые мы пытаемся выполнить запись, и автоматически удалить существующие данные в этих разделах.

Поддержка динамической перезаписи раздела — однозначное преимущество этого коммиттера. С другой стороны, вам нужно дополнительное место на локальном диске. Фиксация не является атомарной, а если какой-то исполнитель потеряется, потеряются и результаты задачи на локальном диске, и задачу нужно будет повторить. Наконец, нужно поддерживать HDFS просто для целей координации.

Magic committer


Magic committer использует механизм S3, который называется Multi-part upload. Часто при передаче большого объекта оказывается удобно выгрузить его по частям, а потом сообщить S3, что все части переданы. Только после этого объект становится видимым. Magic committer использует этот механизм, чтобы добиться атомарной фиксации в мире Hadoop/Spark.

Каждая задача начинает операцию передачи по частям с указанием окончательного расположения объекта. После завершения фиксируются все операции выгрузки индивидуальных задач.

Похоже, это решение самое быстрое: данные здесь вообще не копируются, а в HDFS не хранятся дополнительные метаданные. Но из-за отсутствия метаданных невозможно выполнить динамическую перезапись раздела. Нам просто не хватает информации, чтобы знать, какие файлы мы записываем.

Следует отметить, что первоначально Magic committer требовал дополнительный компонент S3Guard. Операции с метаданными S3 не были идеально согласованными: список объектов не всегда показывал только что добавленный объект. Такая несогласованность выводила из строя Magic committer, а S3Guard стал решением, где используется дополнительная таблица DynamoDB, в которой отслеживаются только что добавленные объекты. В 2022 году в этом уже нет необходимости: сейчас S3 — полностью согласованное решение.

Spark и новые коммиттеры


Новые коммиттеры стали очень важным усовершенствованием, но, к сожалению, интерфейсы Hadoop ограничивали их функциональность. В частности, у нас есть один коммиттер, который справляется с динамической перезаписью разделов, еще один, который с ней не справляется, и их общий интерфейс Hadoop, который не способен определить разницу. В этом интерфейсе нельзя проверить или отправить запрос на динамическую работу с разделом. В результате Spark перестраховывается и завершает работу с ошибкой, если вы пытаетесь использовать динамическую перезапись разделов с новыми коммиттерами.

Что здесь можно сделать? Объединить прежний подход передачи метаданных таблицы с новыми коммиттерами:

  • без всякого копирования с помощью Magic committer записываем новые данные во временную таблицу;
  • находим разделы во временной таблице, копируем их в окончательную и используем операции с метаданными, чтобы указать разделы для новых данных.

Этот подход включает одно ненужное копирование, но он достаточно надежен и в нем копирования все-таки меньше, чем в подходе Spark по умолчанию.

Табличные форматы нового поколения


Решение с Magic committer, временными таблицами и переключением путей к разделам стало для нас палочкой-выручалочкой. И все же это весьма нетривиальная критически важная настройка, о которой мы на самом деле предпочли бы не думать. Если вернуться к дискуссии о таблицах Spark/Hive, можно вспомнить один момент, с которым возникает больше всего проблем. Это идея, что мы указываем расположение таблицы или раздела, и при этом каждый файл в этом расположении является частью таблицы.

Именно по этой причине мы не можем запустить 100 задач, каждая из которых записывает файл в окончательное место назначения. Как только одна из них завершается, все средства чтения будут считать частью таблицы тот самый один файл с 1/100 всех данных. Есть несколько новых табличных форматов, в которых этот аспект пересмотрели, а таблица однозначно перечисляет файлы данных.

Допустим, нам нужно перезаписать таблицу. У нас 100 файлов и файл манифеста, в котором они перечислены. Можно записать данные непосредственно в то же расположение. Мы не обновляем файл манифеста, так что параллельные средства чтения будут и дальше без проблем использовать 100 старых файлов данных. После этого мы обновляем манифест, в котором теперь содержится список 100 новых файлов. Коль скоро мы можем атомарным образом обновлять этот единый файл манифеста — а это относительно просто, — все параллельные средства чтения будут использовать либо старые, либо новые файлы. Ни при каких обстоятельствах не возникает ситуация чтения несогласованных данных.

Для чего нам использовать файл манифеста вместо дополнительной базы метаданных? Все сводится к удобству работы и разработки. Конечно, можно модифицировать хранилище метаданных Hive, чтобы получить список файлов, но тогда любые изменения табличного формата придется координировать с графиком релизов Hive. Список файлов можно было бы хранить в отдельной базе данных, но это дополнительные трудозатраты. Если хранить все метаданные в файле рядом с самими данными, таблица становится самодостаточной — если знать путь S3.

Delta и Iceberg


Два самых популярных табличных формата нового поколения — это Delta и Iceberg. Концептуально они очень похожи, и выбор одного из двух вариантов скорее дело вкуса или специфических сценариев использования, например бакетинга (который поддерживается только в Iceberg) или Z-ordering (поддерживается только в Delta).

Их время от времени пытаются сравнивать, в том числе по производительности, но я бы не стал заниматься этим всерьез. Сравнение производительности не так просто сделать даже на собственном кластере и рабочих нагрузках, а когда другие люди проводят «эталонное отраслевое тестирование», часто оказывается, что его результаты вообще не информативны для ваших наиболее ресурсоемких заданий.

Я лично работаю с Delta, и это решение весьма продвинулось в последние пару лет:

  1. Со Spark 2 его было не очень удобно использовать. Даже для записи партиционированной таблицы требовалась слишком уж ювелирная работа.
  2. Со Spark 3.0 им уже можно было пользоваться, и мы перешли на него для выполнения наиболее ресурсоемкого задания. Но все еще встречались случаи, когда для использования Delta приходилось вносить изменения в код.
  3. Начиная с версии Spark 3.2.1 и Delta 2.0.0, описанная выше динамическая перезапись разделов работает так же, как в классических таблицах Spark.

Итак, к последнему пункту я все это и вел. Теперь можно написать:

spark.set("spark.sql.sources.partitionOverwriteMode", "dynamic")
df.write
    .format("delta")
    .partitionBy("date")
    .option("path", "s3://bucket/table")
    .saveAsTable("mart.orders")

В результате заменяются разделы для дат, присутствующих в df, а все другие разделы остаются без изменений. Что здесь важно? Поскольку данные записывают непосредственно в указанные пути, не нужно думать о пользовательских коммиттерах: у Delta есть собственный коммиттер, который прекрасно работает.

Заключение


В этой статье мы рассказали, что таблицы Spark не так уж просты, надежны и быстры, как это кажется на первый взгляд.

Если вы начинаете новый проект и используете Spark версии 3.2.1 или более поздней, я бы рекомендовал начинать с Delta 2.0. Она решает практически все проблемы с надежностью и производительностью, не требует настройки каких-либо коммиттеров и в целом закрывает 90% ваших потребностей. Iceberg тоже вполне разумный выбор, но я бы посоветовал сначала поговорить с теми, кто использовал его в продакшен-условиях.

Если вы не можете использовать ни Delta, ни Iceberg, вот что я посоветую:

  1. Напишите собственный код, чтобы в атомарном режиме обновлять таблицу через операции с метаданными. Возможно, на это уйдет неделя, но это избавит вас от сильной головной боли в дальнейшем.
  2. Используйте Magic committer, чтобы сократить копирование данных в S3.

Команда VK Cloud развивает собственные Big Data-решения. Будем признательны, если вы их протестируете и дадите обратную связь. Для тестирования пользователям при регистрации начисляем 3000 бонусных рублей.

Что еще почитать:

Комментарии (0)