Меня зовут Дмитрий Курганский, я Tech Lead команды MLOps в Банки.ру. Мы работаем над тем, чтобы грамотно организовать и ускорить этапы жизненного цикла ML. В этой статье поделюсь нашим опытом применения Embedding: от запуска Яндекс Data Proc кластера через Airflow до оптимизации этапа применения Embedding с помощью Spark. Материал в целом будет актуален для этапа применения (inference) любых моделей для больших наборов данных, работающих в batch режиме по расписанию.

Что в статье: 

  • почему решили выбрать Apache Spark и именно в Яндекс Data Proc

  • какие альтернативы инструменту и платформе рассматривали

  • неочевидные из документации детали конфигурации кластера, библиотеки провайдера Airflow и самих DAG’ов

  • примеры кода с оберткой приложения в Spark

Кому будет полезна статья

Прежде всего, MLOps и ML командам, которые хотят попробовать Spark для inference задач с минимальными затратами сил и времени. Если вы хотя бы немного знакомы со Spark и Airflow, содержимое статьи не вызовет больших трудностей. Если задумаетесь о реализации — будет полезен опыт работы с сервисами Яндекса и инженеры, которые могут помочь с настройкой.

Наш стек и зачем нам Apache Spark

Наша ML инфраструктура преимущественно построена на сервисах Яндекс Cloud. Прежде всего, это Kubernetes. Поверх него развернут Airflow, который как раз запускает Batch ML модели по расписанию через KubernetesPodOperator. Изначально для решения задачи по оптимизации inference мы запускали оператор на выделенной автомасштабируемой группе узлов Kubernetes в нескольких экземплярах параллельно. И если ускорить процесс получалось, то вот сделать его удобным — не совсем. Приходилось управлять обрабатываемыми партиями данных в каждом операторе, делать для этого дополнительные поля в таблицах и логические условия в приложениях, запускающих ML модель.

Чтобы наладить процесс, решили использовать Apache Spark. Популярный инструмент, относительно простой для реализации задачи, технически доступный.

Да, для ускорения Embeddings прежде всего рекомендуют использовать GPU. Но в нашем случае получить приемлемое ускорение процесса в целом дешевле на Spark за счет параллельных вычислений чем за счет ускорения самого вычисления на GPU. Аналогичная ситуация с этапом inference остальных наших моделей.

Какую платформу выбрали для развертывания Apache Spark 

Сам по себе Spark как фреймворк, конечно, задачу не решит. Под него нужны вычислительные мощности. Сразу целились в облако: хотим быстро начать (попробовать) и платить только за использование, как делаем с облачным Kubernetes. Рассматривали его и Яндекс Data Proc. 

К очевидным плюсам варианта со Spark на Kuberentes можно отнести возможность выбора любой доступной версии инструментов (Spark, Python), тогда как в DataProc версии ограничены сверху. Но для нас это было не критично. А еще на конференции Union All 2024 Яндекс в очередной раз напомнил о скором выходе Spark на Kubernetes в виде отдельного сервиса, что, надеемся, станет золотой серединой. По крайней мере, обещали оперативнее обновлять версии инструментов.

Оба варианта позволяют платить только за использование. Но вот развернуть решение, а следовательно, начать тестировать, оказалось быстрее с Data Proc. Пока кажется, что поддерживать тоже. Так что остановились на нём. 

Есть, кстати, интересная статья на тему сравнения этих подходов, но относительно аналога Data Proc от Amazon (EMR).

Если рассматривать аналоги подобного продукта как часть платформы других вендоров, то на отечественном рынке, кажется, единственным полноценным аналогом будут облачные сервисы VK Cloud. Детальнее к ним не приглядывались, так как уже был опыт использования сервисов Яндекса.

Мотивация к написанию статьи

