Привет, Хабр! Я занимаюсь инженерией данных в Х5 Tech. В этой статье я решил поделиться проблемами, с которыми столкнулись при распараллеливании CatBoost на Spark, и как мы нашли решение. Возможно, это не rocket science, но если бы со мной поделились такими ответами заранее, я бы сэкономил себе пару вечеров свободного времени.

Пара вводных

Пока не перешли к задаче, скажу пару слов про нашу инфраструктуру (чтобы вы поняли, актуально вам читать дальше или нет). Мы живём в Kubernetes 1.17, в котором поднят Airflow 1.10.15 (c Kubernetes Executor). Данные хранятся на HDFS в экосистеме Hadoop. Для работы с данными используем PySpark 2.4.5, и делаем это, в основном, двумя способами: через Spark Submit Operator или на поднятом поде (например, при выполнении Python Operator) используем локальный Spark или клиентский режим (client mode).

Сама постановка бизнесовой стороны задачи звучит довольно банально. У нас есть некий пайплайн, на одном из этапов которого происходит обучение и предикт на локальном CatBoost: поднимаются параллельно несколько мощных по оперативке подов (Python Operator в Airflow) и внутри каждого отрабатывает CatBoost (данные с HDFS не помещаются в оперативы одного пода, поэтому делятся батчами на количество операторов соответственно). 

Проблемы

При таком раскладе есть две проблемы. Первая – неэффективное использование ресурсов в Kubernetes. Вторая – каждое обучение происходит на десятой части данных. Поэтому мы предположили, что модельная точность вырастет, если обучение будет происходить сразу на всех данных.

Так мы решили перейти на “кластерный” CatBoost, а именно – выполнять расчёты в Hadoop-кластере на кластерном Spark-е, т. к. для CatBoost есть “решение из коробки”, которое предполагало дополнение одного конфига: https://catboost.ai/en/docs/installation/spark-installation-pyspark.

Client Mode Spark

Для эксперимента решили попробовать в клиентском режиме Spark-а: драйвер находится у нас на поде в Kubernetes-кластере, экзекьюторы – в Hadoop-кластере. Для того, чтобы дотянуться из Kubernetes пода до Hadoop-кластера, используем сервис типа NodePort: 

spark-driver-node-port    NodePort   10.43.92.218   <none>     39571:39571/TCP,35931:35931/TCP,30409:30409/TCP

в котором пробрасываем 3 порта (driver, block-manager, ui) для задания в конфиге Spark-а:

SparkSession
  .builder
  …
  .config('spark.driver.port', driver_port)
  .config('spark.driver.blockManager.port', block_manager_port)
  .config('spark.ui.port', ui_port)
  .getOrCreate()

но внутри catboost listeningport=0 https://github.com/catboost/catboost/blob/dfc3f04050ef9acc3f070f38fe363fec93a3bc8d/catboost/spark/catboost4j-spark/core/src/main/scala/ai/catboost/spark/CatBoostPredictor.scala#L228

Подробнее о проблеме и её состоянии на сегодняшний день можно посмотреть здесь: https://github.com/catboost/catboost/issues/2181

Cluster Mode Spark

На проде хотелось бы использовать Spark Submit Operator, поэтому по инструкции дополнили spark_config: https://catboost.ai/en/docs/installation/spark-installation-pyspark.

В случае отсутствия на Hadoop-кластере интернета предварительно нужно выгрузить вот эти jar файлы на HDFS:

ai.catboost_catboost-common-1.1.jar 
ai.catboost_catboost-spark_2.4_2.11-1.1.jar
io.github.classgraph_classgraph-4.8.98.jar

и также дописать в конфиг в “spark.jars”. 

Но мы столкнулись с ошибкой:

Py4JJavaError: An error occurred while calling o663.fit.

О ней можно найти пару ссылок с разными решениями в интернете вроде этой: https://github.com/catboost/catboost/issues/1585?ysclid=le1hctqn2n472605600, но ни одно из них не пофиксило ошибку.

Решение

Методом проб и ошибок в итоге мы получили ответ:

jars = "hdfs:///share/path/to/catboost/jars*"
SparkSubmitOperator(
     …
     conf=spark_submit_config,
     executor_config=kubernetes_executor,
     jars=jars,
     py_files=jars,
)

Нужно сконфигурировать оператор, как написано выше, а на HDFS положить необходимые jars. Необходимыми оказались не те три джарника, о которых я писал выше, а вот этот минимальный набор:

Его достаточно просто получить: поставить локальный CatBoost и из скачанных джарников выбрать данный набор (версии, учитывая инфраструктурные особенности, могут отличаться).

Теперь всё должно работать. Протестировать можно на примерах из документации, только дополнительно задайте параметр sparkPartitionCount:

catboost_spark.CatBoostClassifier(sparkPartitionCount=10)

Подтверждение, что всё работает корректно, можно увидеть в Spark History Server, а именно 10 запущенных экзекьюторов.

В итоге у нас получилось полностью снять нагрузку с Kubernetes-кластера и ускорить вычисления.

Комментарии (0)