Привет! Меня зовут Александр Ледовский. Я тимлид команды аналитики и DS, мы строим рекламные аукционы в Авито. Для работы с поисковыми логами мы пользуемся Apache Spark. Я расскажу о моём опыте работы с ним и выделю то, что нужно знать.

Эта статья будет полезна аналитикам, дата-инженерам и специалистам по обработке больших данных, а также тимлидам команд, которые работают с Apache Spark для решения задач аналитики и машинного обучения. Вы узнаете, как правильно задавать параметры Spark-сессии, чтобы получить ресурсы.

Я часто вижу ситуации, когда на Hadoop-кластерах не хватает ресурсов. Некоторые аналитики в это время просят огромное количество ядер и памяти для своих Jupyter-ноутбуков. А другие люди вообще не могут получить ресурсы. Это очень тормозит рабочие процессы, и вообще так быть не должно.

Аналитику нужно не только учиться писать код на Spark. Важно уметь правильно инициализировать сессию и запрашивать ресурсы. Для этого нужно знать:

  • на какие процессы Spark запрашивает ресурсы;

  • чем хороша динамическая аллокация;

  • какие параметры нужно настраивать у драйвера и экзекьюторов, а какие — нет;

  • как определить необходимое количество ресурсов.

Сценарий создания Spark-сессии аналитиком

Вы — аналитик, работаете в Jupyter-ноутбуке на Hadoop-кластере вашей компании. На кластере работают другие аналитики, считаются регулярные задачи.

Вы запускаете ноутбук и стартуете Spark-сессию. В ней указываете параметры, связанные с ресурсами. Например:

from pyspark.sql import SparkSession

app_name = 'Your App'

conf = {
    'spark.dynamicAllocation.enabled': 'true',
	'spark.shuffle.service.enabled': 'true',
    'spark.dynamicAllocation.maxExecutors': 10,
    'spark.executor.memory': '32g',
    'spark.executor.cores': '4',
    'spark.driver.memory': '4g',

	# какие-то другие параметры
}

builder = (
    SparkSession
    .builder
    .appName(app_name)
)

for k, v in conf.items():
    builder.config(k, v)

spark = builder.getOrCreate()

Здесь не упомянуты все параметры, на самом деле их несколько десятков. Для новичка может быть сложным понять, как они работают. Но есть и хорошая новость: параметров, которые точно нужно знать, не так много.

Примечание. Здесь и далее я опираюсь на версию 3.3, которая у нас установлена. Но все сказанное актуально и для более ранних версий, по крайней мере 2.4.

Как Spark запрашивает ресурсы

Управление ресурсами для Spark происходит так:

  1. Приложение Spark работает в виде процессов двух типов: драйвер и экзекьютор. Драйвер всегда один, экзекьюторов много.

  2. У драйвера и экзекьюторов нужно управлять ядрами и памятью, есть тонкие настройки.

  3. Количеством экзекьюторов тоже нужно управлять.

Spark написан на Scala и немного на Java. Поэтому сам по себе он — это процессы, работающие в JVM (Java Virtual Machine). Но когда мы работаем на pySpark, к ним добавляются процессы-компаньоны на Python. Python используется драйвером как интерфейс для вызова функций, а экзекьюторы — для выполнения user-defined functions на Python.

Архитектура PySpark
Архитектура PySpark

Подробнее об архитектуре PySpark →

Параметры драйвера

Драйвер в Spark выполняет две основных задачи: планирование расчётов и сбор результатов. Для планирования расчётов дополнительные ресурсы не нужны. А вот сбор результатов стоит рассмотреть подробнее.

В pySpark результаты собирает функция toPandas().

Важно: toPandas не создан, чтобы выкачивать большие датасеты. Конечно, его можно ускорить с помощью Apache Arrow. Но документация не рекомендует так делать:

This method should only be used if the resulting Pandas pandas.DataFrame is expected to be small, as all the data is loaded into the driver’s memory.