Так как Яндекс Data Proc продукт относительно новый и больше ориентирован на отечественный рынок, пишут про него мало. Найти дополнительную информацию о практике работы сложно. Кроме как в документации Яндекса, в интернете ее практически нет, включая любимый stackoverflow. Лайфхак: иногда помочь может поиск ответов на вопросы для Google Data Proc, которые могут подойти и для аналога от Яндекс. У продуктов схожее устройство, в том числе и в библиотеках для Airflow.

На Habr есть несколько статей про Google Data Proc, но я не уверен в их актуальности. А вот про аналог от Яндекса мы нашли только одну, но интересную: Магнит ИТ рассказывает об опыте использования инструмента для обработки данных.

Какие инструменты использовали

В работе использовали Data Proc образ версии 2.1.15. Это последняя актуальная и доступная без запроса к поддержке версия на момент выхода статьи. В ней используется довольно свежий Spark 3.3.2, но далеко не самый актуальный Python 3.8. Поэтому вам могут потребоваться другие версии библиотек и/или правки в коде приложений, которые планируете запустить на Spark, если используете в них фишки более новых версий Python.

Для Airflow 2.3.4 использовали библиотеку-провайдер apache-airflow-providers-yandex версии 3.3.0, в зависимости к которой взяли библиотеку yandexcloud версии 0.227.0.

Для Embeddings у нас Sentence Transformers версии 2.2.2.

Запуск Data Proc через Airflow

Этот шаг в документации описан достаточно подробно, включая этапы подготовки облачной инфраструктуры. По крайней мере, для нашей задачи этого объема хватило.

Остановлюсь только на двух нюансах настройки:

  1. При создании подключения Amazon Elastic MapReduce для Airflow в инструкции упоминается поле Run Job Flow Configuration, но до версии Airflow 2.5.0 оно называется Extra.

  2. Документация не всегда поспевает за обновлениями версий кластера, поэтому перед выбором cluster_image_version лучше сначала проверить последнюю доступную через веб-интерфейс (консоль) Яндекс. Причем, стоит именно попробовать создать с ней тестовый кластер, так как самая последняя версия из доступных для выбора может предоставляться только по запросу к поддержке. А понятно это будет только после нажатия на кнопку «Создать кластер».

Интереснее рассмотреть в этой главе подходы к стратегии организации ресурсов по запросу. Их две:

  1. создание/удаление нового кластера; 

  2. старт/запуск одного постоянного.

Конкретно для Airflow в документации мы видим только пример DAG’а с последовательным созданием Data Proc кластера, запуском PySpark операторов и удалением кластера. Тем не менее рассмотрим детали реализации, плюсы и минусы каждого варианта по ряду пунктов в таблице ниже.

 

Создание / Удаление

Запуск / Остановка

Скорость поднятия кластера

Медленнее

Быстрее

Организация сетевой доступности Web UI инструментов мониторинга: Spark History Server, YARN Resource Manager и т. д.

Сложнее — без костылей сторонней автоматизации не получится сделать, так как url для Web UI привязан к FQDN Master Node, а для каждого нового кластера он будет разный.

Проще: достаточно настроек из документации.

Высвобождение квоты по вычислительным ресурсам (CPU, RAM) в фазе простоя

Есть

Нет

Оплата за дисковое хранилище в фазе простоя

Есть

Нет

Удобство организации через Airflow

Все, что нужно, доступно «из коробки»

В библиотеке провайдере Yandex. Cloud Data Proc для Airflow операторов для остановки и запуска кластера нет в принципе. При необходимости исправить можно самостоятельно: сделать операторы на основе API Data Proc, в котором такой функционал имеется.

Мы пока окончательно не решили, какой вариант подходит нам больше. Временно остановились на создании/удалении, но не под каждую модель в отдельности, а единоразово для прогона всех сразу. Так как они у нас выполняются примерно в одно время. Для этого пришлось научиться передавать актуальный cluster_id от DAG’а, в котором происходит старт кластера к DAG’ам с применением моделей, использующим PySpark оператор, и в конце к DAG’у, удаляющему кластер. Подробнее об этом в следующей главе.

