Публикуем перевод гайда по Spark UI. Это встроенный инструмент Apache Spark, который предоставляет полный обзор среды Spark: узлов, исполнителей, свойств и параметров среды, выполняемых заданий, планов запросов и многого другого. Кроме теории в статье вы найдёте несколько примеров, которые помогут попрактиковаться в отслеживании и анализе заданий Spark.

Первый взгляд на Spark UI

По умолчанию доступ к интерфейсу Spark UI осуществляется в режиме разработки по адресу localhost:4040. Давайте быстро начнем новый проект, запустим сессию Spark, выполним простое задание и посмотрим на интерфейс Spark UI:

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list

import time


def create_spark_session() -> SparkSession:
    conf = SparkConf().set("spark.driver.memory", "8g")

    spark_session = SparkSession\
        .builder\
        .master("local[4]")\
        .config(conf=conf)\
        .appName("Spark UI Tutorial") \
        .getOrCreate()

    return spark_session


if __name__ == '__main__':

    spark = create_spark_session()

    test_df = spark.createDataFrame([
        (1, 'a'),
        (2, 'b'),
        (3, 'c'),
        (4, 'd'),
        (5, 'e'),
        (6, 'f'),
        (7, 'g'),
        (8, 'h'),
        (9, 'i'),
        (10, 'j')
    ], ["number", "letter"]).cache()

    test_df.show(truncate=False)

    test_df \
        .withColumn("mod", col("number") % 2) \
        .groupBy("mod") \
        .agg(collect_list("letter").alias("letter")) \
        .show(truncate=False)

    # For UI to stick
    time.sleep(1000000)

Я запускаю эту программу в PyCharm IDE. Я добавил файл requirements.txt, в котором также имеется только одна зависимость:

pyspark==3.4.0

Spark Session создаст сессию Spark в локальном режиме, а конфигурация local[4] означает, что будут использоваться 4 ядра. В локальном режиме задачи выполняются на driver node.

Выполнение этого кода приведет к созданию простого фрейма данных в памяти, а затем к выполнению над ним операции group by перед выводом на экран. Созданный фрейм данных также кэшируется. После выполнения этого простого задания функция time.sleep(100000) сохраняет сессию Spark в течение длительного времени, чтобы можно было изучить пользовательский интерфейс Spark.

Чтобы получить доступ к Spark UI, во время выполнения кода откройте браузер и перейдите на localhost:4040, который является портом по умолчанию для Spark UI. Рассмотрим пользовательский интерфейс Spark подробнее.

Jobs View

Spark UI Job View
Spark UI Job View

Примечание: Если порт 4040 занят, в журнале выполнения задания будет указано, что оно запускается с порта 4041 (или любого другого порта, доступного после 4040), поэтому откройте его.

Выбор другого порта, поскольку 4040 недоступен
Выбор другого порта, поскольку 4040 недоступен

При открытии пользовательского интерфейса Spark UI отображаются все задания, как завершенные, так и находящиеся в процессе выполнения. Время выполнения нашего простого задания очень мало — всего 10 секунд, поэтому видны только завершенные задания и их этапы. Если бы были более длительные задания, то Spark UI показывал бы и их. Для каждого задания мы можем видеть количество этапов и время, которое оно заняло. Кроме того, в пользовательском интерфейсе отображается количество заданий: успешных, неудачных (если таковые имеются) и незавершенных (если таковые имеются).

Временную шкалу событий можно просмотреть, развернув заголовок Event Timeline heading (Временная шкала событий):

Event Timeline
Event Timeline

Временная шкала событий отображает в хронологическом порядке события, связанные с исполнителями (добавленные, удаленные) и заданиями. Подрбнее можно почитать в документации

Так выглядит пользовательский интерфейс Spark в режиме разработки, однако в режиме кластера/клиента на Home page перечислены все отправленные задания. Если выбрать одно из них, то откроется подробное представление.

Stages View

Щелкните на Stages в верхнем меню, чтобы открыть страницу stages:

В режиме просмотра этапов отображается дополнительная информация о размере входных данных для каждого этапа, размере данных для чтения и записи в шаффле, количестве задач. Кроме того, указываются пропущенные этапы. Выберите этап 2 для отображения подробной информации об этом этапе.

Сверху находится заголовок DAG Visualization, щелкнув по которому можно раскрыть DAG-визуализацию этапа:

DAG-визуализация stage
DAG-визуализация stage

Визуализация DAG показывает разбивку каждого этапа на шаги. Под визуализацией DAG находится график событий (Event Timeline), который показывает время, затраченное на выполнение каждой задачи, или время, затраченное на выполнение текущих задач. График Event Timeline также свернут по умолчанию, и его необходимо развернуть.

Event timeline
Event timeline

График временной шкалы событий имеет общую временную шкалу на оси x, которая дает общее представление о времени начала, окончания и продолжительности выполнения всех задач в сравнении друг с другом.

Если на этапе имеется больше задач, чем количество задач, которые могут быть выполнены одновременно, то некоторые задачи могут быть запущены после освобождения мощностей. Все это можно проследить на временной шкале. Временная шкала каждой задачи разбивается на время вычислений и время, затрачиваемое на выполнение других административных задач, таких как сериализация/де-сериализация, чтение/запись с перестановкой и т.д.

Вот как выглядит временная шкала для более крупного задания с большим количеством задач:

Напряженный график — множество задач у нескольких исполнителей
Напряженный график — множество задач у нескольких исполнителей

Под временной шкалой находится сводная статистика, показывающая длительность (Duration), время уборки мусора (Garbage Collection time), время ввода и перетасовки (input and shuffle times), а также некоторые другие моменты времени:

Разбивка каждого временного интервала производится по минимальному, максимальному и 25-му, среднему и 75-му процентилям. Эта таблица очень удобна для выявления несовпадающих задач и перекоса данных. В идеале все длительности и объемы данных должны быть близки к равномерным для всех задач, т.е. как можно ближе к медиане.

Несколько задач, занимающих очень много времени, могут свидетельствовать о перекосе данных или о том, что обработка занимает много времени по другой причине, например, из-за сложности вычислений.

Далее приводится таблица с подробным описанием каждой задачи:

Детали задачи
Детали задачи

Здесь можно получить такую информацию, как время выполнения каждой задачи, на каком узле она выполнялась, время GC, размер входных данных и размер шаффла. Также, если есть возможность, приведены ссылки на консольные выходы каждой задачи, как stderr, так и stdout, которые можно использовать для оценки результатов.

SQL/DataFrame View

Доступ к представлению SQL/DataFrame можно получить, выбрав кнопку SQL/DataFrame на верхней панели. Она является последней в верхнем меню.

SQL-запросы задания
SQL-запросы задания

В main view отображается каждый запрос в общем задании, его длительность, а также идентификаторы заданий (Job IDs), связанные с каждым запросом. Для начала выделите запрос с идентификатором 0, чтобы получить подробную информацию о нем:

DAG-представление первого запроса
DAG-представление первого запроса

Это представление SQL-запроса для первого действия:

Первое действие задания — показ необработанного DataFrame
Первое действие задания — показ необработанного DataFrame

Ознакомимся с шагами.

Во-первых, есть шаг Scan Existing RDD, который создает DataFrame путем чтения из списка значений. Количество выводимых строк равно 10. Далее DataFrame кэшируется и считывается из памяти шагом in-memory Table Scan. Но поскольку это первое действие, то перед кэшированием DataFrame должен быть построен/считан, и поэтому первый шаг выполняется, а не пропускается. Далее следуют шаги проецирования и сбора (Project и Collect Limit), которые выбирают и отображают DataFrame.

Раскройте заголовок Details, чтобы увидеть план запроса. Physical plan имеет следующий вид:

== Physical Plan ==
CollectLimit (5)
+- * Project (4)
   +- InMemoryTableScan (1)
         +- InMemoryRelation (2)
               +- * Scan ExistingRDD (3)

Затем откройте второй SQL-запрос с предыдущей страницы:

DAG-представление второго запроса
DAG-представление второго запроса