При этом непонятно, что такое большой датасет. По умолчанию я не рекомендую делать драйвер больше стандартного. А если вам нужно выкачать много данных (больше нескольких Гб) — есть другие способы, которые стоит освоить. О toPandas и его альтернативах у меня будет еще одна статья.

Драйвер в Spark можно разворачивать двумя способами: в режиме cluster и client. cluster используется для промышленных расчетов. Ресурсы драйвера выделяются ресурсным менеджером YARN, вы можете контролировать все настройки. 

В pySpark ноутбуках используется режим client. Это значит, что драйвер запускается на сервере рядом с вашим ноутбуком, а часть параметров его настройки вам недоступна.

При работе с Python-драйвером важно помнить, что лимитами ресурсов невозможно управлять. Если вы выполняете большой запрос toPandas и запускаете тяжелую локальную ML-модель, это может занять все ресурсы сервера и повлиять на работу других процессов в кластере. Чтобы избежать такой ситуации, в Сбере мы создали специальные Jupyter-ноды, которые были размещены отдельно от кластера. Это помогало изолировать нагрузку ноутбуков и предотвращать негативное влияние на Hadoop. У вас может быть не так, поэтому контролируйте потребление ресурсов.

Параметрами контекста мы влияем только на JVM-процесс:

spark.driver.cores

Этот параметр не используется при deploy-mode=client. Задавать не нужно.

spark.driver.memory

Память драйвера. Если toPandas будет падать с ошибкой OutOfMemory, ограничение будет в этом параметре. Дефолт — 1 Гб. Если не используете toPandas для больших датафреймов, менять не нужно.

spark.driver.maxResultSize

Объём данных, который драйвер может собирать. В том числе и toPandas-ом. По умолчанию менять не нужно. Если вы изменили spark.driver.memory, нужно поднимать maxResultSize до эквивалентных значений.

spark.driver.memoryOverhead

Этот параметр менять не нужно. При deploy-mode=client на Python эта память не распространяется. При ошибках спарк даёт совет поменять memoryOverhead — это вводит в заблуждение. Менять нужно параметр memory. memoryOverhead выделяется как 10% от memory и этого должно быть достаточно.

Динамическая аллокация и почему её нужно использовать

В Spark существует два типа аллокаций ресурсов — статическая и динамическая. В ноутбуках всегда рекомендуется использование динамической.

В статической аллокации вы при старте контекста задаёте количество экзекьюторов. Они закрепляются за вами, пока приложение будет работать.

А теперь представим рабочий день аналитика. Вы пришли на работу рано, подняли ноутбук. Ресурсов попросили от души, кластер же свободный. Сделали пару запросов, удовлетворились, пошли завтракать, а потом на встречи. Про ноутбук, конечно, забыли: заняли ресурсы и не используете. Другие аналитики в это время не могут их получить.

Динамическая аллокация решает эту проблему. Если вы какое-то время не пользуетесь ноутбуком, ресурсы срезаются до минимума.

Нужно понимать, что изначально динамическая аллокация придумывалась для оптимизации регулярных расчетов. Теоретически, Spark в процессе расчёта то увеличивает, то уменьшает количество выделенных экзекьюторов. Есть много параметров для настройки этого процесса.

Но этот функционал для нас избыточен. Рассчитывайте, что во время работы вам выделяется максимум, когда уходите завтракать — отнимается до минимума.

Почему это так. Вы запускаете задачу. Spark с экспоненциальной скоростью выделяет новые экзекьюторы, пока есть таски в ожидании. Либо пока не упрётся в лимит. Чтение таблицы создает столько тасок, сколько вы читаете файлов. А shuffle, возникающий при джойне, изменяет количество партиций до параметра spark.sql.shuffle.partitions, который по умолчанию 200. Получается, любые более-менее боевые запросы быстро отъдедят у вас максимум экзекьюторов.

Параметры для настройки динамической аллокации:

