Привет, Хабр! Я занимаюсь инженерией данных в Х5 Tech. В этой статье я решил поделиться проблемами, с которыми столкнулись при распараллеливании CatBoost на Spark, и как мы нашли решение. Возможно, это не rocket science, но если бы со мной поделились такими ответами заранее, я бы сэкономил себе пару вечеров свободного времени.
Пара вводных
Пока не перешли к задаче, скажу пару слов про нашу инфраструктуру (чтобы вы поняли, актуально вам читать дальше или нет). Мы живём в Kubernetes 1.17, в котором поднят Airflow 1.10.15 (c Kubernetes Executor). Данные хранятся на HDFS в экосистеме Hadoop. Для работы с данными используем PySpark 2.4.5, и делаем это, в основном, двумя способами: через Spark Submit Operator или на поднятом поде (например, при выполнении Python Operator) используем локальный Spark или клиентский режим (client mode).
Сама постановка бизнесовой стороны задачи звучит довольно банально. У нас есть некий пайплайн, на одном из этапов которого происходит обучение и предикт на локальном CatBoost: поднимаются параллельно несколько мощных по оперативке подов (Python Operator в Airflow) и внутри каждого отрабатывает CatBoost (данные с HDFS не помещаются в оперативы одного пода, поэтому делятся батчами на количество операторов соответственно).
Проблемы
При таком раскладе есть две проблемы. Первая – неэффективное использование ресурсов в Kubernetes. Вторая – каждое обучение происходит на десятой части данных. Поэтому мы предположили, что модельная точность вырастет, если обучение будет происходить сразу на всех данных.
Так мы решили перейти на “кластерный” CatBoost, а именно – выполнять расчёты в Hadoop-кластере на кластерном Spark-е, т. к. для CatBoost есть “решение из коробки”, которое предполагало дополнение одного конфига: https://catboost.ai/en/docs/installation/spark-installation-pyspark.
Client Mode Spark
Для эксперимента решили попробовать в клиентском режиме Spark-а: драйвер находится у нас на поде в Kubernetes-кластере, экзекьюторы – в Hadoop-кластере. Для того, чтобы дотянуться из Kubernetes пода до Hadoop-кластера, используем сервис типа NodePort:
spark-driver-node-port NodePort 10.43.92.218 <none> 39571:39571/TCP,35931:35931/TCP,30409:30409/TCP
в котором пробрасываем 3 порта (driver, block-manager, ui) для задания в конфиге Spark-а:
SparkSession
.builder
…
.config('spark.driver.port', driver_port)
.config('spark.driver.blockManager.port', block_manager_port)
.config('spark.ui.port', ui_port)
.getOrCreate()
но внутри catboost listeningport=0 https://github.com/catboost/catboost/blob/dfc3f04050ef9acc3f070f38fe363fec93a3bc8d/catboost/spark/catboost4j-spark/core/src/main/scala/ai/catboost/spark/CatBoostPredictor.scala#L228
Подробнее о проблеме и её состоянии на сегодняшний день можно посмотреть здесь: https://github.com/catboost/catboost/issues/2181
Cluster Mode Spark
На проде хотелось бы использовать Spark Submit Operator, поэтому по инструкции дополнили spark_config: https://catboost.ai/en/docs/installation/spark-installation-pyspark.
В случае отсутствия на Hadoop-кластере интернета предварительно нужно выгрузить вот эти jar файлы на HDFS:
ai.catboost_catboost-common-1.1.jar
ai.catboost_catboost-spark_2.4_2.11-1.1.jar
io.github.classgraph_classgraph-4.8.98.jar
и также дописать в конфиг в “spark.jars”.
Но мы столкнулись с ошибкой:
Py4JJavaError: An error occurred while calling o663.fit.
О ней можно найти пару ссылок с разными решениями в интернете вроде этой: https://github.com/catboost/catboost/issues/1585?ysclid=le1hctqn2n472605600, но ни одно из них не пофиксило ошибку.
Решение
Методом проб и ошибок в итоге мы получили ответ:
jars = "hdfs:///share/path/to/catboost/jars*"
SparkSubmitOperator(
…
conf=spark_submit_config,
executor_config=kubernetes_executor,
jars=jars,
py_files=jars,
)
Нужно сконфигурировать оператор, как написано выше, а на HDFS положить необходимые jars. Необходимыми оказались не те три джарника, о которых я писал выше, а вот этот минимальный набор:
Его достаточно просто получить: поставить локальный CatBoost и из скачанных джарников выбрать данный набор (версии, учитывая инфраструктурные особенности, могут отличаться).
Теперь всё должно работать. Протестировать можно на примерах из документации, только дополнительно задайте параметр sparkPartitionCount:
catboost_spark.CatBoostClassifier(sparkPartitionCount=10)
Подтверждение, что всё работает корректно, можно увидеть в Spark History Server, а именно 10 запущенных экзекьюторов.
В итоге у нас получилось полностью снять нагрузку с Kubernetes-кластера и ускорить вычисления.