Недавно я в очередной раз услышал:

“CSV — это популярный формат хранения данных, имеющий встроенную поддержку в Apache Spark…”

Нууу, на счет “популярный” — согласен, “имеющий встроенную поддержку” — согласен, но на счет “хранения данных” — категорически не согласен. Подобные фразы могут не только сбить с толку окружающих, но и привести к значительным непродуктивным затратам времени (и памяти данных). Давайте разберемся.

В этой статье я расскажу о массовом чтении (то есть методе .load() Apache Spark) в рамках Structured API и реализации pyspark (сомневаюсь, что есть отличия в работе в биндингах Scala/Java).

Итак, давайте освежим в памяти несколько общих моментов:

  • spark использует «ленивые вычисления» (lazy evaluation), означающие, что трансформации (transformations) вычисляются только тогда, когда действие (action) требует, чтобы результат был возвращен управляющей программе.

  • .load() — это трансформация, поэтому не следует ожидать какой-либо значительной дисковой активности во время выполнения .load() (значительность является важной, далее мы разберем это чуть подробнее).

  • реальная загрузка данных (имеется в виду процесс чтения с диска) происходит, когда мы выполняем .save() или любое другое действие.

Пока все звучит достаточно безобидно, не так ли?

“…практика как критерий истины...”

Теперь посмотрим, что происходит на практике (для простоты я вставил фрагменты из Jupyter notebook)

ШАГ 1. Выполните простое трансформацию чтения CSV 

Трансформация .load() CSV 
Трансформация .load() CSV 

ШАГ 2. Распечатайте полученную схему датафрейма с помощью .printSchema()

Схема датафрейма, выведенная .printSchema()
Схема датафрейма, выведенная .printSchema()

ШАГ 3. Сохраните датафрейм в файл (сейчас формат не имеет значения - пусть это снова будет CSV)

Действие .save()
Действие .save()

Пока ничего особенного не произошло. Файл маленький, и задержку трудно заметить на первом шаге (во время выполнения .load()). Что ж, нам в помощь есть графический интерфейс Spark, и вот что показывает наше маленькое приложение:

WEB UI Spark после выполнения ШАГА 3
WEB UI Spark после выполнения ШАГА 3

Теперь начинается “магия”... У нас есть 3 джоба, что странно: ведь количество джобов обычно соответствует количеству действий, а у нас было одно действие (.save ()), поэтому мы ожидаем, что у нас будет один джоб. Странно, не правда ли? Чтобы развеять магию, давайте снова пройдем шаг за шагом, и после каждого шага будем наблюдать за Spark GUI.

Выполните первый шаг повторно, и вот как будет выглядеть Spark GUI:

WEB UI Spark после повторного выполнения ШАГА 1
WEB UI Spark после повторного выполнения ШАГА 1

Теперь не может быть никаких сомнений, что появилось два новых джоба (с идентификаторами 3 и 4) без каких-либо действий в нашем коде, следовательно скорее всего это является результатом нашей трансформации .load(). Давайте копнем глубже и посмотрим метрики дискового ввода-вывода этих джобов:

WEB UI Spark с информацией об этапах выполнения

Интересно — файл считывается уже три раза (размер файла 37KB), какую трансформацию мы осуществили.

Давайте продолжим с нашим пошаговым разбором, повторно выполнив printSchema(): новых джобов в Spark WEB UI нет (что вполне естественно).

Повторно выполняем .save():

WEB UI Spark после повторного выполнения .save()
WEB UI Spark после повторного выполнения .save()

Как и предполагалось — еще один новый джоб: было выполнено действие, следовательно мы получили другой джоб. А как насчет метрик дискового ввода-вывода:

WEB UI Spark с информацией о этапах выполнения
WEB UI Spark с информацией о этапах выполнения

Ага, еще одно считывание - мы читали файл уже 4 раза, хорошо, не так ли? Такой простой “конвейер” (загрузка, сохранение) привел к неэффективному вводу-выводу с диска.

Как вам такой формат “хранения данных”? — Мне не очень.

