Хабр, привет! Вчера на митапе, посвященном Apache Spark, от ребят из Rambler&Co, было довольно много вопросов от участников, связанных с конфигурированием этого инструмента. Решили по его следам поделиться своим опытом. Тема непростая — поэтому предлагаем делиться опытом тоже в комментариях, может быть, мы тоже что-то не так понимаем и используем.

Небольшая вводная — как мы используем Spark. У нас есть трёхмесячная программа “Специалист по большим данным”, и весь второй модуль наши участники работают на этом инструменте. Соответственно, наша задача, как организаторов, подготовить кластер под использование в рамках такого кейса.

Особенность нашего использования заключается в том, что количество человек, одновременно работающих на Spark, может быть равно всей группе. Например, на семинаре, когда все одновременно что-то пробуют и повторяют за нашим преподавателем. А это немного-немало — под 40 человек порой. Наверное, не так много компаний в мире, которые сталкиваются с таким сценарием использования.

Далее я расскажу, как и почему мы подбирали те или иные параметры конфига.

Начнём с самого начала. У Spark есть 3 варианта работать на кластере: standalone, с использованием Mesos и с использованием YARN. Мы решили выбрать третий вариант, потому что для нас он был логичен. У нас уже есть hadoop-кластер. Наши участники хорошо уже знакомы с его архитектурой. Давайте юзать YARN.

spark.master=yarn

Далее интереснее. У каждого из этих 3 вариантов развертывания есть 2 варианта деплоя: client и cluster. Исходя из документации и разных ссылок в интернете, можно сделать вывод, что client подходит для интерактивной работы — например, через jupyter notebook, а cluster больше подходит для production-решений. В нашем случае нас интересовала интерактивная работа, поэтому:

spark.deploy-mode=client

В общем-то с этого момента Spark уже будет как-то работать на YARN, но нам этого не было достаточно. Поскольку у нас программа про большие данные, то порой участникам не хватало того, что получалось в рамках равномерной нарезки ресурсов. И тут мы нашли интересную вещь — динамическую аллокацию ресурсов. Если коротко, то суть в следующем: если у вас тяжелая задача и кластер свободен (например, с утра), то при помощи этой опции Spark вам может выдать дополнительные ресурсы. Необходимость считается там по хитрой формуле. Вдаваться в подробности не будем — она неплохо работает.

spark.dynamicAllocation.enabled=true

Мы поставили этот параметр, и при запуске Spark ругнулся и не запустился. Правильно, потому что надо было читать документацию внимательнее. Там указано, что для того, чтобы все было ок, нужно еще включить дополнительный параметр.

spark.shuffle.service.enabled=true

Зачем он нужен? Когда наш джоб больше не требует такого количества ресурсов, то Spark должен вернуть их в общий пул. Самая трудозатратная стадия почти в любой MapReduce задаче — это стадия Shuffle. Этот параметр позволяет сохранять данные, которые образуются на этой стадии и соответственно освобождать executors. А executor — это процесс, который на воркере всё обсчитывает. У него есть какое-то количество процессорных ядер и какое-то количество памяти.

Добавили этот параметр. Всё вроде бы заработало. Стало заметно, что участникам реально стало выдаваться больше ресурсов, когда им было нужно. Но возникла другая проблема — в какой-то момент другие участники просыпались и тоже хотели использовать Spark, а там всё занято, и они были недовольны. Их можно понять. Стали смотреть в документацию. Там оказалось, что есть еще какое-то количество параметров, при помощи которых можно повлиять на процесс. Например, если executor находится в режиме ожидания — через какое время у него можно забрать ресурсы?

spark.dynamicAllocation.executorIdleTimeout=120s

В нашем случае — если ваши executors ничего не делают в течение двух минут, то, будьте добры, верните их в общий пул. Но и этого параметра не всегда хватало. Было видно, что человек уже давно ничего не делает, а ресурсы не освобождаются. Оказалось, что есть еще специальный параметр — по прошествии какого времени отбирать executors, которые содержат закэшированные данные. По дефолту этот параметр стоял — infinity! Мы его поправили.

spark.dynamicAllocation.cachedExecutorIdleTimeout=600s

То есть если в течение 5 минут ваши executors ничего не делают, отдайте-ка их в общий пул. В таком режиме скорость освобождения и выдачи ресурсов для большого количества пользователей стала достойной. Количество недовольства сократилось. Но мы решили пойти дальше и ограничить максимальное количество executors на один application — по сути на одного участника программы.

spark.dynamicAllocation.maxExecutors=19

Теперь, конечно, появились недовольные с другой стороны — “кластер простаивает, а у меня всего лишь 19 executors”, но что поделать — нужен какой-то правильный баланс. Всех сделать счастливыми не получится.

И еще одна небольшая история, связанная со спецификой нашего кейса. Как-то на практическое занятие опоздали несколько человек, и у них Spark почему-то не стартовал. Мы посмотрели на количество свободных ресурсов — вроде бы есть. Spark должен стартовать. Благо, что к тому моменту документация уже где-то записалась на подкорку, и мы вспомнили, что при запуске Spark ищет себе порт, на котором стартовать. Если первый порт из диапазона занят, то он переходит к следующему по порядку. Если он свободен, то захватывает. И есть параметр, который указывает на максимальное количество попыток для этого. По умолчанию — это 16. Число меньше, чем людей в нашей группе на занятии. Соответственно, после 16 попыток Spark бросал это дело и говорил, что не могу стартануть. Мы поправили этот параметр.

