Сегодня все крупные компании сохраняют и обрабатывают большие объёмы информации, причём стремятся делать это максимально эффективным для бизнеса способом. Меня зовут Мазаев Роман и я работаю в проекте загрузки данных на платформу SberData. Мы используем PySpark, который позволяет очень быстро распределённо обрабатывать данные в оперативной памяти узлов нашего кластера на базе Hadoop. Я поделюсь способом, с помощью которого можно снизить потребление ресурсов кластера за счёт перезапуска PySpark-приложений между выполняемыми Spark-задачами, и расскажу, как это делать правильно.
Особенности PySpark
PySpark — это API фреймворка Apache Spark с открытым исходным кодом на языке Python. Сам же фреймворк Apache Spark написан на Scala и Java.
Для реализации API PySpark использует библиотеку PY4J, которая позволяет Python-приложению динамически получать доступ к Java-объектам внутри функционирующего JVM-процесса (Java Virtual Machine). Важно отметить, что общение между PVM- (Python Virtual Machine) и JVM-процессами в PY4J реализуется при помощи сокетов, а информация, которая кладётся процессами в сокет, должна соответствовать протоколу обмена информацией PY4J.
Главной особенностью PySpark является то, что он предоставляет возможность выполнять Spark-задачи в памяти JVM-процессов, используя код на Python. Для этого необходимо работать только со структурированными данными и использовать доступные в PySpark методы их обработки. Например, Spark SQL или Dataframe API. В этом можно убедиться, обратившись к фрагменту исходного PySpark, представленному на рисунке 2.
Здесь можно заметить, что при вызове метода union
для некоторого экземпляра dataframe
— происходит обращение к его закрытому атрибуту _jdf
, который является экземпляром класса JavaObject в PY4J и ссылается на оригинальный dataframe
на стороне JVM.
Стоит добавить, что обработка данных с помощью RDD (распределённого набора данных) API или собственных реализаций функций на Python приведёт к значительным потерям производительности. Это связано с перемещением данных между JVM- и Python daemon-процессами, а также с накладными расходами на их сериализацию и десериализацию.
Помимо прочего Apache Spark допускает два режима развёртывания — клиентский и кластерный, причём первый используется по умолчанию. В целом это влияет на то, где будет функционировать JVM-процесс драйвера (driver) и связанный с ним SparkContext.
Драйвер (driver) управляет всей информацией о Spark-приложении, исполняет указанные команды, распределяет задачи между исполнителями и, при необходимости, может сохранять в своей памяти некоторый извлечённый набор записей от всех исполнителей (например, с помощью метода collect
).
Исполнители (executors) — это отдельные JVM-процессы, которые выполняют указанные драйвером задачи и хранят в своей памяти обрабатываемые данные.
SparkContext — это создаваемый процессом драйвера объект, который представляет собой точку входа в Spark и реализует подключение к менеджеру кластера (standalone, Mesos, YARN, Kubernetes). В нём можно определить набор конфигураций, который будет использован в приложении, и с помощью него можно создавать RDD, широковещательные переменные и накопители. Это сердце Spark, которое существует в единственном экземпляре в программе драйвера.
Если в качестве менеджера кластера используется YARN, то для каждого Spark-приложения в кластере создаётся отдельный процесс ApplicationMaster, который согласовывает ресурсы с YARN-менеджером ресурсов и взаимодействует с YARN-менеджером узлов при выборе наиболее подходящих хостов для функционирования Spark-приложения.
Клиентский режим развёртывания
В клиентском режиме развёртывания PySpark-приложения JVM-процесс драйвера и соответствующий ему SparkContext
будет создан на той же машине, где функционирует основной Python-процесс. При этом Python-приложению будет предоставлен прямой доступ к данным, которые драйвер может извлекать в свою память. Вместе с этим PySpark-приложение будет жить до тех пор, пока функционирует основной Python-процесс, создавший его.
Кластерный режим развёртывания
При кластерном режиме развёртывания PySpark-приложения JVM-процесс драйвера и соответствующий ему SparkContext
будут созданы на одном из узлов кластера, причём где именно — решает менеджер кластера. Чтобы использовать этот режим, понадобится вручную запустить программу spark-submit
. Например, это можно сделать, выполнив следующую терминальную команду:
sh /bin/spark-submit --master yarn --deploy-mode cluster --queue your_queue your_script.py
Однако в таком случае весь контроль над PySpark-приложением перейдёт к созданному процессу your_script.py, который будет запущен на одном из узлов кластера независимо от основного процесса. И только это приложение будет иметь прямой доступ к процессу драйвера и к сохранённым в его памяти данным.
Если в качестве менеджера кластера используется YARN, то в этом режиме развёртывания программа драйвера запускается в рамках процесса ApplicationMaster
.
Зачем может понадобиться перезапускать PySpark-приложение
В проекте нам ежедневно приходится иметь дело с десятками, сотнями, тысячами гигабайтов информации, совершенно отличающихся друг от друга источников. Мы стремимся загружать каждый из них за минимальное время, потребляя как можно меньше ресурсов общего кластера.
При создании SparkContext
можно определить ряд конфигураций, который будет использован в Spark-приложении. Самый популярный набор для конфигурации Spark-приложения обычно напрямую связан с потреблением ресурсов. Приведу в качестве примеров основные из них:
spark.driver.memory — резервируемая память для процесса драйвера (по умолчанию используется 1 Гб);
spark.executor.memory — резервируемая память для одного процесса исполнителя задач (по умолчанию используется 1 Гб);
spark.executor.cores — резервируемое количество ядер для одного процесса исполнителя задач (по умолчанию используется 1);
spark.executor.instances — количество создаваемых исполнителей задач (по умолчанию используется 2).
Для оптимизации обработки данных источников можно было бы реализовать вариант, при котором для каждого из них заранее определяется оптимальный набор конфигураций для PySpark-приложения. Например, для больших источников можно было бы резервировать больше ресурсов (если их будет недостаточно, то могут возникнуть проблемы из-за нехватки памяти), а для небольших источников можно было бы резервировать меньше. Однако у такого подхода есть существенный недостаток: мы забираем ресурсы у кластера до тех пор, пока наше PySpark-приложение полностью не завершит свою работу. А ведь в процессе обработки данных источника вполне может быть ряд промежуточных Spark-задач, которые не нуждаются в использовании такого количества ресурсов. Приведу абстрактный пример проблемной ситуации:
Создаём
SparkContext
с заранее определённым набором конфигураций потребления ресурсов.Вычисляем максимальные и минимальные значения некоторого поля и сохраняем их в памяти драйвера.
В зависимости от полученных результатов определяем, необходимо ли идти дальше и обрабатывать полученные данные.
Партиционируем данные по значению применённой функции к некоторому полю (например, хеширования).
В зависимости от полученных результатов определяем, какие данные не нужно обрабатывать и их достаточно просто перенести из одной директории в другую, а какие данные нужно объединить с уже существующими для дальнейшей фильтрации.
Определяем удаляемые и изменяемые записи в обрабатываемых данных, после чего сохраняем их в отдельной таблице истории.
Создаём резервную копию основной таблицы через физическое копирование файлов из одной директории в другую (например, для HDFS это могла бы быть distcp‑команда, запускающая MapReduce‑задачу).
Определяем актуальное состояние данных и сохраняем их в целевой таблице.
Проставляем необходимые статистики в свойствах целевой таблицы.
Как видите, ресурсы Spark-приложения могут быть полноценно использованы только при выполнении пунктов 4, 6 и 8. При этом до 4 пункта алгоритм обработки данных может вовсе не дойти, а между пунктами 6 и 8 может выполняться отдельная MapReduce-задача, для которой также необходимо выделять отдельные ресурсы, и которая может работать довольно долго. Становится очевидно, что резервировать ресурсы кластера для выполнения задач, выполняя пункты 1-9, неэффективно и нужно найти способ, при котором для каждой Spark-задачи будет выделяться только необходимое количество.
Как известно, изменять конфигурацию драйвера и исполнителей после инициализации объекта SparkContext
не имеет смысла, ведь для них PySpark уже создал отдельные JVM-процессы. Единственный способ изменять потребление ресурсов драйвером и исполнителями — создавать их как новые. И тут есть два пути:
Для каждой Spark‑задачи создавать отдельные Python‑подпроцессы, которые будут инициализировать уникальные PySpark‑приложения с необходимой конфигурацией ресурсов и выполнять заранее запрограммированные действия.
Перезапускать PySpark‑приложение, которое будет заново создавать процессы драйвера и исполнителей с уникальной конфигурацией потребления ресурсов.
Если идти по первому пути, то можно столкнуться с некоторыми трудностями. Во-первых, создаваемые подпроцессы нужно отслеживать и прерывать по мере необходимости. Во-вторых, для выполнения некоторых Spark-задач может понадобиться некоторая информация, которая хранится в основном Python-приложении или которая извлекается в рамках выполнения других Spark-задач. В связи с этим придётся разработать механизм для обмена информацией между PySpark-приложениями и основным процессом, что может привести к дополнительным накладным расходам. В-третьих, это может значительно усложнить и без того непростую архитектуру приложения, ведь под каждую Spark-задачу придётся написать собственное PySpark-приложение.
По второму пути невозможно идти, если используется кластерный режим развёртывания Spark-приложения c YARN-менеджером ресурсов, описанный выше. В этом случае пересоздание процесса драйвера приведёт к уничтожению ApplicationMaster, без которого Spark-приложение просто не сможет функционировать.
Несмотря на это ограничение, далее мы рассмотрим именно второй путь, поскольку он проще и эффективнее, и большинство PySpark-приложений сможет себе позволить идти по нему.
Правильное пересоздание PySpark-приложения
Немного про SparkSession
До выхода версии 2.0 в Apache Spark SparkContext
был единственной точкой входа в Spark, а RDD API был основным. При этом для использования других API необходимо было инициализировать отдельный контекст, будь то HiveContext
или SQLContext
. Начиная с версии 2.0 Apache Spark предлагает новую точку входа при помощи объекта SparkSession
, включающий в себя создаваемый экземпляр SparkContext
и предоставляющий доступ ко всем функциям Spark, которые ранее можно было использовать только в рамках отдельных API. Для инициализации объекта SparkSession
в PySpark-приложении и реализации входа обычно используется его внутренний подкласс Builder
. Для доступа к нему нужно использовать открытый атрибут builder
, являющийся его экземпляром.
В отличие от SparkContext
, в PySpark-приложении может быть создано несколько экземпляров SparkSession
, правда, для этого придётся создавать их через внутренний метод newSession()
. Каждый из них будет иметь изолированную конфигурацию SQL, зарегистрированные временные таблицы и представления, а также зарегистрированные пользовательские функции, известные как UDF. При всём этом каждый объект SparkSession будет ссылаться на единственный экземпляр SparkContext и иметь общее хранилище кешированных данных.
Существующий метод остановки PySpark-приложения
На момент написания статьи Spark предоставляет единственный способ остановки приложения: вызов метода stop()
у экземпляра Scala-класса SparkContext
. Он сбрасывает состояние некоторых атрибутов и задействованных объектов в рамках инициализации, закрывает прослушиваемые порты, а также сообщает менеджеру кластера о необходимости завершения Spark-задачи в кластере и высвобождении всех занимаемых ресурсов JVM процессами исполнителей.
Чтобы обратиться к методу stop()
Scala-класса SparkContext
из PySpark-приложения, достаточно вызвать метод stop()
у экземпляра SparkContext
.
Важно отметить, что при инициализации PySpark-приложения с помощью объекта SparkSession
метод stop()
необходимо вызывать именно у него. В таком случае он самостоятельно вызовет метод stop()
у единственного экземпляра SparkContext
, но дополнительно также сбросит состояние некоторых собственных атрибутов и других задействованных объектов в рамках инициализации SparkSession
.
Этот метод действительно выполняет ряд необходимых действий для корректного завершения Spark-приложения, однако если появится необходимость в его повторном перезапуске, этого будет недостаточно. И вот почему.
Недостатки существующего метода остановки PySpark-приложения
Во-первых, метод stop()
объекта SparkSession
в текущей реализации не сбрасывает состояние внутреннего атрибута builder
. То есть если вы захотите заново инициализировать Spark-приложение с новым набором конфигураций, то старые не изменённые или не удалённые конфигурации будут задействованы повторно.
Во-вторых, этот метод не завершает JVM-процесс драйвера, не высвобождает его ресурсы и не сбрасывает состояние внутренних атрибутов, обеспечивающих связь с ним. Это означает, что даже после остановки Spark-приложения JVM-процесс драйвера продолжит функционировать вместе с Python-процессом, создавшим его, а значит для него не получится переопределить конфигурацию потребляемых ресурсов.
Всё это можно самостоятельно проверить, последовательно запустив два PySpark-приложения с разным набором конфигураций. Например, это можно было бы реализовать следующим образом:
Для первого PySpark-приложения определяются конфигурации памяти драйвера (5 Гб) и исполнителей (10 Гб). В Spark Web UI (см. рисунок 11) в столбце Storage Memory всегда будет указано меньшее значение, поскольку только часть выделенной памяти Spark отводится на обработку и сохранение данных. Подробнее про это написано в официальной документации Spark в рамках описания конфигурации spark.memory.fraction.
Для второго PySpark-приложения изменяется конфигурация памяти драйвера (15 Гб) и определяется количество используемых исполнителями ядер (2).
Однако, как можно заметить, после перезапуска PySpark-приложения изменилось только количество используемых исполнителями ядер. Память исполнителей не изменилась потому, что не было сброшено состояние внутреннего атрибута builder
, поэтому была повторно задействована конфигурация памяти исполнителей из предыдущего запуска. Память драйвера не изменилась потому, что PySpark просто переиспользовал ранее созданный JVM-процесс драйвера.
Также негативным следствием переиспользования JVM-процесса драйвера является тот факт, что с каждым перезапуском PySpark-приложения его память заполняется новым набором кешированных данных, что может привести к ошибке Out Of Memory (OOM). В этом можно самостоятельно убедиться, выполнив следующий Python-скрипт:
Здесь для драйвера по умолчанию Spark самостоятельно выделяет 1 Гб памяти, однако уже после 10 запусков её начинает не хватать. И это даже при том, что между запусками не происходит никаких вычислений.
Если же начать извлекать в память драйвера какие-либо данные, то OOM-ошибку можно получить значительно раньше.
Вполне очевидно, что вышеупомянутые недостатки не позволяют в полной мере использовать алгоритм перезапуска PySpark-приложений для оптимизации потребления ресурсов, а значит необходимо самостоятельно их устранить.
Дополнения к существующему методу остановки PySpark-приложения
Для исправления первой проблемы достаточно перед перезапуском PySpark-приложений атрибут _options
объекта builder
приравнивать к пустому словарю. Этот грубый метод позволит удалить все ранее установленные конфигурации. _options
— это словарь, содержащий все заданные пользователем конфигурации при инициализации PySpark-приложения с помощью Builder
’а.
Для решения второй проблемы можно все атрибуты класса SparkContext
приводить в исходное состояние перед каждым перезапуском. Это заставит PySpark каждый раз инициализировать новый экземпляр класса SparkContext
и создавать связанный с ним JVM-процесс драйвера с новой конфигурацией потребляемых ресурсов. Этого можно добиться, например, выполнив следующий сценарий:
Но не всё так просто, ведь в таком случае PySpark начнёт создавать множество отдельных JVM-процессов, которые не будут завершены, пока функционирует основной Python-процесс.
Это означает, что одного сброса состояния атрибутов класса SparkContext
для реализации механизма перезапуска PySpark-приложений недостаточно. Перед созданием нового JVM-процесса драйвера предыдущий необходимо завершать.
Инициирование завершения JVM-процесса драйвера
Как мы уже знаем, в PySpark используется библиотека PY4J для взаимодействия PVM- и JVM-процессов, а обмен информацией между ними реализован при помощи сокетов. То есть, когда Python-приложение кладёт в сокет некоторую информацию, JVM-процесс драйвера её обрабатывает и при необходимости исполняет. Это означает, что нам достаточно положить в сокет сообщение, которое инициировало бы выполнение команды System.exit(0)
на стороне JVM. Доступ к методу exit
класса System
в JVM через PySpark можно получить следующим образом:
Со стороны PY4J данный метод является экземпляром класса JavaMember
, который позволяет формировать сообщения для его вызова на стороне JVM. Инициализация экземпляра класса JavaMember
выглядит следующим образом:
Логика вызова данного метода определена в реализации магического метода __call__
. Он позволяет формировать и отправлять сообщение для исполнения на стороне JVM.
Однако, как можно заметить, в реализации магического метода __call__
после отправки сообщения в JVM-процесс от него ожидается получение ответа (вызов функции send_command
). Но мы не можем получать ответ от процесса, который уже был завершён. В связи с этим не получится напрямую использовать вызов метода spark._jvm.System.exit(0)
.
Это означает, что нам необходимо самостоятельно сконструировать сообщение и отправить его в JVM-процесс, не ожидая получить ответ.
Для сборки сообщения достаточно переиспользовать часть логики, которая уже была представлена в магическом методе __call__
. Используемые атрибуты proto.CALL_COMAND_NAME
и proto.END_COMMAND_PART
являются константами, которые определены в модуле protocol.py библиотеки PY4J.
Для отправки сообщения нам придётся погрузиться в логику реализации метода send_command
экземпляра класса GatewayClient
, чтобы в дальнейшем переиспользовать её без ожидания ответа. Класс GatewayClient
отвечает за управление подключением к мосту, соединяющему PVM- и JVM-процессы.
В реализации метода send_command
класса GatewayClient
можно заметить, что на самом деле отправка сообщения в JVM-процесс происходит в рамках вызова аналогичного метода send_command
, но уже для экземпляра класса GatewayConnection
, который можно получить, вызвав метод _get_connection()
.
Этот метод разделён на две основные части: отправку и получение сообщения от JVM-процесса. Именно он пробрасывал исключение, когда мы пытались напрямую использовать вызов spark._jvm.System.exit(0)
. Для нас важна только первая часть представленной логики, которая используется для отправки некоторого сообщения в JVM-процесс драйвера.
Помимо прочего, после завершения JVM-процесса драйвера необходимо корректно завершить все открытые соединения с ним. Для этого достаточно вызвать метод shutdown()
у экземпляра класса JavaGateway
, доступ к которому можно получить через атрибут _gateway
класса SparkContext
.
Собрав всю полученную информацию воедино, мы можем реализовать функциональность, позволяющую перезапускать PySpark-приложение и создавать новые процессы драйвера и исполнителей с уникальной конфигурацией потребления ресурсов. Сделать это можно, например, следующим образом:
В предложенном выше сценарии на 12 перезапуске PySpark-приложения мы определяем три уникальные конфигурации: spark.executor.memory, spark.executor.cores и spark.driver.memory. Уже на 13 перезапуске мы убираем конфигурацию spark.executor.cores, а для двух других специально изменяем значение. Ожидается, что на 13 перезапуске мы сможем изменить потребляемую память JVM-процессов драйвера и исполнителей, а конфигурация spark.executor.cores не будет задействована повторно.
Как можно заметить, в любой момент времени функционирует только один JVM-процесс драйвера с уникальным набором конфигураций. При этом, как и ожидалось, во время 13 перезапуска конфигурация spark.executor.cores не была задействована повторно.
Итог
Мы рассмотрели способ оптимизации потребления используемых ресурсов кластера, заключающийся в перезапуске PySpark-приложений между выполняемыми Spark-задачами. По ходу анализа доступных методов остановки Spark-приложения были обнаружены существенные недостатки, не позволяющие в полной мере использовать алгоритм перезапуска. Для их устранения мы погрузились в анализ исходного кода Apache Spark и библиотеки PY4J, необходимой для функционирования инструмента PySpark. Получив всю нужную информацию, мы реализовали алгоритм, который дополняет существующий метод остановки Spark-приложения и устраняет все обнаруженные недостатки, что доказывает проведённое в конце тестирование.
Комментарии (10)
eigrad
04.04.2024 21:30в процессе обработки данных источника вполне может быть ряд промежуточных Spark-задач, которые не нуждаются в использовании такого количества ресурсов
Чем в таком кейсе не подошёл dynamic allocation?
И что помешало разбить процесс на несколько задач (на уровне оркестратора)?
sshikov
04.04.2024 21:30Ну, dynamic allocation это строго про число executors. Если ресурсы это память - то динамической аллокацией это не решается.
Задачи на уровне оркестратора - это всегда сложности интеграции по сравнению с кусками кода одной программы. Они конечно решаются, но это лишний геморрой. Ну т.е. обычно при таком способе у вас вместо взаимодействия по API получается обмен файлами, который далеко не всегда удобен (не говоря уже про такие простые вещи, что эти файлы нужно за собой подчищать и т.п.).
Ну вот скажем, простой пример - если оркестратор Oozie, то задача может записать key-value результат, именуемый data, при помощи которого можно передавать что-то между задачами. Это API, пусть и примитивный.
Но вот проблема - если задача завершается ошибкой, то Oozie считает, что data не будет.
Но ведь задача же может завершиться ошибкой, проделав часы работы, и какие-то результаты у нее все равно будут (я уже не говорю, что в случае Yarn нам бы не мешало как-то сохранить applicationId для последующего анализа, почему оно вообще упало, или скажем код возврата Spark, чтобы понять, что ему скажем памяти не хватило).
eigrad
04.04.2024 21:30Помимо прочего, после завершения JVM-процесса драйвера необходимо корректно завершить все открытые соединения с ним. Для этого достаточно вызвать метод shutdown() у экземпляра класса JavaGateway
А нельзя просто shutdown позвать без всех извращений с ручной отправкой команд?
eigrad
04.04.2024 21:30Сложная тема, чтобы представить как оно работает в случае cluster mode или spark submit (не будет ли каких-то side effect от перезапуска драйвера), нужно как минимум понимать как взаимодействуют yarn/driver/applicationmaster.
dolfinus
04.04.2024 21:30С cluster mode это и не нужно, там драйвер уже запущен с готовым SparkContext, и перезаписать параметры он не может
dolfinus
Эх, эту инфу бы в Issue в Apache Jira, чтобы наконец PySpark научился сам за собой все корректно убирать. А не оставлять свои глобалы на глобалах
dolfinus
Завел https://issues.apache.org/jira/browse/SPARK-47740
dolfinus
И в Py4J завел PR https://github.com/py4j/py4j/pull/541
eigrad
(сорян, промазал полем ответа)