Это более длинный запрос, поскольку для получения конечного результата требуется вычислить производный столбец, сгруппировать по нему данные, а затем агрегировать столбец "letter".

Но главное, на чем я хочу заострить внимание, находится в самом начале запроса. Шаг Scan Existing RDD не возвращает ни одной строки, и это потому, что данные для этого шага могут быть повторно использованы из кэша, а кэш был заполнен в предыдущем запросе.

Но поскольку операция кэширования не обрезает строку данных, и возможно (из-за потери данных или удаления узла в публичной облачной среде), некоторые части кэшированных данных придется перестраивать, и если это произойдет, то это также будет видно в плане запроса.

Однако если вместо кэша используется контрольная точка (checkpoint), то добавляется еще один SQL-план, поскольку Checkpoint — это действие:

Запрос, используется ли контрольная точка вместо кэша
Запрос, используется ли контрольная точка вместо кэша

План запросов задачи контрольной точки прост:

Пошаговый план контрольных точек
Пошаговый план контрольных точек

Обе последующие задачи теперь читают данные из контрольной точки, а не из источника:

DAG-представление первого запроса — чтение данных из контрольной точки
DAG-представление первого запроса — чтение данных из контрольной точки
DAG-представление второго запроса - чтение данных из контрольной точки
DAG-представление второго запроса - чтение данных из контрольной точки

Если данные контрольной точки будут потеряны, то восстановить их будет невозможно, так как линия данных будет обрезана, и задание завершится неудачно.

Storage View

Говоря о кэше, следует отметить, что в Storage view (выберите Storage в верхнем меню) хранятся все сохраняемые данные. В данном случае это DataFrame, который был кэширован:

Cached Data in Storage View
Cached Data in Storage View

Кроме того, отображается размер сохраняемых данных в памяти и на диске. По каждому сохраненному элементу можно щелкнуть мышью, чтобы открыть подробности:

Подробности о кэшированных данных
Подробности о кэшированных данных

Environment view

Наконец, в представлении среды отображается сконфигурированная среда:

Среда выполнения Spark
Среда выполнения Spark

Executors view

В представлении "Executors" отображаются все исполнители, а также сводная разбивка. В этом представлен: количество активных, завершенных и неудачных задач для каждого экземпляра исполнителя, а также время сборки мусора (GC), размеры входных и перемешиваемых данных и, при наличии, журналы каждого исполнителя. Доступ к нему можно получить, выбрав кнопку Executors на верхней панели навигации:

Executors view. Из-за локального режима присутствует только Driver
Executors view. Из-за локального режима присутствует только Driver

Практика на двухузловом кластере, запущенном на docker

Теперь рассмотрим более близкую к реальности ситуацию: многоузловой кластер и более длительное и сложное задание.

Для запуска двухузлового кластера Spark с помощью Docker Compose я нашел отличный источник. В этой статье подробно рассматривается создание многоузлового кластера Spark с помощью Docker и Docker Compose. Здесь находится соответствующий репозиторий кода.

Однако я форкнул это репо и внес следующие изменения:

  • Обновлена версия Spark до 3.4.0 (статья написана до выхода 3.5).

  • Удален последний контейнер базы данных.

  • Изменен порт веб-интерфейса Spark на 9000.

  • Открыт порт 4040 на мастере.

  • Некоторые незначительные изменения портов и другие изменения.

  • Увеличена память рабочего процессора до 2 ГБ и рабочие ядра до 2.

Обновленный репозиторий, на который мы будем ссылаться в этой статье, можно получить по этой ссылке.

Кластер можно собрать, сначала создав файл docker, а затем запустив docker compose up:

docker build -t spark-docker-image-3.4.0 .
docker compose up
Запуск 2-узлового кластера Spark с помощью docker-compose
Запуск 2-узлового кластера Spark с помощью docker-compose

При этом создается кластер с 1 ведущим и 2 рабочими узлами:

Узлы кластера успешно запущены
Узлы кластера успешно запущены

