Будущих учащихся на курсе «Экосистема Hadoop, Spark, Hive» приглашаем на открытый вебинар по теме «Spark Streaming». На вебинаре участники вместе с экспертом познакомятся со Spark Streaming и Structured Streaming, изучат их особенности и напишут простое приложение обработки потоков.
А сейчас делимся с вами традиционным переводом полезного материала.
Spark приложения легко писать и легко понять, когда все идет по плану. Однако, это становится очень сложно, когда приложения Spark начинают медленно запускаться или выходить из строя. Порой хорошо настроенное приложение может выйти из строя из-за изменения данных или изменения компоновки данных. Иногда приложение, которое до сих пор работало хорошо, начинает вести себя плохо из-за нехватки ресурсов. Список можно продолжать и продолжать.
Важно понимать не только приложение Spark, но также и его базовые компоненты среды выполнения, такие как использование диска, сети, конфликт доступа и т.д., чтобы мы могли принимать обоснованные решения, когда дела идут плохо.
В этой серии статей я хочу рассказать о некоторых наиболее распространенных причинах, по которым приложение Spark выходит из строя или замедляется. Первая и наиболее распространенная — это управление памятью.
Если бы мы заставили всех разработчиков Spark проголосовать, то условия отсутствия памяти (OOM) наверняка стали бы проблемой номер один, с которой все столкнулись. Это неудивительно, так как архитектура Spark ориентирована на память. Некоторые из наиболее распространенных причин OOM:
неправильное использование Spark
высокая степень многопоточности (high concurrency)
неэффективные запросы
неправильная конфигурация
Чтобы избежать этих проблем, нам необходимо базовое понимание Spark и наших данных. Есть определенные вещи, которые могут быть сделаны, чтобы либо предотвратить OOM, либо настроить приложение, которое вышло из строя из-за OOM. Стандартная конфигурация Spark может быть достаточной или не подходящей для ваших приложений. Иногда даже хорошо настроенное приложение может выйти из строя по причине OOM, когда происходят изменения базовых данных.
Переполнение памяти может быть в узлах драйвера, исполнителя и управления. Рассмотрим каждый случай.
НЕДОСТАТОЧНО ПАМЯТИ ПРИ РАБОТЕ ДРАЙВЕРА
Драйвер в Spark — это JVM (Java Virtual Machine) процесс, в котором работает основной поток управления приложения. Чаще всего драйвер выходит из строя с ошибкой OutOfMemory
— OOM (недостаточно памяти из-за неправильного использования Spark. Spark — это механизм распределения нагрузки между рабочим оборудованием. Драйвер должен рассматриваться только как дирижер. В типовых установках драйверу предоставляется меньше памяти, чем исполнителям. Поэтому мы должны быть осторожны с тем, что мы делаем с драйвером.
Обычными причинами, приводящими к OutOfMemory
OOM (недостаточно памяти) драйвера, являются:
rdd.collect()
sparkContext.broadcast
Низкий уровень памяти драйвера, настроенный в соответствии с требованиями приложения
Неправильная настройка
Spark.sql.autoBroadcastJoinThreshold
.
Spark использует этот лимит для распределения связей ко всем узлам в случае операции соединения. При самом первом использовании, все связи реализуются на узле драйвера. Иногда многочисленные таблицы также транслируются как часть осуществления запроса.
Попробуйте написать свое приложение таким образом, чтобы в драйвере можно было избежать полного сбора всех результатов. Вы вполне можете делегировать эту задачу одной из управляющих программ. Например, если вы хотите сохранить результаты в определенном файле, вы можете либо собрать их в драйвере, или назначить программу, которая сделает это за вас.
Если вы используете SQL (Structured Query Language) от Spark, а драйвер находится в состоянии OOM из-за распределения связей, то вы можете либо увеличить память драйвера, если это возможно; либо уменьшить значение "spark.sql.autoBroadcastJoinThreshold
" (неправильная настройка порога подключения) так, чтобы ваши операции по объединению использовали более удобные для памяти операции слияния соединений.
Недостаточно памяти при работе управляющей программы
Это очень распространенная проблема с приложениями Spark, которая может быть вызвана различными причинами. Некоторые из наиболее распространенных причин — высокая степень многопоточности, неэффективные запросы и неправильная конфигурация. Рассмотрим каждую по очереди.
Высокая степень многопоточности
Прежде чем понять, почему высокая степень многопоточности может быть причиной OOM, давайте попробуем понять, как Spark выполняет запрос или задание и какие компоненты способствуют потреблению памяти.
Spark задания или запросы разбиваются на несколько этапов, и каждый этап далее делится на задачи. Количество задач зависит от различных факторов, например, на какой стадии выполняется, какой источник данных читается и т.д. Если это этап map-stage (фаза сканирования в SQL), то, как правило, соблюдаются базовые разделы источника данных.
Например, если реестр таблицы ORC (Optimized Row Columnar) имеет 2000 разделов, то для этапа map-stage создается 2000 заданий для чтения таблицы, предполагая, что обработка разделов ещё не началась. Если это этап reduce-stage (стадия Shuffle), то для определения количества задач Spark будет использовать либо настройку "spark.default.parallelism
" для RDD (Resilient Distributed Dataset), либо "spark.sql.shuffle.partitions
" для DataSet (набор данных). Сколько задач будет выполняться параллельно каждой управляющей программе, будет зависеть от свойства "spark.executor.cores
". Если это значение установить больше без учета памяти, то программы могут отказать и привести к ситуации OOM (недостаточно памяти). Теперь посмотрим на то, что происходит, как говорится, за кадром, при выполнении задачи и на некоторые вероятные причины OOM.
Допустим, мы реализуем задачу создания схемы (map) или этап сканирования SQL из файла HDFS (распределенная файловая система Hadoop distributed file system) или таблицы Parquet/ORC. Для файлов HDFS каждая задача Spark будет считывать блок данных размером 128 МБ. Таким образом, если выполняется 10 параллельных задач, то потребность в памяти составляет не менее 128*10 только для хранения разбитых на разделы данных. При этом опять же игнорируется любое сжатие данных, которое может привести к резкому скачку данных в зависимости от алгоритмов сжатия.
Spark читает Parquet (формат файлов с открытым исходным кодом) в векторном формате. Проще говоря, каждая задача Spark считывает данные из файла Parquet пакет за пакетом. Так как Parquet является столбцом, то эти пакеты строятся для каждого из столбцов. Она накапливает определенный объем данных по столбцам в памяти перед выполнением любой операции над этим столбцом. Это означает, что для хранения такого количества данных Spark необходимы некоторые структуры данных и учет. Кроме того, такие методы кодирования, как словарное кодирование, имеют некоторое состояние, сохраненное в памяти. Все они требуют памяти.
Так что, при большем количестве параллелей, потребление ресурсов увеличивается. Кроме того, если речь идет о широковещательное соединении (broadcast join), то широковещательные переменные (broadcast variables) также займут некоторое количество памяти. На приведенной выше диаграмме показан простой случай, когда каждый исполнитель выполняет две задачи параллельно.
Неэффективные запросы
Хотя программа Spark's Catalyst пытается максимально оптимизировать запрос, она не может помочь, если сам запрос плохо написан. Например, выбор всех столбцов таблицы Parquet/ORC. Как видно из предыдущего раздела, каждый столбец нуждается в некотором пакетном состоянии в памяти. Если выбрано больше столбцов, то больше будет потребляться ресурсов.
Постарайтесь считывать как можно меньше столбцов. Попробуйте использовать фильтры везде, где это возможно, чтобы меньше данных попадало к управляющим программам. Некоторые источники данных поддерживают обрезку разделов. Если ваш запрос может быть преобразован в столбец(ы) раздела, то это в значительной степени уменьшит перемещение данных.
Неправильная конфигурация
Неправильная конфигурация памяти и кэширования также может привести к сбоям и замедлению работы приложений Spark. Рассмотрим некоторые примеры.
ПАМЯТЬ ИСПОЛНИТЕЛЯ И ДРАЙВЕРА
Требования к памяти каждого приложения разные. В зависимости от требований, каждое приложение должно быть настроено по-разному. Вы должны обеспечить правильные значения памяти spark.executor.memory
или spark.driver.memory
в зависимости от загруженности. Как бы очевидно это ни казалось, это одна из самых трудных задач. Нам нужна помощь средств для мониторинга фактического использования памяти приложения. Unravel (Unravel Data Operations Platform) делает это довольно хорошо.
ПЕРЕГРУЗКА ПАМЯТИ
Иногда это не память управляющей программы, а перегруженная память модуля YARN (Yet Another Resource Negotiator — еще один ресурсный посредник), которая вызывает OOM или узел перестает функционировать (killed) из-за YARN. Сообщения "YARN kill" обычно выглядят так:
YARN запускает каждый компонент Spark, как управляющие программы и драйвера внутри модулей. Переполненная память — это off-heap память, используемая для JVM в режиме перегрузки, интернированных строк и других метаданных JVM. В этом случае необходимо настроить spark.yarn.executor.memoryOverhead
на нужное значение. Обычно 10% общей памяти управляющей программы должно быть выделено под неизбежное потребление ресурсов.
КЭШИРОВАННАЯ ПАМЯТЬ
Если ваше приложение использует кэширование Spark для хранения некоторых наборов данных, то стоит обратить внимание на настройки менеджера памяти Spark. Менеджер памяти Spark разработан в очень общем стиле, чтобы удовлетворить основные рабочие нагрузки. Следовательно, есть несколько настроек, чтобы установить его правильно для определенной внеплановой нагрузки.
Spark определила требования к памяти как два типа: исполнение и хранение. Память хранения используется для кэширования, а память исполнения выделяется для временных структур, таких как хэш-таблицы для агрегирования, объединения и т. д.
Как память исполнения, так и память хранения можно получить из настраиваемой части (общий объем памяти — 300МБ). Эта настройка называется "spark.memory.fraction
". По умолчанию — 60%. Из них по умолчанию 50% (настраивается параметром "spark.memory.storageFraction
") выделяется на хранение и остаток выделяется на исполнение.
Бывают ситуации, когда каждый из вышеперечисленных резервов памяти, а именно исполнение и хранение, могут занимать друг у друга, если другой свободен. Кроме того, память в хранилище может быть уменьшена до предела, если она заимствовала память из исполнения. Однако, не вдаваясь в эти сложности, мы можем настроить нашу программу таким образом, чтобы наши кэшированные данные, которые помещаются в память хранилища, не создавали проблем для выполнения.
Если мы не хотим, чтобы все наши кэшированные данные оставались в памяти, то мы можем настроить "spark.memory.storageFraction
" на меньшее значение, чтобы лишние данные были исключены и выполнение не столкнулось бы с нехваткой памяти.
Перегрузка памяти в менеджере узла
Spark приложения, которые осуществляют перетасовку данных в рамках групповых операций или присоединяются к подобным операциям, испытывают значительные перегрузки. Обычно процесс перетасовки выполняется управляющей программой. Если управляющая программа (исполнитель) занята или завалена большим количеством (мусора) GC (Garbage Collector), то она не может обслуживать перетасовки запросов. Эта проблема в некоторой степени решается за счет использования внешнего сервиса обмена.
Внешний сервис обмена работает на каждом рабочем узле и обрабатывает поступающие от исполнителей запросы на переключение. Исполнители могут читать перемешанные файлы с этого сервиса, вместо того, чтобы не считывать файлы между собой. Это помогает запрашивающим исполнителям читать перемешанные файлы, даже если производящие их исполнители не работают или работают медленно. Также, когда включено динамическое распределение, его обязательным условием является включение внешнего сортировочного сервиса.
Когда внешний сервис обмена данными Spark настроен с помощью YARN, NodeManager (управляющий узел) запускает вспомогательный сервис, который действует как внешний провайдер обмена данными. По умолчанию память NodeManager составляет около 1 ГБ. Однако приложения, выполняющие значительную перестановку данных, могут выйти из строя из-за того, что память NodeManager исчерпана. Крайне важно правильно настроить NodeManager, если ваши приложения попадают в вышеуказанную категорию.
Конец части №1, спасибо за внимание
Процессинг внутренней памяти Spark — ключевая часть ее мощности. Поэтому эффективное управление памятью является критически важным фактором для получения наилучшей производительности, расширяемости и стабильности ваших приложений Spark и каналов передачи данных. Однако настройки по умолчанию в Spark часто бывают недостаточными. В зависимости от приложения и среды, некоторые ключевые параметры конфигурации должны быть установлены правильно для достижения ваших целей производительности. Если иметь базовое представление о них и о том, как они могут повлиять на общее приложение, то это поможет в работе.
Я поделился некоторыми соображениями о том, на что следует обратить внимание при рассмотрении вопроса об управлении памятью Spark. Это область, которую платформа Unravel понимает и оптимизирует очень хорошо, с небольшим количеством, если таковое вообще потребуется, человеческого вмешательства. Я рекомендую вам заказать демо-версию, чтобы увидеть Unravel в действии. Мы видим довольно значительное ускорение работы приложений Spark.
Во второй части этой серии статьи напишу о том, почему ваши приложения Spark медленно работают или не работают: Во второй части цикла, посвященной искажению данных и сбору мусора, я расскажу о том, как структура данных, искажение данных и сбор мусора влияют на производительность Spark.
Узнать подробнее о курсе «Экосистема Hadoop, Spark, Hive».
Записаться на открытый вебинар по теме «Spark Streaming».