spark.port.maxRetries=50

Дальше расскажу о некоторых настройках, уже не сильно связанных со спецификой нашего кейса.

Для более быстрого старта Spark есть рекомендация папку jars, лежащую в домашней директории SPARK_HOME, заархивировать и положить на HDFS. Тогда он не будет тратить времени на загрузку этих джарников по воркерам.

spark.yarn.archive=hdfs:///tmp/spark-archive.zip

Также для более быстрой работы рекомендуется в качестве сериалайзера использовать kryo. Он более оптимизированный, чем тот, что по умолчанию.

spark.serializer=org.apache.spark.serializer.KryoSerializer

И есть еще давняя проблема Spark, что он часто валится по памяти. Часто это происходит в тот момент, когда воркеры всё посчитали и отправляют результат на драйвер. Мы сделали себе этот параметр побольше. По умолчанию, он 1Гб, мы сделали — 3.

spark.driver.maxResultSize=3072

И последнее, в качестве десерта. Как обновить Spark до версии 2.1 на HortonWorks дистрибутиве — HDP 2.5.3.0. Эта версия HDP содержит в себе предустановленную версию 2.0, но мы как-то однажды для себя решили, что Spark довольно активно развивается, и каждая новая версия фиксит какие-то баги плюс дает дополнительные возможности, в том числе и для python API, поэтому решили, что нужно делать апдейт.

Скачали версию с официального сайта под Hadoop 2.7. Разархивировали, закинули в папку с HDP. Поставили симлинки как надо. Запускаем — не стартует. Пишет очень непонятную ошибку.

java.lang.NoClassDefFoundError: com/sun/jersey/api/client/config/ClientConfig

Погуглив, выяснили, что Spark решил не ждать пока Hadoop разродится, и решили использовать новую версию jersey. Они сами там друг с другом ругаются на эту тему в JIRA. Решением было — скачать jersey версии 1.17.1. Закинуть это в папку jars в SPARK_HOME, снова сделать zip и закинуть на HDFS.

Эту ошибку мы обошли, но возникла новая и довольно-таки обтекаемая.

org.apache.spark.SparkException: Yarn application has already ended! It might have been killed or unable to launch application master

При этом пробуем запускать версию 2.0 — всё ок. Попробуй догадайся, в чем дело. Мы залезли в логи этого application и увидели что-то такое:

/usr/hdp/${hdp.version}/hadoop/lib/hadoop-lzo-0.6.0.${hdp.version}.jar

В общем, по каким-то причинам hdp.version не резолвилась. Погуглив, нашли решение. Нужно в Ambari зайти в настройки YARN и добавить там параметр в custom yarn-site:

hdp.version=2.5.3.0-37

Эта магия помогла, и Spark взлетел. Протестили несколько наших jupyter-ноутбуков. Всё работает. К первому занятию по Spark в субботу (уже завтра) мы готовы!

UPD. На занятии выяснилась еще одна проблема. В какой-то момент YARN перестал выдавать контейнеры для Spark. В YARN нужно было поправить параметр, который по дефолту стоял 0.2:

yarn.scheduler.capacity.maximum-am-resource-percent=0.8

То есть только 20% ресурсов участвовали в раздаче ресурсов. Поменяв параметры, перезагрузили YARN. Проблема была решена, и остальные участники тоже смогли запустить spark context.
Поделиться с друзьями
-->

Комментарии (7)


  1. Ermak
    28.04.2017 23:06

    Т.е. вы используете дистрибутив HortonWorks, а почему не другие, например Cloudera или MapR?


    1. a-pichugin
      28.04.2017 23:55

      На предыдущих запусках использовали Cloudera. В этот раз просто переезжали с одного кластера на другой и решили попробовать HortonWorks. Не могу сказать, что заметил сильно принципиальные различия. Интерфейс другой :) Есть Tez.


      1. Ermak
        30.04.2017 00:12

        С нашими запросами Tez не справился, так что его наличие для нас не повод для радости. :(


    1. couatl
      29.04.2017 15:37

      в cloudera есть небольшая проблема, что из коробки старый спарк (1.6.0), старый хайв (1.1.0)


      1. Ermak
        30.04.2017 00:15

        Это да, проблема, старый спарк не умеет обрабатывать потоки из/в защищенную (secured) Кафку :(


  1. sshikov
    29.04.2017 22:16

    Часто это происходит в тот момент, когда воркеры всё посчитали и отправляют результат на драйвер. Мы сделали себе этот параметр побольше. По умолчанию, он 1Гб, мы сделали — 3.

    Мне казалось очевидным, что если вы ожидаете получение на драйвере скажем 10 гигабайт — то 1 ему не хватит? И трех кстати не хватит тоже. Это не проблема спарка, это фича, нет?


    Ну т.е. совет какой-то, с одной стороны очевидный, а с другой — бесполезный. Вот если бы вы сказали, как оценить потребности и размеры результата?


    1. a-pichugin
      29.04.2017 22:45

      Да, есть некоторый workaround, на митапе он как раз обсуждался — что можно по частям передавать данные на драйвер, потому что памяти на драйвере и правда не напасешься порой.