spark.dynamicAllocation.enabled
true

spark.shuffle.service.enabled или spark.dynamicAllocation.shuffleTracking.enabled
true
.

Так просит документация. Не буду вдаваться в подробности, так как этот функционал нам особо не нужен. Нужно указать один из двух. Пробуйте первый параметр. Если будут ошибки, переключайтесь на второй. Если что, ваши дата-инженеры вам подскажут.

spark.dynamicAllocation.maxExecutors
Самый важный параметр. Определяет максимальное, рабочее количество экзекьюторов.

spark.dynamicAllocation.minExecutors
Я рекомендую оставлять дефолтный 0, и даже прописывать его явно. Если очень нужно, оставьте немного несгораемого ресурса на одном из ноутбуков. Но точно не на каждом.

spark.dynamicAllocation.initialExecutors
Не заполняйте. По умолчанию будет равен spark.dynamicAllocation.minExecutors

spark.dynamicAllocation.executorIdleTimeout
Время, спустя которое забираются неиспользуемые экзекьюторы. Рекомендую прописывать явно. Дефолтные 60 секунд на мой взгляд — нормально.

spark.dynamicAllocation.cachedExecutorIdleTimeout
Время, которое живут экзекьюторы с кешем. Если не выставить, по умолчанию будет значение бесконечность. Но администраторы вашего кластера могли изменить это значение. Хорошие аналитики активно кешируют данные в своих ноутбуках, поэтому считайте, что динамическая аллокация работать не будет. На мой взгляд, адекватное время — от 10 до 30 минут, в зависимости от спроса на ресурсы на кластере. Задаётся в секундах.

Параметры экзекьютора

Настройка экзекьюторов — самое сложное. Она больше всего влияет на жизнеспособность кластера. Аналитики часто копипастят конфиги друг у друга, и в итоге на кластере заканчиваются ресурсы.

Ситуация ухудшается тем, что не существует четкого алгоритма, как настраивать экзекьюторы. Зато есть ряд рекомендаций. Сначала определитесь с размером экзекьюторов, затем настраивайте их количество.

Соотношение ядер и памяти

При определении размера экзекьютора необходимо определить баланс ядер и памяти. Аналитики часто забывают, что ресурсы выделяются на реальных серверах с конкретным соотношением ядер и оперативной памяти для YARN. Его стоит соблюдать, чтобы не получилось так, что на кластере израсходовали все ядра, а память осталась свободной.

Чтобы понять соотношение, зайдите в YARN → Scheduler. Посмотрите общий объём ядер и памяти, поделите. Вы получите соотношение, например, 1 ядро к 7 Гб памяти. Память экзекьютора состоит из основной памяти и оверхеда. Оверхед по умолчанию равен 10%. Тогда в этом примере на 1 ядро нужно выделять 6 Гб основной памяти.

Определение размера экзекьютора

Плюсы больших экзекьюторов:

  1. Чем больше экзекьютор, тем меньше будет межпроцессного и межсетевого трафика.

  2. Чем больше экзекьютор по памяти, тем большего размера партицию он сможет обработать и не упасть.

Минусы:

  1. Большие экзекьюторы сложно выделяются. Если кластер сильно загружен, ярну придётся поискать свободные места на нодах, чтобы выделить большие экзекьюторы.

  2. На экзекьюторах с памятью больше 64Гб неэффективно происходит очистка памяти (garbage collection).

Эксперты считают, что оптимальный экзекьютор — с примерно 5 ядрами и соответствующим объёмом памяти.

Если не знаете зачем, не выставляйте memoryOverhead вручную. Он используется JVM-процессом для хранения определенных объектов. А ещё на pySpark-код, исполняемый на экзекьюторах. Обычно дефолтного значения в 10% достаточно. Стоит увеличивать этот параметр, только если ловите ошибки по памяти. Согласно одному из гайдов — до 25%.

Определение количества экзекьюторов

