Хабр, привет! Вчера на митапе, посвященном Apache Spark, от ребят из Rambler&Co, было довольно много вопросов от участников, связанных с конфигурированием этого инструмента. Решили по его следам поделиться своим опытом. Тема непростая — поэтому предлагаем делиться опытом тоже в комментариях, может быть, мы тоже что-то не так понимаем и используем.
Небольшая вводная — как мы используем Spark. У нас есть трёхмесячная программа “Специалист по большим данным”, и весь второй модуль наши участники работают на этом инструменте. Соответственно, наша задача, как организаторов, подготовить кластер под использование в рамках такого кейса.
Особенность нашего использования заключается в том, что количество человек, одновременно работающих на Spark, может быть равно всей группе. Например, на семинаре, когда все одновременно что-то пробуют и повторяют за нашим преподавателем. А это немного-немало — под 40 человек порой. Наверное, не так много компаний в мире, которые сталкиваются с таким сценарием использования.
Далее я расскажу, как и почему мы подбирали те или иные параметры конфига.
Начнём с самого начала. У Spark есть 3 варианта работать на кластере: standalone, с использованием Mesos и с использованием YARN. Мы решили выбрать третий вариант, потому что для нас он был логичен. У нас уже есть hadoop-кластер. Наши участники хорошо уже знакомы с его архитектурой. Давайте юзать YARN.
Далее интереснее. У каждого из этих 3 вариантов развертывания есть 2 варианта деплоя: client и cluster. Исходя из документации и разных ссылок в интернете, можно сделать вывод, что client подходит для интерактивной работы — например, через jupyter notebook, а cluster больше подходит для production-решений. В нашем случае нас интересовала интерактивная работа, поэтому:
В общем-то с этого момента Spark уже будет как-то работать на YARN, но нам этого не было достаточно. Поскольку у нас программа про большие данные, то порой участникам не хватало того, что получалось в рамках равномерной нарезки ресурсов. И тут мы нашли интересную вещь — динамическую аллокацию ресурсов. Если коротко, то суть в следующем: если у вас тяжелая задача и кластер свободен (например, с утра), то при помощи этой опции Spark вам может выдать дополнительные ресурсы. Необходимость считается там по хитрой формуле. Вдаваться в подробности не будем — она неплохо работает.
Мы поставили этот параметр, и при запуске Spark ругнулся и не запустился. Правильно, потому что надо было читать документацию внимательнее. Там указано, что для того, чтобы все было ок, нужно еще включить дополнительный параметр.
Зачем он нужен? Когда наш джоб больше не требует такого количества ресурсов, то Spark должен вернуть их в общий пул. Самая трудозатратная стадия почти в любой MapReduce задаче — это стадия Shuffle. Этот параметр позволяет сохранять данные, которые образуются на этой стадии и соответственно освобождать executors. А executor — это процесс, который на воркере всё обсчитывает. У него есть какое-то количество процессорных ядер и какое-то количество памяти.
Добавили этот параметр. Всё вроде бы заработало. Стало заметно, что участникам реально стало выдаваться больше ресурсов, когда им было нужно. Но возникла другая проблема — в какой-то момент другие участники просыпались и тоже хотели использовать Spark, а там всё занято, и они были недовольны. Их можно понять. Стали смотреть в документацию. Там оказалось, что есть еще какое-то количество параметров, при помощи которых можно повлиять на процесс. Например, если executor находится в режиме ожидания — через какое время у него можно забрать ресурсы?
В нашем случае — если ваши executors ничего не делают в течение двух минут, то, будьте добры, верните их в общий пул. Но и этого параметра не всегда хватало. Было видно, что человек уже давно ничего не делает, а ресурсы не освобождаются. Оказалось, что есть еще специальный параметр — по прошествии какого времени отбирать executors, которые содержат закэшированные данные. По дефолту этот параметр стоял — infinity! Мы его поправили.
То есть если в течение 5 минут ваши executors ничего не делают, отдайте-ка их в общий пул. В таком режиме скорость освобождения и выдачи ресурсов для большого количества пользователей стала достойной. Количество недовольства сократилось. Но мы решили пойти дальше и ограничить максимальное количество executors на один application — по сути на одного участника программы.
Теперь, конечно, появились недовольные с другой стороны — “кластер простаивает, а у меня всего лишь 19 executors”, но что поделать — нужен какой-то правильный баланс. Всех сделать счастливыми не получится.
И еще одна небольшая история, связанная со спецификой нашего кейса. Как-то на практическое занятие опоздали несколько человек, и у них Spark почему-то не стартовал. Мы посмотрели на количество свободных ресурсов — вроде бы есть. Spark должен стартовать. Благо, что к тому моменту документация уже где-то записалась на подкорку, и мы вспомнили, что при запуске Spark ищет себе порт, на котором стартовать. Если первый порт из диапазона занят, то он переходит к следующему по порядку. Если он свободен, то захватывает. И есть параметр, который указывает на максимальное количество попыток для этого. По умолчанию — это 16. Число меньше, чем людей в нашей группе на занятии. Соответственно, после 16 попыток Spark бросал это дело и говорил, что не могу стартануть. Мы поправили этот параметр.
Дальше расскажу о некоторых настройках, уже не сильно связанных со спецификой нашего кейса.
Для более быстрого старта Spark есть рекомендация папку jars, лежащую в домашней директории SPARK_HOME, заархивировать и положить на HDFS. Тогда он не будет тратить времени на загрузку этих джарников по воркерам.
Также для более быстрой работы рекомендуется в качестве сериалайзера использовать kryo. Он более оптимизированный, чем тот, что по умолчанию.
И есть еще давняя проблема Spark, что он часто валится по памяти. Часто это происходит в тот момент, когда воркеры всё посчитали и отправляют результат на драйвер. Мы сделали себе этот параметр побольше. По умолчанию, он 1Гб, мы сделали — 3.
И последнее, в качестве десерта. Как обновить Spark до версии 2.1 на HortonWorks дистрибутиве — HDP 2.5.3.0. Эта версия HDP содержит в себе предустановленную версию 2.0, но мы как-то однажды для себя решили, что Spark довольно активно развивается, и каждая новая версия фиксит какие-то баги плюс дает дополнительные возможности, в том числе и для python API, поэтому решили, что нужно делать апдейт.
Скачали версию с официального сайта под Hadoop 2.7. Разархивировали, закинули в папку с HDP. Поставили симлинки как надо. Запускаем — не стартует. Пишет очень непонятную ошибку.
Погуглив, выяснили, что Spark решил не ждать пока Hadoop разродится, и решили использовать новую версию jersey. Они сами там друг с другом ругаются на эту тему в JIRA. Решением было — скачать jersey версии 1.17.1. Закинуть это в папку jars в SPARK_HOME, снова сделать zip и закинуть на HDFS.
Эту ошибку мы обошли, но возникла новая и довольно-таки обтекаемая.
При этом пробуем запускать версию 2.0 — всё ок. Попробуй догадайся, в чем дело. Мы залезли в логи этого application и увидели что-то такое:
В общем, по каким-то причинам hdp.version не резолвилась. Погуглив, нашли решение. Нужно в Ambari зайти в настройки YARN и добавить там параметр в custom yarn-site:
Эта магия помогла, и Spark взлетел. Протестили несколько наших jupyter-ноутбуков. Всё работает. К первому занятию по Spark в субботу (уже завтра) мы готовы!
UPD. На занятии выяснилась еще одна проблема. В какой-то момент YARN перестал выдавать контейнеры для Spark. В YARN нужно было поправить параметр, который по дефолту стоял 0.2:
То есть только 20% ресурсов участвовали в раздаче ресурсов. Поменяв параметры, перезагрузили YARN. Проблема была решена, и остальные участники тоже смогли запустить spark context.
Небольшая вводная — как мы используем Spark. У нас есть трёхмесячная программа “Специалист по большим данным”, и весь второй модуль наши участники работают на этом инструменте. Соответственно, наша задача, как организаторов, подготовить кластер под использование в рамках такого кейса.
Особенность нашего использования заключается в том, что количество человек, одновременно работающих на Spark, может быть равно всей группе. Например, на семинаре, когда все одновременно что-то пробуют и повторяют за нашим преподавателем. А это немного-немало — под 40 человек порой. Наверное, не так много компаний в мире, которые сталкиваются с таким сценарием использования.
Далее я расскажу, как и почему мы подбирали те или иные параметры конфига.
Начнём с самого начала. У Spark есть 3 варианта работать на кластере: standalone, с использованием Mesos и с использованием YARN. Мы решили выбрать третий вариант, потому что для нас он был логичен. У нас уже есть hadoop-кластер. Наши участники хорошо уже знакомы с его архитектурой. Давайте юзать YARN.
spark.master=yarn
Далее интереснее. У каждого из этих 3 вариантов развертывания есть 2 варианта деплоя: client и cluster. Исходя из документации и разных ссылок в интернете, можно сделать вывод, что client подходит для интерактивной работы — например, через jupyter notebook, а cluster больше подходит для production-решений. В нашем случае нас интересовала интерактивная работа, поэтому:
spark.deploy-mode=client
В общем-то с этого момента Spark уже будет как-то работать на YARN, но нам этого не было достаточно. Поскольку у нас программа про большие данные, то порой участникам не хватало того, что получалось в рамках равномерной нарезки ресурсов. И тут мы нашли интересную вещь — динамическую аллокацию ресурсов. Если коротко, то суть в следующем: если у вас тяжелая задача и кластер свободен (например, с утра), то при помощи этой опции Spark вам может выдать дополнительные ресурсы. Необходимость считается там по хитрой формуле. Вдаваться в подробности не будем — она неплохо работает.
spark.dynamicAllocation.enabled=true
Мы поставили этот параметр, и при запуске Spark ругнулся и не запустился. Правильно, потому что надо было читать документацию внимательнее. Там указано, что для того, чтобы все было ок, нужно еще включить дополнительный параметр.
spark.shuffle.service.enabled=true
Зачем он нужен? Когда наш джоб больше не требует такого количества ресурсов, то Spark должен вернуть их в общий пул. Самая трудозатратная стадия почти в любой MapReduce задаче — это стадия Shuffle. Этот параметр позволяет сохранять данные, которые образуются на этой стадии и соответственно освобождать executors. А executor — это процесс, который на воркере всё обсчитывает. У него есть какое-то количество процессорных ядер и какое-то количество памяти.
Добавили этот параметр. Всё вроде бы заработало. Стало заметно, что участникам реально стало выдаваться больше ресурсов, когда им было нужно. Но возникла другая проблема — в какой-то момент другие участники просыпались и тоже хотели использовать Spark, а там всё занято, и они были недовольны. Их можно понять. Стали смотреть в документацию. Там оказалось, что есть еще какое-то количество параметров, при помощи которых можно повлиять на процесс. Например, если executor находится в режиме ожидания — через какое время у него можно забрать ресурсы?
spark.dynamicAllocation.executorIdleTimeout=120s
В нашем случае — если ваши executors ничего не делают в течение двух минут, то, будьте добры, верните их в общий пул. Но и этого параметра не всегда хватало. Было видно, что человек уже давно ничего не делает, а ресурсы не освобождаются. Оказалось, что есть еще специальный параметр — по прошествии какого времени отбирать executors, которые содержат закэшированные данные. По дефолту этот параметр стоял — infinity! Мы его поправили.
spark.dynamicAllocation.cachedExecutorIdleTimeout=600s
То есть если в течение 5 минут ваши executors ничего не делают, отдайте-ка их в общий пул. В таком режиме скорость освобождения и выдачи ресурсов для большого количества пользователей стала достойной. Количество недовольства сократилось. Но мы решили пойти дальше и ограничить максимальное количество executors на один application — по сути на одного участника программы.
spark.dynamicAllocation.maxExecutors=19
Теперь, конечно, появились недовольные с другой стороны — “кластер простаивает, а у меня всего лишь 19 executors”, но что поделать — нужен какой-то правильный баланс. Всех сделать счастливыми не получится.
И еще одна небольшая история, связанная со спецификой нашего кейса. Как-то на практическое занятие опоздали несколько человек, и у них Spark почему-то не стартовал. Мы посмотрели на количество свободных ресурсов — вроде бы есть. Spark должен стартовать. Благо, что к тому моменту документация уже где-то записалась на подкорку, и мы вспомнили, что при запуске Spark ищет себе порт, на котором стартовать. Если первый порт из диапазона занят, то он переходит к следующему по порядку. Если он свободен, то захватывает. И есть параметр, который указывает на максимальное количество попыток для этого. По умолчанию — это 16. Число меньше, чем людей в нашей группе на занятии. Соответственно, после 16 попыток Spark бросал это дело и говорил, что не могу стартануть. Мы поправили этот параметр.
spark.port.maxRetries=50
Дальше расскажу о некоторых настройках, уже не сильно связанных со спецификой нашего кейса.
Для более быстрого старта Spark есть рекомендация папку jars, лежащую в домашней директории SPARK_HOME, заархивировать и положить на HDFS. Тогда он не будет тратить времени на загрузку этих джарников по воркерам.
spark.yarn.archive=hdfs:///tmp/spark-archive.zip
Также для более быстрой работы рекомендуется в качестве сериалайзера использовать kryo. Он более оптимизированный, чем тот, что по умолчанию.
spark.serializer=org.apache.spark.serializer.KryoSerializer
И есть еще давняя проблема Spark, что он часто валится по памяти. Часто это происходит в тот момент, когда воркеры всё посчитали и отправляют результат на драйвер. Мы сделали себе этот параметр побольше. По умолчанию, он 1Гб, мы сделали — 3.
spark.driver.maxResultSize=3072
И последнее, в качестве десерта. Как обновить Spark до версии 2.1 на HortonWorks дистрибутиве — HDP 2.5.3.0. Эта версия HDP содержит в себе предустановленную версию 2.0, но мы как-то однажды для себя решили, что Spark довольно активно развивается, и каждая новая версия фиксит какие-то баги плюс дает дополнительные возможности, в том числе и для python API, поэтому решили, что нужно делать апдейт.
Скачали версию с официального сайта под Hadoop 2.7. Разархивировали, закинули в папку с HDP. Поставили симлинки как надо. Запускаем — не стартует. Пишет очень непонятную ошибку.
java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig
Погуглив, выяснили, что Spark решил не ждать пока Hadoop разродится, и решили использовать новую версию jersey. Они сами там друг с другом ругаются на эту тему в JIRA. Решением было — скачать jersey версии 1.17.1. Закинуть это в папку jars в SPARK_HOME, снова сделать zip и закинуть на HDFS.
Эту ошибку мы обошли, но возникла новая и довольно-таки обтекаемая.
org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master
При этом пробуем запускать версию 2.0 — всё ок. Попробуй догадайся, в чем дело. Мы залезли в логи этого application и увидели что-то такое:
/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar
В общем, по каким-то причинам hdp.version не резолвилась. Погуглив, нашли решение. Нужно в Ambari зайти в настройки YARN и добавить там параметр в custom yarn-site:
hdp.version=2.5.3.0-37
Эта магия помогла, и Spark взлетел. Протестили несколько наших jupyter-ноутбуков. Всё работает. К первому занятию по Spark в субботу (уже завтра) мы готовы!
UPD. На занятии выяснилась еще одна проблема. В какой-то момент YARN перестал выдавать контейнеры для Spark. В YARN нужно было поправить параметр, который по дефолту стоял 0.2:
yarn.scheduler.capacity.maximum-am-resource-percent=0.8
То есть только 20% ресурсов участвовали в раздаче ресурсов. Поменяв параметры, перезагрузили YARN. Проблема была решена, и остальные участники тоже смогли запустить spark context.
Поделиться с друзьями
Комментарии (7)
sshikov
29.04.2017 22:16Часто это происходит в тот момент, когда воркеры всё посчитали и отправляют результат на драйвер. Мы сделали себе этот параметр побольше. По умолчанию, он 1Гб, мы сделали — 3.
Мне казалось очевидным, что если вы ожидаете получение на драйвере скажем 10 гигабайт — то 1 ему не хватит? И трех кстати не хватит тоже. Это не проблема спарка, это фича, нет?
Ну т.е. совет какой-то, с одной стороны очевидный, а с другой — бесполезный. Вот если бы вы сказали, как оценить потребности и размеры результата?
a-pichugin
29.04.2017 22:45Да, есть некоторый workaround, на митапе он как раз обсуждался — что можно по частям передавать данные на драйвер, потому что памяти на драйвере и правда не напасешься порой.
Ermak
Т.е. вы используете дистрибутив HortonWorks, а почему не другие, например Cloudera или MapR?
a-pichugin
На предыдущих запусках использовали Cloudera. В этот раз просто переезжали с одного кластера на другой и решили попробовать HortonWorks. Не могу сказать, что заметил сильно принципиальные различия. Интерфейс другой :) Есть Tez.
Ermak
С нашими запросами Tez не справился, так что его наличие для нас не повод для радости. :(
couatl
в cloudera есть небольшая проблема, что из коробки старый спарк (1.6.0), старый хайв (1.1.0)
Ermak
Это да, проблема, старый спарк не умеет обрабатывать потоки из/в защищенную (secured) Кафку :(