Привет! Меня зовут Александр Ледовский, я тимлид команды аналитики и DS в Авито. Нередко я вижу ситуацию, когда аналитик работает над задачей и упирается в проблему производительности. Причём она всплывает там, где не ждали. В итоге задачи передвигаются из спринта в спринт, аналитик сидит на работе вечерами и в выходные, а также забирает ненормальное количество ядер на кластере, за что нарывается на справедливое недовольство коллег.

Есть много статей, которые дают инструкции по оптимизации Spark. Но всё-таки, оптимизация запросов — штука непростая. На мой взгляд, это не первый навык, который нужно освоить аналитику. Важнее как можно раньше научиться читать план запроса и следить за прогрессом исполнения расчётов в SparkUI. А контроль за прогрессом расчета — одна из важных особенностей Apache Spark, которой нет в обычных аналитических базах, вроде Greenplum или Clickhouse.

В этой статье мы подробно разберём:

  • что такое план запроса и чем он может помочь;

  • как Apache Spark формирует план запроса;

  • как пользоваться Spark UI, чтобы следить за планом запроса и прогрессом его исполнения;

  • практические советы.

Это уже третья статья в серии про работу с PySpark для аналитиков. Вы можете прочитать мои предыдущие статьи:

Что такое план запроса и зачем он вам нужен

Для начала я напомню, что Spark — «ленивый» фреймворк. Что это значит:

  • Когда вы пишете команды select, filter, join и делаете другие трансформации, ничего не происходит. Всё это время Spark запоминает ваши команды и строит план, как он их будет выполнять. План — это граф расчета.

  • Затем вы используете команду действия, которая должна выдать результат: count, toPandas или write. В этом момент Spark начинает исполнять план запроса и реально проводить вычисления.

  • Если вы два раза подряд вызовете команду count, Spark будет заново исполнять весь граф расчёта.

План запроса — это граф, который содержит операции внутри вашего запроса и порядок, в котором они должны выполняться

Вы также можете следить за тем, как выполняются отдельные кусочки этого графа с Spark UI.

Какие выводы вы, как аналитик, можете сделать после анализа графа расчета:

  • понять, в каком месте у вас медленно идет расчёт, чтобы адресно попросить помощи;

  • сохранить данные после неприятного расчёта (через .cache или через полноценную запись через write/saveAsTable);

  • если у вас чуть больше опыта, применить знания по оптимизации запросов.

Отдельно хочу отметить первый пункт. Когда вы приносите огромную витрину со словами «медленно работает», инженеру нужно самому запускать ваш код, разбираться в нём, смотреть план запроса. Вероятность, что он будет этим заниматься, сами понимаете, небольшая =) А когда вы задаёте адресный вопрос, вам помогут гораздо быстрее и с гораздо большей вероятностью.

Как Spark строит план запроса

Построение плана запроса в Spark состоит из нескольких этапов. Если хотите разобраться подробнее, посмотрите видео Physical Plans in Spark SQL (на английском): 

Часть 1 →
Часть 2 →

Тут я приведу только общие идеи:

  • Сперва формируется логический план, который получается напрямую из ваших команд. Но это не то, как всё будет работать под капотом.

  • После работы оптимизатора формируется физический план запроса. Он содержит конкретные операции, которые будут выполняться во время исполнения.

Чтобы вы чуть лучше поняли отличия, приведу пример. В вашем исходном плане вы:

  • читаете таблицу,

  • делаете filter по условию x > 10,

  • делаете джойн на другую табличку,

  • делаете select нужных колонок.

В финальном физическом плане оптимизатор:

  • прокинет выбор нужных колонок на уровень чтения файла с hdfs;

  • на этапе чтения добавит фильтр, чтобы поле, по которому вы делаете filter, не было null;

  • определит тип джойна (например, SortMergeJoin) и перед джойном сделает нужную подготовку (Shuffle и Sort).

Пример графа расчета витрины среднего размера, который я недавно оптимизировал

Что такое AdaptiveQueryExecution и WholeStageCodegen

С AdaptiveQueryExecution обычно не возникает много вопросов. Этот алгоритм «на лету» меняет план запроса. Он использует данные, которые получил в процессе исполнения. AdaptiveQueryExecution стал работать по умолчанию с версии 3.2. Считается, что он улучшает скорость расчетов, хотя на моей практике встречались случаи, когда он ошибался. Тогда улучшить производительность помогало выключение этой опции. Но ничего хитрого для аналитика в нём нет и по умолчанию его лучше всегда использовать. Теперь, когда увидите значки AQE в плане запроса, поймёте, почему они появились.

С WholeStageCodegen вы будете постоянно сталкиваться, когда будете читать план запроса и это может вас смутить. Это достаточно сложная механика и понимать, как она работает, не нужно. Просто знайте, что она есть. Если коротко, то кусочки кода, которые вы написали (например, условия в filter), Spark может скомпилировать в бинарный код Java, чтобы запрос быстрее работал.

Теоретически нужно стараться писать код так, чтобы как можно больше трансформаций попадали в WholeStageCodegen блоки. Но лично я плохо понимаю, как на это влиять, и какой процент ускорения потенциально можно получить. Если имеете опыт по этому вопросу, буду благодарен, если поделитесь в комментариях. Подробнее про работу оптимизатора и кодогенерацию можно почитать в статье на Databricks →

Как смотреть план запроса в SparkUI и почему я не использую explain

Многие, кто работал в классических базах данных, привыкли к оператору explain. Он позволяет в текстовом виде вывести план запроса. В Spark тоже есть такая возможность:

df.explain()
# есть разные опции как форматировать вывод
df.explain(mode='formatted')

Моя краткая рекомендация — не используйте этот метод. Дело в том, что в Spark планы запроса варьируются от больших до огромных. Читать вывод эксплейна становится просто невозможно. Я рекомендую пользоваться SparkUI.

Многие начинают работать со SparkUI с вкладки Jobs и не могут понять, почему на один их write или toPandas возникает много джобов. Поэтому сначала давайте посмотрим на вкладку SQL. В ней вкладке отображаются физические планы запросов в виде графа:

Совет. Когда вы оптимизируете ваш расчёт, не обязательно смотреть план по всему запросу сразу. Берите промежуточные датафреймы и запускайте на них count. Тогда план небольшого запроса попадет во вкладку SQL, где вы сможете его спокойно изучать.

Во-первых, стоит посмотреть, на какие этапы тратится больше всего времени. На блоках вы увидите разные показатели времени (общие и на некоторых детальные), которые содержат цифры total, min, med, max. Это статистика по задачам, на которые разбивается блок. Нужно понимать, что ни одна из этих цифр не отражает реальное время работы блока.

Что означают показатели:

  • total — суммарное время расчета всех тасок. Например, у вас было 10 параллельных тасок, каждая считалась 10 секунд. Общее физическое время расчёта также будет 10 секунд. При этом total будет равен 100 секундам.

  • med — медианное время расчета таски.

  • min и max — минимальное и максимальное время расчёта тасок.

На первый взгляд кажется странным ориентироваться на суммарное, а не на реальное время. Однако параллельно могут считаться несколько блоков. Например, вам бы написали, что блок считался минуту, а на самом деле в эту минуту 90% ресурсов было съедено другим блоком. Если бы наш блок считался один, то он посчитался бы за 6 секунд.

Поэтому реальное время расчета блока в общем случае непоказательно

В итоге посмотрите, на каких блоках total больше всего — там и проблема. Также обратите внимание, где max сильно больше med — там, скорее всего, перекос в данных. В примере ниже минимальное время расчета — 0, что означает, что есть пустые партиции. При этом максимальное время больше медианного более чем в 10 раз! Но из-за 15 секунд, конечно, заморачиваться оптимизацией не стоит, это просто пример =)

Также стоит проверить, что у вас всё в порядке с чтением данных. Нужное ли, например, количество партиций вы читаете? Адекватное ли при чтении количество строк?

Как использовать вкладки Jobs и Stages, чтобы уточнить узкое место в расчете

Когда у вас большой граф расчёта параметров total, min, med, max может быть недостаточно. Тогда есть возможность провалиться в реальные таски и посмотреть, что происходит внутри, с помощью вкладок Jobs и Stages.

SQL-запрос состоит из блоков, блоки раскладываются в джобы (Jobs), джобы раскладываются в этапы (Stages), этапы состоят из параллельных задач (Tasks)

Однако прямого соответствия блоков SQL-запроса на джобы в интерфейсе вы не найдёте. Для того, чтобы это сделать, нужно воспользоваться неявным методом.

В вашем SQL-запросе вы можете точно определить, какие джобы ему соответствуют:

Затем вы можете перейти в джобу. Ориентируясь на номер WholeStageCodegen блоков, можно сопоставить, где на плане SQL-запроса находится эта джоба и соответствующие этапы. Кстати, если знаете более удобный способ, расскажите, пожалуйста, в комментариях!

Затем вы можете провалиться в конкретный стейдж и посмотреть таски и экзекьюторы, которые в них участвуют:

Вы можете отсортировать таски по наиболее долгим, залезть в логи stderr, где лежат все нужные логи спарка, и попробовать разобраться, что происходит.

Иногда проблема кроется в том, что у вас неэффективный код, а иногда какая-то нода не коннектится к HDFS и таска висит «просто так». Чтение логов — достаточно объёмная тема, в этой статье мы её разбирать не будем. Но вы можете:

  • погуглить логи, на которых зависает расчёт;

  • пойти за консультацией по оптимизации запроса к вашим дата-инженерам;

  • сохранить промежуточный результат перед проблемным местом в HDFS и начать расчёты с него, чтобы отладка была удобнее.

Рекомендации

  • Возьмите в привычку открывать план запроса в Spark UI, когда видите, что ваши расчёты идут медленно.

  • Используйте показатели total, min, med, max, чтобы вычислять проблемные места.

  • Через WholeStageCodegen переходите на вкладки Jobs и Stages, чтобы углубиться в анализ исполнения запроса.

Немного подробнее обо мне

Занимаюсь в Авито разработкой алгоритмов продвижения, в том числе аукционных механик и автобиддинга. Хотя в Авито основной DWH построен на Vertica и ClickHouse, наша аналитика в основном работает на Spark. Он позволяет эффективно обрабатывать поисковые логи со сложной вложенной структурой. До Авито я работал в Сбере, где построил дата-команду трайба «Малый и микробизнес» со своим промышленным Hadoop-кластером.

Периодически я делюсь своими инсайтами и впечатлениями в своём телеграм-канале https://t.me/big_ledovsky. Буду рад ответить на вопросы по статье и вообще обсудить Apache Spark и алгоритмы анализа данных.

Спасибо за внимание!

Предыдущая статья: Запуск потенциально опасного кода в изолированном окружении. Как мы сохраняем безопасность на macOS

Комментарии (0)