Часть II. Искажение данных и очистка памяти
Вторая часть нашей серии «Почему ваши приложения Spark медленно работают или выходят из строя» следует за первой частью об управлении памятью и посвящена вопросам, возникающим при искажении данных и очистки памяти в Spark. Как и многие другие проблемы, связанные с производительностью Spark, симптомы нарастают по мере увеличения объема данных, обрабатываемых приложением.
Что такое Искажение Данных (Data Skew)?
В идеальных условиях работы приложения Spark, когда необходимо выполнить соединение, например, коды доступа будут равномерно распределены, и каждый раздел будет хорошо подготовлен к обработке. Однако, реальные коммерческие данные редко бывают настолько понятными и совместимыми. Зачастую, мы получаем недостаточно оптимальную организацию данных для Spark, что приводит к снижению производительности из-за их искажения.
Искажение данных не является проблемой Spark как таковой, скорее это проблема работы с ними. Причиной проблемы искажения данных является их неравномерное распределение. Неравномерное распределение иногда неизбежно в общей компоновке данных или в характере запроса.
Для объединения и агрегирования Spark необходимо разместить записи одного кода в одном разделе. Записи кода всегда будут находиться в одном разделе. Аналогичным образом, другие записи кода будут распределены на другие разделы. Если один раздел станет очень большим, то это приведет к искажению данных, что будет затруднительно для любой системы запросов, если не будет выполнена соответствующая обработка.
Устранение искажения данных
Проблемы с искажением данных более очевидны в ситуациях, когда их необходимо перемешать в такой операции, как соединение или объединение. Перетасовка (Shuffle) - это операция, выполняемая Spark для хранения связанных данных (относящихся к одному коду) в одном разделе. Для этого Spark должен будет перемещать их по кластеру. По этой причине перетасовка (Shuffle) считается самой трудоемкой операцией.
Общими признаками искажения данных являются
Застопорившиеся этапы и задачи
Низкое использование процессора
Ошибки памяти
Есть несколько приемов, которые мы можем использовать для решения проблемы искажения данных в Spark.
Выявление и устранение искажения данных
Пользователи Spark часто замечают, что все задачи выполняются в течение разумного времени, только до того момента, пока одна из задач не будет длиться целую вечность. По всей вероятности, это указывает на то, что ваш информационный поток искажен. Такое положение вещей также приводит к общему недостаточному использованию кластера. Это становится особенно актуальной проблемой при запуске Spark в облаке, где избыточное предоставление ресурсов кластера является расточительным и затратным.
В таких случаях можно сделать несколько вещей, чтобы избежать искажения обработки данных. Если искажение находится на уровне источника данных (например, hive-таблица разбита на разделы с помощью команды on _month key
(текущий месяц), а в таблице гораздо больше записей для particular _month
(определенный месяц)), тогда это приведет к искажению обработки на этапе чтения из таблицы. В таком случае помогает преобразование таблицы с помощью других(ой) команд(ы) разбиения.
Однако, иногда это нецелесообразно, так как таблица может быть использована другими каналами передачи данных компании.
Передача данных
Если мы получаем искаженный набор данных, то одним из решений является увеличение значения "spark.sql.autoBroadcastJoinThreshold
" для того, чтобы передавались уменьшенные таблицы. Это необходимо сделать для обеспечения достаточного количества памяти драйвера и исполнителя.
Предварительная обработка данных
Если в программе соединения или командной строке слишком много нулевых значений, то выполняемая операция будет искажена. Попробуйте преобразовать нулевые значения с помощью некоторых произвольных идентификаторов (random ids) и обработать их в приложении.
Хэширование с солью (Salting)
В операции присоединения SQL-таблицы (Structured Query Language) код привязки используется для равномерного перераспределения данных таким образом, чтобы обработка раздела не занимала больше времени. Этот метод называется солью (Salting
). Рассмотрим пример для проверки результата соления. В операции присоединения или группировки, Spark сопоставляет ключ (key
) с идентификатором (id
) определённого раздела, вычисляя значение хэш-кода и деля его на количество тасованных (Shuffle
) разделов.
Предположим, что есть две таблицы со следующей схемой.
Рассмотрим случай, когда конкретный параметр сильно искажен, например, код 1 (key 1), и мы хотим объединить обе таблицы и сгруппировать их, чтобы добиться результата. Например,
После этапа перетасовки (shuffle
), в связи с выполнением операции соединения, все строки, имеющие одинаковый ключ (key
), должны находиться в одном и том же разделе. Посмотрите на диаграмму выше. Здесь все строки клавиши 1 находятся в разделе 1. Аналогично все строки с клавишей 2 находятся в разделе 2. Вполне естественно, что обработка раздела 1 займет больше времени, так как раздел содержит больше данных. Проверим пользовательский интерфейс Spark на время выполнения этапа перетасовки (shuffle
) для вышеприведенного запроса.
Как видим, одна задача заняла намного больше времени, чем другие. При большем количестве данных она была бы еще более объемной. Также это может привести к нестабильной работе приложения с точки зрения использования памяти, т.к. один раздел будет сильно загружен.
Можем ли мы что-нибудь добавить к данным, чтобы наш информационный массив был более равномерно распределен? Большинство пользователей при проблемах с искажениями используют технику засолки (salting). Соление (Salting) — это техника, при которой мы добавляем случайные значения (в данном случае это <salt>), чтобы связать их с кодом (key) одной из таблиц. В другой таблице нам нужно скопировать строки, чтобы они соответствовали случайным ключам (keys). Идея в том, что если условие соединения удовлетворяется условием ключ1 == ключ1
(key1 == key1
), то оно также должно удовлетворяться ключ1<соль> = ключ1<соль>
(key1<salt> = key1<salt>
). Значение соли поможет распределить набор данных более равномерно.
Вот пример того, как это сделать применительно к нашему случаю. Проверьте число 20, используемое при выполнении случайной функции и при извлечении из архива данных. Это определённое число разделов, которое мы используем для искаженного ключа (key). Это очень простой пример, который можно исправить, включив в него только искаженные элементы (keys).
Теперь давайте еще раз проверим работу приложения Spark. Как мы видим, время обработки стало более равномерным.
Обратите внимание, что для небольших объемов данных разница в производительности не будет сильно отличаться. Иногда способ компрессии (compress) в распределении данных (shuffle) также играет роль в общем времени обработки. Для искаженной информации, перетасованные данные (shuffled data) могут быть сильно сжаты в связи с повторяющейся структурой. Следовательно, общий объем ввода-вывода с диска/ сетевой передачи также уменьшается. Мы можем запустить наше приложение без соли (salt) и с солью, чтобы найти наиболее подходящий вариант.
Сборка мусора (Garbage Collection)
Spark работает на виртуальной машине Java (JVM). Поскольку Spark может хранить большие объемы данных в памяти, она в значительной степени полагается на управление памятью с помощью Java и сборщика мусора (GC). Поэтому решение проблемы по сборке мусора (GC) может стать основной задачей, которая может отразиться на многих приложениях Spark.
Распространенные признаки избытка мусора (GC) в приложении Spark:
Медленная работа приложения
Таймаут сердцебиения исполнителя (Executor heartbeat timeout)
Превышение предела допустимой перегрузки GC
Ориентированный на память подход приложений Spark, интенсивно использующих данные, делает эту проблему (GC) наиболее распространенной по сравнению с другими Java-приложениями. К счастью, легко диагностировать, если ваше приложение Spark испытывает проблемы с GC. Интерфейс (UI) Spark обозначает исполнителей (executors) красным цветом, если они затратили слишком много времени на GC.
Исполнители (Executors) Spark выполняют значительное количество процессорных циклов (CPU cycles), осуществляя уборку мусора. Это можно выяснить, взглянув на вкладку "Исполнители (Executors)" в пользовательском интерфейсе (UI) приложения Spark. Spark отметит исполнителя красным цветом, если он потратил на сбор мусора более 10% времени, чем на выполнение задачи, как показано на диаграмме ниже.
Решение проблем по сбору мусора
Вот некоторые из основных мер, которые мы можем предпринять, чтобы попытаться решить проблемы, связанные со сбором мусора (GC).
Структуры данных
При использовании приложений, основанных на RDD (Resilient Distributed Dataset), используйте структуры данных с меньшим количеством объектов. Например, используйте массив вместо списка.
Специализированные структуры данных
Если вы имеете дело с простейшими типами данных, рассмотрите возможность использования специализированных структур данных, таких как Koloboke или Fastutil. Эти структуры оптимизируют использование памяти для данных типов.
Хранение данных вне кучи* (off-heap)
Управляющая программа и память Spark могут хранить информацию вне кучи (off-heap). Вы можете включить off-heap накопитель, используя команды
-conf spark.memory.offHeap.enabled = true
-conf spark.memory.offHeap.size = Xgb.
Будьте осторожны при использовании хранилища вне кучи (off-heap), т.к. это не повлияет на размер памяти самой кучи (on-heap), т.е. не уменьшает ее объем. Поэтому, чтобы определить общий лимит памяти, задайте меньший размер кучи.
*Куча (heap) — название структуры данных, с помощью которой реализована динамически распределяемая память приложения.
Встроенные функции по сравнению с функциями, заданными пользователем (UDFS)
Если вы используете Spark SQL, постарайтесь максимально задействовать встроенные функции, насколько это возможно, а не писать новые UDF. Большинство UDF-файлов SPARK могут работать на UnsafeRow
(unsafe row — небезопасная строка) и не нуждаются в дополнительной адаптации. Это позволяет избежать возникновения мусора, а также хорошо взаимодействует с процессом генерирования кода.
Экономия на создании объектов
Помните, что мы можем работать с миллиардами строк. Если мы создадим даже небольшой временный объект размером 100 байт для каждой строки, то это создаст 1 миллиард * 100 байт мусора.
Конец второй части
До сих пор мы акцентировали внимание на управлении памятью, искажении данных и сборе мусора, как причинах замедления и сбоев в ваших приложениях Spark. В третьей части цикла мы обратим внимание на управление ресурсами и настройки кластера, поскольку такие проблемы, как локализация данных, рабочие нагрузки, связанные с вводом-выводом, разбиение на разделы и параллелизм могут вызывать настоящую головную боль, если у вас нет достаточной информации и знаний о том, как выполняется работа с данными.
Если вы нашли этот блог полезным, вы, возможно, захотите посмотреть первую часть этого цикла статей Why Your Spark Apps is Slow or Failing: Часть I Управление памятью.
Перевод статьи подготовлен в преддверии старта курса «Экосистема Hadoop, Spark, Hive»
Также предлагаем будущим студентам курса и всем желающим посмотреть запись открытого вебинара на тему «Spark Streaming».
sshikov
Skew это не искажение ни разу. Это перекос — когда с одним ключом в таблице намного больше значений, чем с другими. Ну или неравномерность распределения.