Для будущих студентов курсов "Data Engineer" и "Экосистема Hadoop, Spark, Hive" подготовили еще один перевод полезной статьи.


Criteo — это компания, работа которой основана на данных. Каждый день через наши системы проходят десятки терабайт новых данных для обучения моделей рекомендаций, обрабатывающих запросы в масштабах всего Интернета. Spark — наше основное средство обработки больших данных. Это мощный и гибкий инструмент, однако он отличается довольно высокой сложностью в освоении, а чтобы пользоваться им эффективно, зачастую требуется читать исходный код платформы.

Быстрая обработка больших данных имеет критическое значение для нашего бизнеса:

  • мы часто обновляем наши модели, повышая их производительность для наших клиентов;

  • мы быстро выводим новые продукты на базе машинного обучения на рынок за счет того, что можем быстро выполнять итерации;

  • от скорости обработки данных зависят затраты на инфраструктуру.

В этой статье я расскажу о написании эффективного кода Spark и на примерах продемонстрирую распространенные подводные камни. Я покажу, что в большинстве случаев Spark SQL (Datasets) следует отдавать предпочтение перед Spark Core API (RDD), и если сделать правильный выбор, можно повысить производительность обработки больших данных в 2–10 раз, а это очень значимо.

Конфигурация для экспериментов

Spark 2.4.6, Macbook Pro 2017 с процессором Intel Core i7 с частотой 3,5 ГГц

Измерения всегда производятся на разогретой виртуальной Java-машине (выполняется 100 прогонов кода, и берется среднее значение за последние 90 прогонов). Приведенный в этой статье код написан на Scala, но ее выводы должны быть справедливыми и для Python.

Заблуждения, связанные с обработкой больших данных

Существует распространенное мнение, что в процессах обработки больших данных есть два основных узких места, влияющих на производительность:

  • перетасовка данных, поскольку для ее выполнения требуется отправлять данные по сети;

  • дисковый ввод-вывод, поскольку доступ к данным на диске всегда намного медленнее, чем доступ к данным в ОЗУ.

Эти представления имеют под собой исторические основания — в 2006 году, когда впервые появилась библиотека Hadoop, обычные жесткие диски были медленными и ненадежными, а основной платформой для обработки больших данных была MapReduce. Именно медленная работа жестких дисков и подстегнула разработку таких средств обработки в памяти, как Spark. С того времени характеристики аппаратного обеспечения значительно улучшились.

В 2015 году в исследовании Кей Остерхаут (Kay Ousterhout) и др.? были проанализированы узкие места в заданиях Spark, и в результате выяснилось, что скорость их выполнения в большей степени определяется операциями, загружающими ЦП, а не вводом-выводом и передачей данных по сети. В частности, авторами этой научной работы был выполнен широкий спектр запросов к трем тестовым наборам данных, включая TPC-DS?, и было определено, что:

  • если бы пропускная способность сети была безграничной, время выполнения заданий можно было бы сократить на 2 % (медианное значение);

  • если бы пропускная способность дискового ввода-вывода была безграничной, время выполнения стандартного аналитического процесса можно было бы сократить на 19 % (медианное значение).

Весьма неожиданный результат! Получается, что дисковый ввод-вывод оказывает намного большее влияние на производительность, чем передача данных по сети. Этому есть несколько причин:

  • Spark использует дисковый ввод-вывод не только при считывании входного набора данных и записи результата, но и в ходе выполнения заданий для кэширования и переноса на диск данных, которые не умещаются в ОЗУ.

  • При выполнении аналитических заданий часто требуется производить агрегацию, поэтому объем данных, передаваемых по сети, обычно меньше, чем объем данных, которые первоначально считываются с диска.

Интересно, что специалисты Databricks примерно в 2016 году? пришли к таким же заключениям, что заставило их переориентировать вектор развития Spark на оптимизацию использования процессора. Результатом стало внедрение поддержки SQL, а также API DataFrames и позднее Datasets.

Насколько быстро работает Spark?

Давайте рассмотрим простую задачу — посчитаем наивным методом четные числа от 0 до 10?. Для выполнения такого задания Spark, в принципе, не требуется, поэтому для начала напишем простую программу на Scala:

