Spark Connect
Spark Connect — это, пожалуй, самая ожидаемая фича Apache Spark 3.4.0. Она была анонсирована на конференции Data+AI Summit 2022 и сейчас широко освещается в социальных сетях. Вот и я решил приобщиться и внести свой небольшой вклад в это дело, продемонстрировав вам несколько интересных деталей реализации.
Зачем это нужно?
Как обычно, хорошо бы начать с вопроса “а зачем это вообще нам нужно?”. Мартин Грунд (Martin Grund), возглавляющий проект и определивший его первоначальный дизайн, приводит некоторые причины в SPARK-39375:
Стабильность: в кластерах с общим доступом OOM одного пользователя может вывести из строя весь кластер.
Обновляемость: зависимости в classpath создают достаточно неприятные проблемы, такие как ад зависимостей.
Developer experience: подключение вашего локального кода к кластерам уже давно перешло в разряд насущных потребностей. Несмотря на множество плагинов, стандартного способа сделать это не существует.
Встроенное удаленное подключение: требуются сторонние инструменты, например Apache Livy.
Честно говоря, самым интересным пунктом в этом списке для меня является developer experience. Запускать задачу с локальной машины, в перспективе с дополнительной отладочной информацией, без необходимости устанавливать сторонние плагины или специально настраивать для этого IDE — это просто замечательно. В одной из своих недавних презентаций Мартин привел довольно убедительный пример того, насколько Spark Connect может упростить нам жизнь:
За этой простотой все-таки скрывается некоторая сложность. Но, к счастью, вы, как конечный пользователь, не по большей части будете от нее ограждены!
Высокоуровневая архитектура
В общем виде Spark Connect представляет собой клиент-серверную архитектуру с взаимодействием на основе gRPC:
Выглядит просто, не правда ли? Если вы настроены использовать Spark Connect, то этой картинки в принципе должно быть достаточно. Но если вы хотите узнать больше, давайте увеличим схему и посмотрим на некоторые детали:
Низкоуровневые детали
Должен признать, что схема достаточно детальная, но я не смог больше ничего из нее вынести, чтобы не потерять возможность наглядно продемонстрировать реализацию Spark Connect. Надеюсь, с помощью следующих подробностей вы сможете лучше понять, как Spark Connect устроен под капотом!
Для начала, когда вы создаете Spark Connect SparkSession, вы вызываете метод SparkSession.builder.remote("...")
. Внутри он преобразуется в SparkSession, расположенный в модуле pyspark.sql.connect
. Эта выделенная для Spark Connect сессия имеет общий API-контракт с SparkSession, но предоставляет немного другую реализацию. Вместо того чтобы взаимодействовать с физическими узлами Spark напрямую, она делает это посредством gRPC-вызовов.
В нашем примере, когда мы вызываем метод collect()
, мы фактически выполняем gRPC-вызов типа execute plan
. Здесь стоит отметить, что план является логическим и локальным, то есть он не имеет никакого отношения к плану Catalyst. Он является частью запроса с полезной нагрузкой на основе Protobuf, и сервер Spark Connect знает, как с ним работать.
После перехвата первоначального запроса в конечной точке SparkConnectService#executePlan
начинается этап обработки. После этого метод handlePlan()
вызывает функцию processAsArrowBatch
:
processAsArrowBatches(
sessionId: String,
dataframe: DataFrame,
responseObserver: StreamObserver[ExecutePlanResponse]): Unit = {
val spark = dataframe.sparkSession
// ...
SQLExecution.withNewExecutionId(dataframe.queryExecution, Some("collectArrow")) {
val rows = dataframe.queryExecution.executedPlan.execute()
val numPartitions = rows.getNumPartitions
var numSent = 0
if (numPartitions > 0) {
type Batch = (Array[Byte], Long)
val batches = rows.mapPartitionsInternal(
SparkConnectStreamHandler
.rowToArrowConverter(schema, maxRecordsPerBatch, maxBatchSize, timeZoneId))
// ...
partition.foreach { case (bytes, count) =>
val response = proto.ExecutePlanResponse.newBuilder().setSessionId(sessionId)
val batch = proto.ExecutePlanResponse.ArrowBatch
.newBuilder()
.setRowCount(count)
.setData(ByteString.copyFrom(bytes))
.build()
response.setArrowBatch(batch)
responseObserver.onNext(response.build())
numSent += 1
}
currentPartitionId += 1
}
}
Как видно из этого фрагмента, есть еще одна (на этот раз физическая) сессия SparkSession, которая после преобразования логического плана Spark Connect в физическое представление работает непосредственно с данными и возвращает их клиенту в виде пакетов (батчей).
Расширяемость
Кроме того, Spark Connect является расширяемым. Вы можете реализовать собственные расширения для отношений, команд и выражений в составе библиотек Spark Server. Все, что вам нужно сделать в этом случае, это:
Определите плагин. Он должен реализовать один из этих интерфейсов:
CommandPlugin
,ExpressionPlugin
илиRelationPlugin
. Плагин преобразует логический план Protobuf, полученный от клиента, в его эквивалент в Catalyst.
Зарегистрируйте плагины команд, выражений и отношений в конфигурационных записях spark.connect.extensions.command.classes
, spark.connect.extensions.expression.classes
и spark.connect.extensions.relation.classes
соответственно. Таким образом, зарегистрированные классы будут загружены SparkConnectPluginRegistry
и преобразованы на этапе преобразования плана из Protobuf в Catalyst. Ниже приведен пример для отношений:
private def transformRelationPlugin(extension: ProtoAny): LogicalPlan = {
SparkConnectPluginRegistry.relationRegistry
// Лениво перебираем коллекцию.
.view
// Применяем трансформацию.
.map(p => p.transform(extension, this))
// Находим первую непустую трансформацию или вызываем throw.
.find(_.nonEmpty)
.flatten
.getOrElse(throw InvalidPlanInput("No handler found for extension"))
}
Определите клиентскую часть. Она должна быть способна кодировать сообщение Protobuf, используемое для описания плагина на стороне сервера.
Резюме из презентации Мартина, упоминавшейся ранее:
Для полноты картины — еще один материал из выступления Мартина, демонстрирующий всю мощь реализации Spark Connect на базе Protobuf:
Вы можете использовать его [плагин] на стороне клиента на любом языке. И это еще один ключевой момент. Если вы создадите свою серверную библиотеку, вы сможете использовать ее из Rust, из Go, откуда угодно, если только вы знаете, как закодировать в ней сообщение Protobuf.
Пок это все, что касается Spark Connect в контексте Apache Spark 3.4.0, но это только начало истории Apache Spark. Недавняя активность в подкаталоге на Github дает надежду на то, что Spark Connect может стать постоянной рубрикой в моей серии "Что нового в Apache Spark"!
— Доработки для Shuffle
Shuffle (перетасовка) — постоянно повторяющаяся тема в серии "Что нового в Apache Spark". Почему? Зачастую это одна из самых трудоемких частей задач, а знание об улучшениях в этом процессе попросту помогает писать лучшие конвейеры.
Хранилище состояний shuffle-сервиса RocksDB
Автором первой фичи, связанной с shuffle, является Ян Цзе (Yang Jie). Он расширил хранилище состояний (state store) shuffle-сервиса реализацией для RocksDB. Но прежде всего, зачем здесь RocksDB? Shuffle-сервис представляет собой прокси-сервер на каждом исполнителе в кластере. При чтении shuffle-данных исполнители общаются с прокси, а не напрямую между собой. Вы можете подумать, что в этой конфигурации shuffle-блоки хранятся в RocksDB, но это не так. Сервис отслеживает только файлы, поэтому ему необходимо поддерживать состояние управляемых исполнителей. А если точнее, то вот это:
public class ExecutorShuffleInfo implements Encodable {
/** Базовый набор локальных директорий, в которых исполнитель хранит свои shuffle-файлы. */
public final String[] localDirs;
/** Number of subdirectories created within each localDir. *//** Количество подкаталогов, созданных в каждом localDir. */
public final int subDirsPerLocalDir;
/**
* Shuffle-менеджер (SortShuffleManager), который использует исполнитель.
* Если эта строка содержит точку с запятой, она также будет включать метаинформацию
* для push-based shuffle в формате JSON. Пример строки с точкой с запятой:
* SortShuffleManager:{"mergeDir": "mergeDirectory_1", "attemptId": 1}
*/
public final String shuffleManager;
// ...
}
Проблема с внешним shuffle-сервисом, как и с любым другим, заключается в риске сбоя. Исторически эта проблема была решена в ExternalShuffleService, который должен быть устойчив к перезапускам NodeManager в yarn, в релизе 1.6.0. Пул-реквест, предложенный Имраном Рашидом (Imran Rashid) для привязанной JIRA, очень хорошо описывает назначение хранилища состояний shuffle-сервиса:
В целом, Yarn-приложения должны быть устойчивы к перезапускам NodeManager. Однако если запустить spark с включенным внешним shuffle-сервисом, то после перезапуска NodeManager все перетасовки не работают, поскольку shuffle-сервис потерял часть состояния с информацией о каждом исполнителе. (Обратите внимание, что shuffle-данные прекрасно сохраняются на диске после перезапуска NodeManager, проблема в том, что мы потеряли небольшой фрагмент состояния, позволяющий нам найти эти файлы). Решение, предлагаемое здесь, заключается в том, что внешний shuffle-сервис может записывать свое состояние в файл каждый раз, когда добавляется исполнитель. При работе с yarn этот файл находится в локальном каталоге NodeManager. При каждом запуске сервиса он ищет этот файл, и если он существует, то считывает его и перерегистрирует в нём всех исполнителей.
Каждый раз, когда запускается новый исполнитель, он регистрируется во внешнем shuffle-сервисе здесь:
public class ExternalShuffleBlockResolver {
public void registerExecutor(
String appId, String execId, ExecutorShuffleInfo executorInfo) {
AppExecId fullId = new AppExecId(appId, execId);
logger.info("Registered executor {} with {}", fullId, executorInfo);
try {
if (db != null) {
byte[] key = dbAppExecKey(fullId);
byte[] value = mapper.writeValueAsString(executorInfo).getBytes(StandardCharsets.UTF_8);
db.put(key, value);
}
} catch (Exception e) {
logger.error("Error saving registered executors", e);
}
executors.put(fullId, executorInfo);
}
До выхода версии 3.4.0 Apache Spark поддерживал в качестве бэкенда "db" только LevelDB. Теперь вы можете настраивать это в spark.shuffle.service.db.backend
и использовать RocksDB.
В этой связи стоит упомянуть о проблемах, с которыми вы можете столкнуться при работе компонентов на базе LevelDB на Apple Silicon, и внешнее хранилище состояний shuffle-сервиса — одна из них.
Push-based shuffle
В этом разделе мы начнем с одного исправления, предложенного Ваном Куном (Wan Kun), для удаления файлов данных слияния (merge) для shuffle на основе push-модели. Начиная с этого момента драйвер отправляет сообщение RemoveShuffleMerge
во все места слияния, что приводит к удалению всех связанных с ними файлов данных слияния.
Второе исправление, на этот раз реализованное gaoyajun02, решает проблему чтения блоков нулевого размера, которые могут поступать от узлов с аппаратными проблемами. В этом случае, если это возможно, программа поиска shuffle-блоков будет откатываться к получению оригинальных shuffle-блоков.
Кроме того, есть и улучшения, связанные с отказоустойчивостью. Йе Чжоу (Ye Zhou) реализовала возможность для push-based shuffle-сервиса хранить свое состояние в инстансе LevelDB. Это позволяет избежать потери состояния в случае перезапуска NodeManager, что могло бы привести к невозможности обслуживать запросы на выборку объединенных данных.
Наконец, были внесены некоторые изменения, связанные с метриками. Тейдип Гудивада (Thejdeep Gudivada) добавил метрики чтения на стороне клиента. Благодаря им вы можете узнать о таких вещах, как количество поврежденных объединенных блоков, количество возвратов к нормальному чтению блоков и многое другое.
Я прекрасно понимаю, что эта статья в блоге гораздо более низкоуровневая, чем предыдущие из этой серии. Тем не менее, она служит хорошим напоминанием о дополнительных компонентах shuffle, включая shuffle-сервис и push-based shuffle, добавленных не так давно (в Apache Spark 3.2.0).
Как успешно пройти интервью на позицию Data Engineer с уклоном на Spark?
Разберемся на завтрашнем открытом уроке. Преподаватель OTUS также поделится практическими кейсами и ответит на вопросы участников. Записаться на урок можно на странице курса "Spark Developer".