Публикуем перевод гайда по Spark UI. Это встроенный инструмент Apache Spark, который предоставляет полный обзор среды Spark: узлов, исполнителей, свойств и параметров среды, выполняемых заданий, планов запросов и многого другого. Кроме теории в статье вы найдёте несколько примеров, которые помогут попрактиковаться в отслеживании и анализе заданий Spark.
Первый взгляд на Spark UI
По умолчанию доступ к интерфейсу Spark UI осуществляется в режиме разработки по адресу localhost:4040
. Давайте быстро начнем новый проект, запустим сессию Spark, выполним простое задание и посмотрим на интерфейс Spark UI:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, collect_list
import time
def create_spark_session() -> SparkSession:
conf = SparkConf().set("spark.driver.memory", "8g")
spark_session = SparkSession\
.builder\
.master("local[4]")\
.config(conf=conf)\
.appName("Spark UI Tutorial") \
.getOrCreate()
return spark_session
if __name__ == '__main__':
spark = create_spark_session()
test_df = spark.createDataFrame([
(1, 'a'),
(2, 'b'),
(3, 'c'),
(4, 'd'),
(5, 'e'),
(6, 'f'),
(7, 'g'),
(8, 'h'),
(9, 'i'),
(10, 'j')
], ["number", "letter"]).cache()
test_df.show(truncate=False)
test_df \
.withColumn("mod", col("number") % 2) \
.groupBy("mod") \
.agg(collect_list("letter").alias("letter")) \
.show(truncate=False)
# For UI to stick
time.sleep(1000000)
Я запускаю эту программу в PyCharm IDE. Я добавил файл requirements.txt
, в котором также имеется только одна зависимость:
pyspark==3.4.0
Spark Session создаст сессию Spark в локальном режиме, а конфигурация local[4]
означает, что будут использоваться 4 ядра. В локальном режиме задачи выполняются на driver node.
Выполнение этого кода приведет к созданию простого фрейма данных в памяти, а затем к выполнению над ним операции group by перед выводом на экран. Созданный фрейм данных также кэшируется. После выполнения этого простого задания функция time.sleep(100000)
сохраняет сессию Spark в течение длительного времени, чтобы можно было изучить пользовательский интерфейс Spark.
Чтобы получить доступ к Spark UI, во время выполнения кода откройте браузер и перейдите на localhost:4040
, который является портом по умолчанию для Spark UI. Рассмотрим пользовательский интерфейс Spark подробнее.
Jobs View
Примечание: Если порт 4040 занят, в журнале выполнения задания будет указано, что оно запускается с порта 4041 (или любого другого порта, доступного после 4040), поэтому откройте его.
При открытии пользовательского интерфейса Spark UI отображаются все задания, как завершенные, так и находящиеся в процессе выполнения. Время выполнения нашего простого задания очень мало — всего 10 секунд, поэтому видны только завершенные задания и их этапы. Если бы были более длительные задания, то Spark UI показывал бы и их. Для каждого задания мы можем видеть количество этапов и время, которое оно заняло. Кроме того, в пользовательском интерфейсе отображается количество заданий: успешных, неудачных (если таковые имеются) и незавершенных (если таковые имеются).
Временную шкалу событий можно просмотреть, развернув заголовок Event Timeline heading (Временная шкала событий):
Временная шкала событий отображает в хронологическом порядке события, связанные с исполнителями (добавленные, удаленные) и заданиями. Подрбнее можно почитать в документации.
Так выглядит пользовательский интерфейс Spark в режиме разработки, однако в режиме кластера/клиента на Home page перечислены все отправленные задания. Если выбрать одно из них, то откроется подробное представление.
Stages View
Щелкните на Stages в верхнем меню, чтобы открыть страницу stages:
В режиме просмотра этапов отображается дополнительная информация о размере входных данных для каждого этапа, размере данных для чтения и записи в шаффле, количестве задач. Кроме того, указываются пропущенные этапы. Выберите этап 2 для отображения подробной информации об этом этапе.
Сверху находится заголовок DAG Visualization, щелкнув по которому можно раскрыть DAG-визуализацию этапа:
Визуализация DAG показывает разбивку каждого этапа на шаги. Под визуализацией DAG находится график событий (Event Timeline), который показывает время, затраченное на выполнение каждой задачи, или время, затраченное на выполнение текущих задач. График Event Timeline также свернут по умолчанию, и его необходимо развернуть.
График временной шкалы событий имеет общую временную шкалу на оси x, которая дает общее представление о времени начала, окончания и продолжительности выполнения всех задач в сравнении друг с другом.
Если на этапе имеется больше задач, чем количество задач, которые могут быть выполнены одновременно, то некоторые задачи могут быть запущены после освобождения мощностей. Все это можно проследить на временной шкале. Временная шкала каждой задачи разбивается на время вычислений и время, затрачиваемое на выполнение других административных задач, таких как сериализация/де-сериализация, чтение/запись с перестановкой и т.д.
Вот как выглядит временная шкала для более крупного задания с большим количеством задач:
Под временной шкалой находится сводная статистика, показывающая длительность (Duration), время уборки мусора (Garbage Collection time), время ввода и перетасовки (input and shuffle times), а также некоторые другие моменты времени:
Разбивка каждого временного интервала производится по минимальному, максимальному и 25-му, среднему и 75-му процентилям. Эта таблица очень удобна для выявления несовпадающих задач и перекоса данных. В идеале все длительности и объемы данных должны быть близки к равномерным для всех задач, т.е. как можно ближе к медиане.
Несколько задач, занимающих очень много времени, могут свидетельствовать о перекосе данных или о том, что обработка занимает много времени по другой причине, например, из-за сложности вычислений.
Далее приводится таблица с подробным описанием каждой задачи:
Здесь можно получить такую информацию, как время выполнения каждой задачи, на каком узле она выполнялась, время GC, размер входных данных и размер шаффла. Также, если есть возможность, приведены ссылки на консольные выходы каждой задачи, как stderr
, так и stdout
, которые можно использовать для оценки результатов.
SQL/DataFrame View
Доступ к представлению SQL/DataFrame можно получить, выбрав кнопку SQL/DataFrame на верхней панели. Она является последней в верхнем меню.
В main view отображается каждый запрос в общем задании, его длительность, а также идентификаторы заданий (Job IDs), связанные с каждым запросом. Для начала выделите запрос с идентификатором 0, чтобы получить подробную информацию о нем:
Это представление SQL-запроса для первого действия:
Ознакомимся с шагами.
Во-первых, есть шаг Scan Existing RDD, который создает DataFrame
путем чтения из списка значений. Количество выводимых строк равно 10. Далее DataFrame
кэшируется и считывается из памяти шагом in-memory Table Scan. Но поскольку это первое действие, то перед кэшированием DataFrame
должен быть построен/считан, и поэтому первый шаг выполняется, а не пропускается. Далее следуют шаги проецирования и сбора (Project и Collect Limit), которые выбирают и отображают DataFrame
.
Раскройте заголовок Details
, чтобы увидеть план запроса. Physical plan имеет следующий вид:
== Physical Plan ==
CollectLimit (5)
+- * Project (4)
+- InMemoryTableScan (1)
+- InMemoryRelation (2)
+- * Scan ExistingRDD (3)
Затем откройте второй SQL-запрос с предыдущей страницы:
Это более длинный запрос, поскольку для получения конечного результата требуется вычислить производный столбец, сгруппировать по нему данные, а затем агрегировать столбец "letter".
Но главное, на чем я хочу заострить внимание, находится в самом начале запроса. Шаг Scan Existing RDD не возвращает ни одной строки, и это потому, что данные для этого шага могут быть повторно использованы из кэша, а кэш был заполнен в предыдущем запросе.
Но поскольку операция кэширования не обрезает строку данных, и возможно (из-за потери данных или удаления узла в публичной облачной среде), некоторые части кэшированных данных придется перестраивать, и если это произойдет, то это также будет видно в плане запроса.
Однако если вместо кэша используется контрольная точка (checkpoint), то добавляется еще один SQL-план, поскольку Checkpoint — это действие:
План запросов задачи контрольной точки прост:
Обе последующие задачи теперь читают данные из контрольной точки, а не из источника:
Если данные контрольной точки будут потеряны, то восстановить их будет невозможно, так как линия данных будет обрезана, и задание завершится неудачно.
Storage View
Говоря о кэше, следует отметить, что в Storage view (выберите Storage в верхнем меню) хранятся все сохраняемые данные. В данном случае это DataFrame
, который был кэширован:
Кроме того, отображается размер сохраняемых данных в памяти и на диске. По каждому сохраненному элементу можно щелкнуть мышью, чтобы открыть подробности:
Environment view
Наконец, в представлении среды отображается сконфигурированная среда:
Executors view
В представлении "Executors" отображаются все исполнители, а также сводная разбивка. В этом представлен: количество активных, завершенных и неудачных задач для каждого экземпляра исполнителя, а также время сборки мусора (GC), размеры входных и перемешиваемых данных и, при наличии, журналы каждого исполнителя. Доступ к нему можно получить, выбрав кнопку Executors на верхней панели навигации:
Практика на двухузловом кластере, запущенном на docker
Теперь рассмотрим более близкую к реальности ситуацию: многоузловой кластер и более длительное и сложное задание.
Для запуска двухузлового кластера Spark с помощью Docker Compose я нашел отличный источник. В этой статье подробно рассматривается создание многоузлового кластера Spark с помощью Docker и Docker Compose. Здесь находится соответствующий репозиторий кода.
Однако я форкнул это репо и внес следующие изменения:
Обновлена версия Spark до 3.4.0 (статья написана до выхода 3.5).
Удален последний контейнер базы данных.
Изменен порт веб-интерфейса Spark на 9000.
Открыт порт 4040 на мастере.
Некоторые незначительные изменения портов и другие изменения.
Увеличена память рабочего процессора до 2 ГБ и рабочие ядра до 2.
Обновленный репозиторий, на который мы будем ссылаться в этой статье, можно получить по этой ссылке.
Кластер можно собрать, сначала создав файл docker, а затем запустив docker compose up
:
docker build -t spark-docker-image-3.4.0 .
docker compose up
При этом создается кластер с 1 ведущим и 2 рабочими узлами:
Доступ к веб-интерфейсу Spark Master можно получить через порт 9000. Это не пользовательский интерфейс Spark для задания, а список всех заданий, которые были выполнены или находятся в процессе выполнения. Доступ к Spark UI выполняющегося задания можно получить по адресу localhost:4040
или по ссылке из этого списка.
Обратите внимание на IP-адреса рабочих узлов. На моем компьютере рабочий 1 — это 172.20.0.3
, а рабочий 2 — 172.20.0.4
. Это необходимо для доступа к журналам исполнителей и задач на рабочих узлах.
Кроме того, Spark master имеет адрес spark://c44e0ffa8c82:7077
, который будет передан команде spark-submit.
Подготовка задания к выполнению и демонстрации
Задание для демонстрации выполняет следующие действия:
Создает
DataFrame
с двумя столбцами: c цифрами и буквами.Каждое число сопоставляется с буквой, и это сопоставление может быть изменено для получения перекоса данных. Чем больше число, передаваемое в функцию
get_alphabet_skewed
, тем больше перекос букв x, y и z.Функция, определяемая пользователем (UDF), которая повторяет переданную букву 4 раза. Она также может не сработать (один раз из 500 000 раз), если параметр имеет значение
True
.
Код задания приведен ниже:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, lit, udf
import time, random
from pyspark.sql.types import StringType
def create_spark_session(is_local: bool) -> SparkSession:
if is_local:
print("Stating for local mode")
conf = SparkConf().set("spark.driver.memory", "8g")
spark_session = SparkSession\
.builder\
.master("local[4]")\
.config(conf=conf)\
.appName("Spark UI Tutorial") \
.getOrCreate()
spark_session.sparkContext.setCheckpointDir("checkpoint")
return spark_session
else:
print("Stating for cluster mode")
return SparkSession\
.builder\
.appName("Spark UI Tutorial")\
.getOrCreate()
def get_alphabet_skewed(number: int):
if number > 25:
if number % 3 == 0:
return "x"
if number % 3 == 1:
return "y"
else:
return "z"
else:
return chr(97 + number)
@udf(returnType=StringType())
def repeat_letter(letter: str, should_randomly_fail: bool) -> str:
if should_randomly_fail:
random_num = random.randint(0, 5000000)
print(f"Random number: {random_num}")
if random_num < 1:
print("Failing ......")
raise ValueError("Randomly failing")
print(f"Letter is: {letter}")
return letter * 4
def execute_bigger_job(is_local: bool, number_of_records: int):
spark = create_spark_session(is_local=is_local)
raw_data = [(i, get_alphabet_skewed(i % 200)) for i in range(0, number_of_records)]
data_df = spark.createDataFrame(raw_data, ["number", "letter"])\
.repartition(201)
print(data_df.count())
data_df.show(truncate=False)
data_df\
.withColumn("repeated_alpha", repeat_letter(col("letter"), lit(False)))\
.groupBy("repeated_alpha")\
.agg(count(lit(1)))\
.sort("repeated_alpha")\
.show(truncate=False, n=30)
if __name__ == '__main__':
execute_bigger_job(is_local=False, number_of_records=10000000)
# For UI to stick
time.sleep(1000000)
Этот код можно получить из этого репозитория GitHib.
Обратите внимание, что в create_spark_session
при подготовке сессии к запуску на кластере отсутствует конфигурация количества ядер. Это связано с тем, что данные конфигурации должны быть взяты из окружения по умолчанию или могут быть предоставлены командой spark-submit
при отправке задания. Мы увидим это в ближайшее время.
Чтобы отправить задание, сначала зайдите по ssh на главный узел работающего кластера:
docker exec -it docker-spark-cluster-spark-master-1 /bin/bash
Далее папка /path_to_repository/apps
в репозитории была отображена на /opt/spark-apps
в мастер-узле. Скопируйте файл bigger_job.py
в эту папку и убедитесь, что к нему можно получить доступ на мастер ноде:
ls -lh /opt/spark-apps/
Теперь выполним более крупное задание с помощью следующей команды spark-submit
:
/opt/spark/bin/spark-submit --deploy-mode client --master spark://c44e0ffa8c82:7077 --conf "spark.sql.shuffle.partitions=201" /opt/spark-apps/bigger_job.py
Затем проверьте логи. После запуска Spark UI доступ можно получить по адресу localhost:4040
. В веб-интерфейсе Spark Master (localhost:9000
) задание отображается как выполняющееся:
И его можно увидеть работающим в пользовательском интерфейсе Spark:
Работа завершается через несколько минут:
Доступ к логам исполнителя и задания
После завершения работы мы знаем, что функция, определяемая пользователем, выводится на консоль, но в этом случае она не видна на консоли. Это связано с тем, что в отличие от локального режима, исполнители находятся на разных экземплярах, и поэтому доступ к их выводам можно получить из stderr
и stdout
соответствующих экземпляров и соответствующих задач/исполнителей.
Доступ к заданию 6 осуществляется на вкладке Jobs пользовательского интерфейса Spark:
Далее выберите единственный этап, чтобы просмотреть его детали:
Каждая задача имеет ссылки на stderr
и stdout
в колонке Logs. Выберите произвольную задачу и выберите stderr
, вы заметите, что доступ к нему невозможен:
Помните, что ранее мы определили, что 172.20.0.4
является рабочим 2, а порт 9000 рабочего 2 был отображен на 9002. Поэтому для доступа к журналам измените часть 172.20.0.4:9000
на localhost:9002
, сохранив остальные значения:
Теперь можно получить доступ к журналам. Эту процедуру можно повторить для всех задач.
Чтобы получить доступ к журналам исполнителей, перейдите на страницу исполнителей, там есть ссылка на stderr
и stdout
для каждого исполнителя:
С их помощью можно получить доступ и к журналам исполнителя.
Изучение журналов неудачных заданий
В UDF-функции repeat_letter
есть параметр should_randomly_fail
. Если установить его в значение True
, то задание будет провалено с очень низкой вероятностью 1 к 5000000. Поэтому установите его в true
там, где он вызывается:
def execute_bigger_job(is_local: bool, number_of_records: int):
spark = create_spark_session(is_local=is_local)
raw_data = [(i, get_alphabet_skewed(i % 200)) for i in range(0, number_of_records)]
data_df = spark.createDataFrame(raw_data, ["number", "letter"])\
.repartition(201)
print(data_df.count())
data_df.show(truncate=False)
data_df\
.withColumn("repeated_alpha", repeat_letter(col("letter"), lit(True)))\
.groupBy("repeated_alpha")\
.agg(count(lit(1)))\
.sort("repeated_alpha")\
.show(truncate=False, n=30)
Кроме того, запустим задание немного по-другому, со следующими свойствами исполнителя:
Этого можно добиться, передав в вызов spark-submit
следующие конфигурации:
/opt/spark/bin/spark-submit --deploy-mode client --master spark://c44e0ffa8c82:7077 --conf "spark.executor.cores=1" --conf "spark.executor.memory=820M" --conf "spark.task.maxFailures=1000" /opt/spark-apps/bigger_job.py
Кроме того, поскольку у нас есть 2 рабочих узла, каждый из которых имеет 2 ядра и 2 ГБ оперативной памяти, то в такой конфигурации задание будет выполняться 4 исполнителями.
Остановим предыдущее задание и выполним обновленное. Перед запуском скопируйте файл bigger_job.py
в папку apps
в репозитории spark on docker (/path_to_repository/apps
).
Откройте среду, чтобы убедиться, что переданные параметры настроены для выполнения задания:
Кроме того, на вкладке Executors теперь отображается 4 исполнителя, по два на каждом рабочем узле:
Далее, в журнале заданий мы видим, что произошло 5 сбоев. В целом задание не потерпело неудачу, так как число неудач меньше 1000, т.е. числа неудач, после которых Spark выводит из строя все задание:
Исключение мы можем увидеть в колонке Errors.
Определение перекоса данных (Data Skew)
Теперь создадим перекос данных, а также отключим случайный сбой задачи:
def execute_bigger_job(is_local: bool, number_of_records: int):
spark = create_spark_session(is_local=is_local)
raw_data = [(i, get_alphabet_skewed(i % 2000)) for i in range(0, number_of_records)]
data_df = spark.createDataFrame(raw_data, ["number", "letter"])\
.repartition(201)
print(data_df.count())
data_df.show(truncate=False)
data_df\
.withColumn("repeated_alpha", repeat_letter(col("letter"), lit(False)))\
.groupBy("repeated_alpha")\
.agg(count(lit(1)))\
.sort("repeated_alpha")\
.show(truncate=False, n=30)
А после завершения работы рассмотрим выполнение задачи Job 6:
В некоторых заданиях наблюдается перекос, несмотря на то, что задание выполняется в Spark 3.4.0 с включенной функцией Adaptive Query Execution. Это незначительное явление, но оно присутствует. Вот как можно использовать Spark UI для выявления перекоса данных.
Итоги
Spark UI — это веб-интерфейс, обеспечивающий детальное представление приложений Spark, заданий и планов запросов.
В нем перечислены все выполненные или выполняемые задания, а также предоставлен доступ к их метрикам и журналам.
Доступ к Spark UI запущенного задания осуществляется по адресу
localhost:4040
, если он не настроен иначе.Информация, предоставляемая Spark UI, может быть использована для анализа работы задания и выявления необходимых исправлений.
Spark UI также может быть использован для изучения журналов узлов и заданий неудачных заданий/задач с целью выявления причины неудачи.
В этой статье мы познакомились с интерфейсом Spark UI и некоторыми его возможностями. Если вы изучаете Spark, приходите в Слёрм на курс Spark-инженер.
После него вы сможете:
Решать задачи DE с помощью Python/Scala, SQL. Использовать разные базы данных и автоматизировать рутину.
Собирать Spark-приложение, делать Deployment.
Строить CI/CD для Spark-приложений. Работать с облачной инфраструктурой.
Оркестрировать и мониторить Spark-приложение с помощью Airflow.
Обрабатывать терабайты данных разных форматов: таблицы, потоки, геоданные, графы.
Настраивать пакетную (batch) и потоковую (streaming) обработки данных.
Презентовать данные заказчику и говорить с бизнесом на одном языке.