Чтобы его определить, нужно отталкиваться от общего объёма ресурсов.

По объёму памяти — берите память в 5-10 раз больше, чем объём читаемых данных. Скорее всего, вам этого хватит. С меньшим объемом памяти тоже можно работать, но возможны ошибки.

По ядрам все просто: чем их больше, тем быстрее идут расчёты. Технических ограничений тут нет. Какая скорость приемлемая — вопрос сложный. Тут нужно договариваться и выстраивать стратегию.

Общая стратегия управления ресурсами, или сколько вам нужно ядер

В начале работы над задачей неизвестно, сколько у вас будет данных. Часто аналитики по ходу исследования придумывают запросы.

Я предлагаю такую стратегию по управлению ресурсами:

  1. В вашей команде должен быть установлен минимальный конфиг. Если аналитики будут вести под ним большую часть работы, то 2/3 кластера будут свободны.

  2. Если у вас появилась вычислительно сложная задача, перезапустите сессию с большим количеством ресурсов. Если будут злоупотребления — установите лимит.

  3. Придерживайтесь хорошего тона и берите ресурсы, когда они действительно нужны. Например, на большой расчет. Разрабатывать витрину можно и на небольшом семпле данных.

Где в документации описаны параметры

От аналитика не предполагается, что он будет разбираться во внутренностях работы Spark. Если возникнет необходимость, искать нужно в первую очередь в документации:

Все ссылки приведены на latest-версию. Пользуйтесь документацией той версии, которая стоит у вас на кластере.

Рекомендации

Я рассказал про принципы для контроля выделенных ресурсов на PySpark ноутбуках. Подведём итоги: 

  • используйте динамическую аллокацию;

  • не пишите лишних параметров в конфиг;

  • явно прописывайте все нужные параметры в конфиг;

  • используйте соотношение ядер и памяти, соответствующее вашим серверам;

  • используйте экзекьюторы примерно по пять ядер;

  • берите в 5-10 раз больше памяти, чем занимают ваши данные;

  • договоритесь в команде о параметрах минимального конфига.

Если при этом ваши ноутбуки будут падать по памяти, не стоит сразу бежать увеличивать ресурсы. Обратите внимание на оптимизацию кода. О ней я расскажу в следующих статьях.

Послесловие: взгляд дата-инженера

Идея написать этот раздел появилась после обсуждения статьи с нашим дата-инженером, Иваном Ахлестиным. Ваня отвечает за всю инфраструктуру Hadoop в Авито.

Я писал эту статью с точки зрения тимлида команды аналитиков. Мне важно, чтобы команда работала продуктивно. А на Spark уходит много времени.

Аналитик хочет получить как можно больше ресурсов, чтобы посчитать свою задачу. Даже если это кому-то помешает. По крайней мере, пока не придут ругаться коллеги =)

Дата-инженеру важно сохранить кластер в работоспособном состоянии, чтобы все люди могли на нем работать. Для этого ему нужно создать на кластере конфиг по умолчанию и следить, чтобы аналитики его не изменяли без необходимости. Хорошо, если многие параметры из этой статьи, аналитику не нужно будет трогать. Максимум, количество экзекьюторов и иногда их размер.

Немного обо мне

Я занимаюсь разработкой аукционных механик в компании Авито, в том числе автобиддингом. Этот функционал используется в алгоритмах продвижения. Для обработки сложных вложенных структур поисковых логов мы в основном используем Spark, хотя основной DWH в Авито построен на Vertica и ClickHouse. До этого я работал в Сбере, где создал дата-команду для трайба малого и микробизнеса с собственным промышленным Hadoop-кластером.

Периодически я делюсь своими инсайтами и впечатлениями в своём телеграм-канале https://t.me/big_ledovsky. Буду рад ответить на вопросы по статье и вообще обсудить Apache Spark и алгоритмы анализа данных.

Предыдущая статья: Ультимативный гайд по HTTP. Cookies и CORS

Комментарии (14)