Заворачиваем Embedding в Spark

Алгоритм примерно такой:

  • Распараллеливаем данные по task’ам, либо распределено выгружая их коннектором из нужного хранилища, либо из pandas dataframe.

  • Делаем broadcast модель по executor’ам.

  • Создаем pandas UDF-функцию. Она применится к набору данных в каждой task’е по chunk’ам, если набор данных внутри будет слишком большой. По умолчанию размер одного chunk — 10 000 записей, он регулируется параметром maxRecordsPerBatch. По ссылке хороший материал на английском про назначение и отличие Pandas и Python UDF.

  • Пишем pipeline ленивых вычислений, заканчивающийся Spark Action. Допустим, на запись результата в базу.

Код, распределяющий этап encode для Embedding фреймворка Sentence Transformers, выглядит так:

broadcasted_model = spark.sparkContext.broadcast(model)
spark_df = spark.createDataFrame(df).repartition(num_partitions)

@spark_functions.pandas_udf(returnType=ArrayType(FloatType()))
def encode_sentences(x: pd.Series) -> pd.Series:
    return broadcasted_model.value.encode(
        x,
        batch_size=encode_batch_size,
    ).tolist()

(
    spark_df
    .withColumn('encoded_sentences', encode_sentences(spark_functions.column('sentences')))
    .write
    .format('bigquery')
    .option('writeMethod', 'direct')
    .option('table', 'result_table_name')
    .mode('overwrite')
    .save()
)

Для метода encode не забываем указать (подобрать) batch_size. Он значительно улучшит скорость в сравнении с построчным кодированием.

Настройка DataprocCreatePysparkJobOperator

  • Первым делом готовим запускаемый файл, в котором будет инициализация SparkSession, а также импорт и запуск функционала нашего Python приложения с применением ML модели. 

  • Далее собираем Python-приложение в виде zip или egg архива. Второй можно собрать командой python setup_tools bdist

  • Отправляем всё это в S3. 

  • По необходимости туда же jar архивы дополнительных Spark библиотек. Например, connector's к базам данных.

Теперь конфигурируем сам оператор:

spark_encode = DataprocCreatePysparkJobOperator(
    task_id='task_id',
    name='task_name',
    cluster_id="'{{ ti.xcom_pull(dag_ids='dag_id', task_ids='task_id', key='cluster_id') }}'",
    connection_id=ycSA_connection.conn_id,
    main_python_file_uri=f's3a://{YANDEX_BUCKET}/{APP_PATH}/main.py',
    python_file_uris=[f's3a://{YANDEX_BUCKET}/{APP_PATH}/ml_app-1.0.0-py3.8.egg'],
    jar_file_uris=[f's3a://{YANDEX_BUCKET}/jars/spark-connector-xxx.jar'],
    args=[
        '--arg_1', 'arg_1_value',
        '--arg_2', 'arg_2_value',
    ],
    properties={
        'spark.submit.master': 'yarn',
        'spark.submit.deployMode': 'cluster',
        'spark.executor.memory': '4g',
        'spark.executor.instances': '4',
        'spark.executor.cores': '2',
        'spark.driver.cores': '2',
        'spark.task.cpus': '2',
    },
)

Через args передаем параметры для нашего Python ML приложения, если требуется. Через properties — параметры для Spark. Мы используем именно выделенный кластер по запросу, поэтому следует сконфигурировать его, учитывая нагрузку. После использовать доступные ресурсы нужно так, чтобы не было их простоя. В этом как раз помогают соответствующие properties. На эту тему может быть полезна следующая статья.

Если используете модель, которая много весит, не забывайте рассчитать достаточный объем RAM для этой модели именно на каждую task, а не на executor. Подробнее о параметрах Spark, полезных для аналитики и ML, можно почитать в этой статье.