Дело в CSV или, может быть, это у меня какие-то проблемы в конфигурации Spark? Давайте выясним — почему бы нам не повторить те же шаги для другого файлового формата, например возьмем ORC (также имеющий встроенную поддержку Apache Spark).

Я не буду перегружать вас скриншотами и расписывать шаги (шаги не меняются, разница лишь в том, что я загружаю те же данные из файла ORC) - однако результаты будут следующими:

  • нет джобов после трансформации .load() (шаг 1)

  • получаем схему и никаких джобов после выполнения .printSchema() (шаг 2)

  • всего один джоб и одно чтение файла после выполнения .save() (шаг 3)

Итак, мы выяснили, что предыдущее странное поведение связанно с CSV, и что формат ORC ведет себя так, как ожидалось.

И вот мы подошли к сути, которую я хотел выразить этой статьей:

Никогда не храните данные, которые вы планируете обрабатывать с помощью Apache Spark, в формате CSV. Можно использовать CSV в качестве транзитного формата (например, во время операций экспорта или импорта, где это может быть необходимо), для хранения используйте форматы файлов ORC или Parquet.

Я напишу еще одну статью о Parquet и ORC, а пока по своему опыту могу сказать, что они приблизительно на одном уровне (с точки зрения производительности и хранения).

Небольшой дисклеймер и пояснения

Опытные разработчики Spark уже увидели некоторые “несоответствия” в том, что я сказал выше. Позвольте мне немного объяснить, где я “скосил углы”, чтобы лучше выразить суть. Приведенные ниже пояснения не меняют сути дела, но они необходимы, чтобы укрепить “доверие” к сказанному выше.

Вывод схемы

Вывод схемы в Spark платный:

Из документации pyspark
Из документации pyspark

Ожидается, что при использовании вывода схемы (.option (“inferSchema”, “true”)) данные файла будут пройдены единожды. Но почему чтение данных файла произошло дважды (см. скриншоты выше)?

И как Spark удается вывести схему из ORC файла без каких-либо джобов? Это пример “белой магии” в Spark.

Играет ли здесь какую-то роль вывод схемы? — Да, вы можете быть уверены, что вы, например, можете выполнить .sum() на значениях столбцов в случае вывода схемы и CSV, и если данные беспорядочны, Spark будет по умолчанию использовать StringType и не позволит вам суммировать их. Это важно, но очень дорого.

Я потратил дополнительное время и повторил шаги 1–3 для файла CSV без вывода схемы — файл читался всего дважды. Почему? — см. ниже.

Влияние размера файла

Мой файл был относительно небольшим (37KB), что объясняет, почему на шаге 1 (.load()) он был прочитан один раз — здесь всего лишь один первый блок. Для больших файлов — и я их тоже протестировал — поведение то же самое: без вывода схемы .load() создает джоб для чтения только первого блока данных файла.

Я предполагаю, что это необходимо, чтобы узнать количество столбцов в создаваемом .load() датафрейме.

Конец исследования

Эти последние заметки завершают мое небольшое исследование о неэффективности CSV, следите за обновлениями — темы следующих статей: ввод и вывод — важная часть инфраструктуры Apache Spark и очень энергозатратная рабочая рутина любого датаинженера. Ввод/вывод надо делать правильно!


Материал подготовлен в рамках курса «Spark Developer».

Всех желающих приглашаем на открытый урок «Приземление данных с помощью Apache Flink». На вебинаре рассмотрим проблемы чтения и записи данных из Apache Kafka, познакомимся с Apache Flink и посмотрим на стенде, как можно эти проблемы решить.
>> РЕГИСТРАЦИЯ

Комментарии (2)


  1. middle
    21.09.2021 11:51

    Почему лучше НЕ использовать CSV

    Вот так лучше.


  1. x67
    21.09.2021 23:08
    +1

    Исследование без ответа на вопрос хуже пустоты.

    Наблюдения внутри исследования есть, конец исследования есть, а выводов нет. Зачем тогда оно нужно?