var res: Long = 0L
var i: Long  = 0L
while (i < 1000L * 1000 * 1000) {
  if (i % 2 == 0) res += 1
  i += 1L
}

Листинг 1. Наивный подсчет

А теперь давайте также вычислим этот же результат с помощью Spark RDD и Spark Datasets. Чтобы эксперимент был честным, я запускаю Spark в локальном[1] режиме:

val res = spark.sparkContext
  .range(0L, 1000L * 1000 * 1000)
  .filter(_ % 2 == 0)
  .count()

Листинг 2. Подсчет с помощью RDD

val res = spark.range(1000L * 1000 * 1000)
  .filter(col("id") % 2 === 0)
  .select(count(col("id")))
  .first().getAs[Long](0)

Листинг 3. Подсчет с помощью Datasets

Время выполнения всех фрагментов кода приведено ниже. Неудивительно, что написанный вручную код является самым эффективным решением. Удивительно же то, что RDD в пять раз медленнее, тогда как у Datasets время вычисления почти такое же, как у написанного вручную кода.

 

Парадокс Datasets

Парадокс: API-интерфейс Datasets построен на основе RDD, однако работает намного быстрее, почти так же быстро, как код, написанный вручную для конкретной задачи. Как такое вообще возможно? Дело в новой модели выполнения.

Прошлое — модель Volcano

Код, написанный с использованием RDD, выполняется с помощью модели выполнения Volcano. На практике это означает, что каждый RDD следует стандартному интерфейсу:

  • знает свой родительский RDD;

  • предоставляет посредством метода compute доступ к итератору Iterator[T], который перебирает элементы данного RDD (он является private и должен использоваться только разработчиками Spark).

abstract class RDD[T: ClassTag]
def compute(…): Iterator[T]

Листинг 4. RDD.scala

С учетом этих свойств упрощенная версия реализации функции подсчета для RDD, которая игнорирует разбиение, выглядит вот так:

def pseudo_rdd_count(rdd: RDD[T]): Long = {
  val iter = rdd.compute
  var result = 0
  while (iter.hasNext) result += 1
  result
}

Листинг 5. Псевдокод для действия подсчета на основе RDD

Почему этот код работает значительно медленнее, чем написанный вручную код, который приведен в листинге 1? Есть несколько причин:

  • Вызовы итераторов виртуальной функцией: вызовы Iterator.next() несут дополнительную нагрузку по сравнению с функциями, не являющимися виртуальными, которые могут выполняться компилятором или JIT как встроенные (inline).

  • Отсутствие оптимизации на уровне ЦП: виртуальная Java-машина и JIT не могут оптимизировать байт-код, образуемый листингом 5, так же хорошо, как байт-код, получаемый при использовании листинга 1. В частности, написанный вручную код позволяет виртуальной Java-машине и JIT хранить промежуточные результаты вычислений в регистре ЦП, а не помещать их в основную память.

Настоящее — формирование кода всего этапа

Код, написанный с помощью Spark SQL?, выполняется не так, как код, написанный с использованием RDD. Когда запускается действие, Spark генерирует код, который сворачивает несколько трансформаций данных в одну функцию. Этот процесс называется формированием кода всего этапа (Whole-Stage Code Generation)?. Spark пытается воспроизвести процесс написания специального кода для конкретной задачи, в котором не используются вызовы виртуальных функций. Такой код может выполняться JVM/JIT более эффективно. На самом деле Spark генерирует довольно много кода, см., например, код Spark для листинга 3.

Технически Spark только формирует высокоуровневый код, а генерация байт-кода выполняется компилятором Janino?. Именно это и делает Spark SQL настолько быстрым по сравнению с RDD.

Эффективное использование Spark

Сегодня в Spark есть 3 API-интерфейса Scala/Java: RDD, Datasets и DataFrames (который теперь объединен с Datasets). RDD все еще широко применяется в Spark — в частности, из-за того, что этот API используется большинством созданных ранее заданий, и перспектива «продолжать в том же духе» весьма заманчива. Однако, как показывают тесты, переход на API-интерфейс Datasets может дать громадный прирост производительности за счет оптимизированного использования ЦП.