Про connection_id хорошо написано в инструкции Яндекса, которую мы упоминали ранее, поэтому вернемся к cluster_id

В примере из документации значение этого параметра для DataprocCreatePysparkJobOperator не указано, так как DataprocCreateClusterOperator передает его через xcom в последующие операторы. Те, в свою очередь, читают xcom через метод _setup, определенный в родительском классе DataprocBaseOperator:

def _setup(self, context: Context) -> DataprocHook:
    if self.cluster_id is None:
        self.cluster_id = context["task_instance"].xcom_pull(key="cluster_id")

и используют в качестве значения по умолчанию.

Если же DataprocCreateClusterOperator будет в отдельном DAG’е, то, в зависимости от выбранной стратегии, возможны как минимум такие варианты:

  1. При запуске/остановке кластера можно просто указать или сохранить в Variables фиксированное значение, так как оно меняться не будет.

  2. При варианте с созданием/удалением можно получить актуальный cluster_id также через xcom. Просто в отличие от реализации, которая используется по умолчанию в DataprocBaseOperator, укажем еще dag_id DAG’а, в котором и создаем кластер. Подобный пример как раз у нас в коде выше.

Логи Spark Data Proc в Airflow

Неприятный момент: логи в операторы Airflow не пробрасываются, только общий статус (не)выполнения задачи. Например, у библиотеки провайдера для Google Data Proc есть кнопка в UI-интерфейсе оператора, так называемый ExtraLink. Она ведет в консоль управления Google Cloud, где можно найти логи. У Яндекса пока такой опции нет.

Приятный момент: можно сделать свою реализацию для ExtraLink DataprocCreatePysparkJobOperator. Причем вести она может сразу на Spark UI. Собственно, что мы и сделали. Подробнее о том, как создавать и добавлять ExtraLink к операторам тут.

Для этого при нашем подходе «создание/удаление кластера» динамически обновляем привязку FQDN Master Node к CNAME через сторонний скрипт после каждого рестарта кластера. Для получения FQDN можно воспользоваться утилитой yc:

yc dataproc cluster list-hosts --id="${cluster_id}" | grep MASTERNODE | awk '{print $2}'

передав cluster_id из DAG’а, в котором создаем кластер. Далее добавляем ссылку на этот CNAME и порт, на котором, например, YARN Resource Manager Web UI. В идеале можно попробовать сделать кнопку-ссылку сразу на лог конкретного Spark приложения.

Секреты

Отдельного внимания заслуживает тема работы с секретами. В отличие от Docker контейнера, пробросить имена, пароли, явки в Spark приложение, запущенное в Яндекс Data Proc, без использования менеджера паролей не получится. Если у вас в компании пока такого инструмента нет, то можно воспользоваться, например, облачным продуктом Яндекса: LockBox. К сожалению, хорошей документации для Python API нет, но вот тут есть понятный пример.

Вывод

Общий процесс применения ML моделей на больших наборах данных, на первый взгляд, точно стал более универсальным. Настраивать его стало удобнее. Про скорость сказать пока сложно, а особенно про ее соотношение с затратами: наша реализация на Kubernetes тоже потребляла ресурсы по запросу в выделенной группе узлов, и также позволяла организовать параллельный расчет.

Однако сам Spark для нас перспективен не только с точки зрения применения Embeddings или ML моделей. С этих задач было проще начать. Если появится необходимость, адаптируем Spark и для подготовки данных. Тем более, это его основной функционал. Также подумаем о том, как его использовать для обучения моделей, для которых уже есть готовые реализации на Spark. В соотношении скорость/стоимость это может и не будет дешевле использования GPU, но точно даст приемлемое ускорение за меньшие деньги. К тому же, при дальнейшем росте объема данных, ничто не мешает использовать Spark кластер вместе с GPU.

С нетерпением ждем для тестирования Spark на Kubernetes от Яндекс.

Комментарии (0)