Доступ к веб-интерфейсу Spark Master можно получить через порт 9000. Это не пользовательский интерфейс Spark для задания, а список всех заданий, которые были выполнены или находятся в процессе выполнения. Доступ к Spark UI выполняющегося задания можно получить по адресу localhost:4040 или по ссылке из этого списка.

Spark Master Web UI — отображение всех рабочих узлов и заданий, поданных на кластер
Spark Master Web UI — отображение всех рабочих узлов и заданий, поданных на кластер

Обратите внимание на IP-адреса рабочих узлов. На моем компьютере рабочий 1 — это 172.20.0.3, а рабочий 2 — 172.20.0.4. Это необходимо для доступа к журналам исполнителей и задач на рабочих узлах.

Кроме того, Spark master имеет адрес spark://c44e0ffa8c82:7077, который будет передан команде spark-submit.

Подготовка задания к выполнению и демонстрации

Задание для демонстрации выполняет следующие действия:

  • Создает DataFrame с двумя столбцами: c цифрами и буквами.

  • Каждое число сопоставляется с буквой, и это сопоставление может быть изменено для получения перекоса данных. Чем больше число, передаваемое в функцию get_alphabet_skewed, тем больше перекос букв x, y и z.

  • Функция, определяемая пользователем (UDF), которая повторяет переданную букву 4 раза. Она также может не сработать (один раз из 500 000 раз), если параметр имеет значение True.

Код задания приведен ниже:

from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lit, udf

import time, random

from pyspark.sql.types import StringType


def create_spark_session(is_local: bool) -> SparkSession:
    if is_local:
        print("Stating for local mode")
        conf = SparkConf().set("spark.driver.memory", "8g")

        spark_session = SparkSession\
            .builder\
            .master("local[4]")\
            .config(conf=conf)\
            .appName("Spark UI Tutorial") \
            .getOrCreate()

        spark_session.sparkContext.setCheckpointDir("checkpoint")

        return spark_session

    else:
        print("Stating for cluster mode")
        return SparkSession\
            .builder\
            .appName("Spark UI Tutorial")\
            .getOrCreate()


def get_alphabet_skewed(number: int):
    if number > 25:
        if number % 3 == 0:
            return "x"
        if number % 3 == 1:
            return "y"
        else:
            return "z"
    else:
        return chr(97 + number)


@udf(returnType=StringType())
def repeat_letter(letter: str, should_randomly_fail: bool) -> str:
    if should_randomly_fail:
        random_num = random.randint(0, 5000000)
        print(f"Random number: {random_num}")
        if random_num < 1:
            print("Failing ......")
            raise ValueError("Randomly failing")

    print(f"Letter is: {letter}")
    return letter * 4


def execute_bigger_job(is_local: bool, number_of_records: int):
    spark = create_spark_session(is_local=is_local)

    raw_data = [(i, get_alphabet_skewed(i % 200)) for i in range(0, number_of_records)]
    data_df = spark.createDataFrame(raw_data, ["number", "letter"])\
        .repartition(201)

    print(data_df.count())
    data_df.show(truncate=False)

    data_df\
        .withColumn("repeated_alpha", repeat_letter(col("letter"), lit(False)))\
        .groupBy("repeated_alpha")\
        .agg(count(lit(1)))\
        .sort("repeated_alpha")\
        .show(truncate=False, n=30)


if __name__ == '__main__':
    execute_bigger_job(is_local=False, number_of_records=10000000)

    # For UI to stick
    time.sleep(1000000) 

Этот код можно получить из этого репозитория GitHib.

Обратите внимание, что в create_spark_session при подготовке сессии к запуску на кластере отсутствует конфигурация количества ядер. Это связано с тем, что данные конфигурации должны быть взяты из окружения по умолчанию или могут быть предоставлены командой spark-submit при отправке задания. Мы увидим это в ближайшее время.

Чтобы отправить задание, сначала зайдите по ssh на главный узел работающего кластера:

docker exec -it docker-spark-cluster-spark-master-1 /bin/bash

Далее папка /path_to_repository/apps в репозитории была отображена на /opt/spark-apps в мастер-узле. Скопируйте файл bigger_job.py в эту папку и убедитесь, что к нему можно получить доступ на мастер ноде:

ls -lh /opt/spark-apps/
Python-файл задания, доступный на главном узле кластера Spark
Python-файл задания, доступный на главном узле кластера Spark

Теперь выполним более крупное задание с помощью следующей команды spark-submit:

/opt/spark/bin/spark-submit --deploy-mode client --master spark://c44e0ffa8c82:7077 --conf "spark.sql.shuffle.partitions=201" /opt/spark-apps/bigger_job.py

Затем проверьте логи. После запуска Spark UI доступ можно получить по адресу localhost:4040. В веб-интерфейсе Spark Master (localhost:9000) задание отображается как выполняющееся:

Выполняемое задание видно в Web-интерфейсе Spark Master
Выполняемое задание видно в Web-интерфейсе Spark Master

И его можно увидеть работающим в пользовательском интерфейсе Spark:

Выполняемое задание в пользовательском интерфейсе Spark
Выполняемое задание в пользовательском интерфейсе Spark

Работа завершается через несколько минут:

Конечный результат работы
Конечный результат работы

Доступ к логам исполнителя и задания

После завершения работы мы знаем, что функция, определяемая пользователем, выводится на консоль, но в этом случае она не видна на консоли. Это связано с тем, что в отличие от локального режима, исполнители находятся на разных экземплярах, и поэтому доступ к их выводам можно получить из stderr и stdout соответствующих экземпляров и соответствующих задач/исполнителей.

Доступ к заданию 6 осуществляется на вкладке Jobs пользовательского интерфейса Spark:

Job View в Spark UI
Job View в Spark UI

Далее выберите единственный этап, чтобы просмотреть его детали:

Подробности о Job 6  в Spark UI
Подробности о Job 6 в Spark UI
Резюме Job 6 в Spark UI с разбивкой по задачам
Резюме Job 6 в Spark UI с разбивкой по задачам

Каждая задача имеет ссылки на stderr и stdout в колонке Logs. Выберите произвольную задачу и выберите stderr, вы заметите, что доступ к нему невозможен:

Ошибка при доступе к журналам на рабочем узле
Ошибка при доступе к журналам на рабочем узле

Помните, что ранее мы определили, что 172.20.0.4 является рабочим 2, а порт 9000 рабочего 2 был отображен на 9002. Поэтому для доступа к журналам измените часть 172.20.0.4:9000 на localhost:9002, сохранив остальные значения:

Журналы задач на рабочем узле
Журналы задач на рабочем узле

Теперь можно получить доступ к журналам. Эту процедуру можно повторить для всех задач.

Чтобы получить доступ к журналам исполнителей, перейдите на страницу исполнителей, там есть ссылка на stderr и stdout для каждого исполнителя:

Статистика исполнителя в Spark UI
Статистика исполнителя в Spark UI

С их помощью можно получить доступ и к журналам исполнителя.

Изучение журналов неудачных заданий

В UDF-функции repeat_letter есть параметр should_randomly_fail. Если установить его в значение True, то задание будет провалено с очень низкой вероятностью 1 к 5000000. Поэтому установите его в true там, где он вызывается:

def execute_bigger_job(is_local: bool, number_of_records: int):
    spark = create_spark_session(is_local=is_local)

    raw_data = [(i, get_alphabet_skewed(i % 200)) for i in range(0, number_of_records)]
    data_df = spark.createDataFrame(raw_data, ["number", "letter"])\
        .repartition(201)

    print(data_df.count())
    data_df.show(truncate=False)

    data_df\
        .withColumn("repeated_alpha", repeat_letter(col("letter"), lit(True)))\
        .groupBy("repeated_alpha")\
        .agg(count(lit(1)))\
        .sort("repeated_alpha")\
        .show(truncate=False, n=30)

Кроме того, запустим задание немного по-другому, со следующими свойствами исполнителя:

Обновленные параметры
Обновленные параметры

Этого можно добиться, передав в вызов spark-submit следующие конфигурации:

/opt/spark/bin/spark-submit --deploy-mode client --master spark://c44e0ffa8c82:7077 --conf "spark.executor.cores=1" --conf "spark.executor.memory=820M" --conf "spark.task.maxFailures=1000" /opt/spark-apps/bigger_job.py

Кроме того, поскольку у нас есть 2 рабочих узла, каждый из которых имеет 2 ядра и 2 ГБ оперативной памяти, то в такой конфигурации задание будет выполняться 4 исполнителями.

Остановим предыдущее задание и выполним обновленное. Перед запуском скопируйте файл bigger_job.py в папку apps в репозитории spark on docker (/path_to_repository/apps).

Откройте среду, чтобы убедиться, что переданные параметры настроены для выполнения задания:

Параметры, переданные для задания, были успешно сконфигурированы и видны в представлении Environment пользовательского интерфейса Spark UI
Параметры, переданные для задания, были успешно сконфигурированы и видны в представлении Environment пользовательского интерфейса Spark UI

Кроме того, на вкладке Executors теперь отображается 4 исполнителя, по два на каждом рабочем узле:

Далее, в журнале заданий мы видим, что произошло 5 сбоев. В целом задание не потерпело неудачу, так как число неудач меньше 1000, т.е. числа неудач, после которых Spark выводит из строя все задание:

5 неудачных заданий в задании 6
5 неудачных заданий в задании 6
Изучение исключения в одной из неудачных задач
Изучение исключения в одной из неудачных задач

Исключение мы можем увидеть в колонке Errors.

Определение перекоса данных (Data Skew)

Теперь создадим перекос данных, а также отключим случайный сбой задачи:

def execute_bigger_job(is_local: bool, number_of_records: int):
    spark = create_spark_session(is_local=is_local)

    raw_data = [(i, get_alphabet_skewed(i % 2000)) for i in range(0, number_of_records)]
    data_df = spark.createDataFrame(raw_data, ["number", "letter"])\
        .repartition(201)

    print(data_df.count())
    data_df.show(truncate=False)

    data_df\
        .withColumn("repeated_alpha", repeat_letter(col("letter"), lit(False)))\
        .groupBy("repeated_alpha")\
        .agg(count(lit(1)))\
        .sort("repeated_alpha")\
        .show(truncate=False, n=30)

А после завершения работы рассмотрим выполнение задачи Job 6:

Некоторые задания выполняются дольше из-за перекоса данных
Некоторые задания выполняются дольше из-за перекоса данных

В некоторых заданиях наблюдается перекос, несмотря на то, что задание выполняется в Spark 3.4.0 с включенной функцией Adaptive Query Execution. Это незначительное явление, но оно присутствует. Вот как можно использовать Spark UI для выявления перекоса данных.

Итоги

  • Spark UI — это веб-интерфейс, обеспечивающий детальное представление приложений Spark, заданий и планов запросов.

  • В нем перечислены все выполненные или выполняемые задания, а также предоставлен доступ к их метрикам и журналам.

  • Доступ к Spark UI запущенного задания осуществляется по адресу localhost:4040, если он не настроен иначе.

  • Информация, предоставляемая Spark UI, может быть использована для анализа работы задания и выявления необходимых исправлений.

  • Spark UI также может быть использован для изучения журналов узлов и заданий неудачных заданий/задач с целью выявления причины неудачи.

В этой статье мы познакомились с интерфейсом Spark UI и некоторыми его возможностями. Если вы изучаете Spark, приходите в Слёрм на курс Spark-инженер

После него вы сможете:

  • Решать задачи DE с помощью Python/Scala, SQL. Использовать разные базы данных и автоматизировать рутину.

  • Собирать Spark-приложение, делать Deployment.

  • Строить CI/CD для Spark-приложений. Работать с облачной инфраструктурой.

  • Оркестрировать и мониторить Spark-приложение с помощью Airflow.

  • Обрабатывать терабайты данных разных форматов: таблицы, потоки, геоданные, графы.

  • Настраивать пакетную (batch) и потоковую (streaming) обработки данных.

  • Презентовать данные заказчику и говорить с бизнесом на одном языке.

???? Подробности на нашем сайте.

Комментарии (0)