Неправильный подход — классический способ

Самая распространенная проблема, с которой я сталкивался при использовании Spark SQL, это явное переключение на API RDD. Причина состоит в том, что программисту зачастую проще сформулировать вычисление в терминах объектов Java, чем с помощью ограниченного языка Spark SQL:

val res = spark.range(1000L * 1000 * 1000)
    .rdd
    .filter(_ %2 == 0)
    .count()

Листинг 6. Переключение с Dataset на RDD

Этот код выполняется в течение 43 секунд вместо исходных 2,1 секунды, при этом делая абсолютно то же самое. Явное переключение на RDD останавливает формирование кода всего этапа и запускает преобразование элементов наборов данных из примитивных типов в объекты Java, что оказывается очень затратным. Если мы сравним схемы этапов выполнения кода из листингов 3 и 6 (см. ниже), то увидим, что во втором случае появляется дополнительный этап.

 

Рисунок 1. Визуальные представления этапов для листинга 3 (схема a) и листинга 6 (схема b)
Рисунок 1. Визуальные представления этапов для листинга 3 (схема a) и листинга 6 (схема b)

Неправильный подход — изысканный способ

Производительность Spark SQL является на удивление хрупкой. Это незначительное изменение приводит к увеличению времени выполнения запроса в три раза (до 6 секунд):

val res = spark
  .range(1000L * 1000 * 1000)
  .filter(x => x % 2 == 0) // note that the condition changed
  .select(count(col("id"))) 
  .first()
  .getAs[Long](0)

Листинг 7. Замена выражения Spark SQL функцией Scala

Spark не способен генерировать эффективный код для условия в фильтре. Условие является анонимной функцией Scala, а не выражением Spark SQL, и Spark выполнит десериализацию каждой записи из оптимизированного внутреннего представления, чтобы вызвать эту функцию. Причем вот что примечательно — это изменение никак не сказывается на визуальном представлении этапов (рис. 1a), поэтому его невозможно обнаружить, анализируя направленный ациклический граф (DAG) задания в пользовательском интерфейсе Spark.

Высокая производительность Spark SQL обеспечивается за счет ограничения круга доступных операций — чем-то все равно приходится жертвовать! Чтобы получить максимальную производительность, нужно использовать преобразования, которые работают со столбцами: используйте filter(condition: Column) вместо filter(T => Boolean) и select(…) вместо map(…). При этом Spark не придется перестраивать объект, представленный одной строкой набора данных (Dataset). И, разумеется, избегайте переключения на RDD.

Заключение и итоговые замечания

Приведенные в этой статье простые примеры демонстрируют, что большая часть времени выполнения заданий обработки больших данных не тратится на полезную работу. Хорошим решением этой проблемы является компиляция запросов, которая возможна с использованием Spark SQL и обеспечивает более эффективное использование современного аппаратного обеспечения. Последние исследования свидетельствуют, что использование эффективных запросов для стандартных процессов обработки больших данных важнее, чем оптимизация использования сети и дискового ввода-вывода.

Правильное применение компиляции запросов может сократить время обработки в 2–10 раз, а это означает ускорение экспериментов, снижение затрат на инфраструктуру и громадное удовольствие от элегантного выполнения своей работы!

Образцы кода из этой статьи можно найти здесь. С помощью этого репозитория можно анализировать производительность разных запросов Spark.

Использованные материалы

  1. Ousterhout, Kay, et al. Making sense of performance in data analytics frameworks (Анализ производительности платформ анализа данных). 12-й симпозиум {USENIX} по проектированию и реализации сетевых систем ({NSDI} 15). 2015.

  2. www.tpc.org/tpcds/

  3. databricks.com/blog/2015/04/28/project-tungsten-bringing-spark-closer-to-bare-metal.html

  4. janino-compiler.github.io/janino/

  5. people.csail.mit.edu/matei/papers/2015/sigmodsparksql.pdf

  6. databricks.com/blog/2016/05/23/apache-spark-as-a-compiler-joining-a-billion-rows-per-second-on-a-laptop.html


Узнать подробнее о курсах "Data Engineer" и "Экосистема Hadoop, Spark, Hive"