Привет, Хабр! Меня зовут Денис, в компании oneFactor я занимаю позицию архитектора, и одна из моих обязанностей — это развитие технического стека компании. В этой статье я расскажу про нашу data platform’у (далее просто DP или платформа) и про мотивацию внедрения в неё Kubernetes. Также подсвечу трудности, с которыми мы столкнулись в рамках пилота. И расскажу про набор активностей, которые не вошли в пилот, но будут выполнены во время миграции. Дополнительно представлю короткий обзор текущей интеграции между Spark и Kubernetes. Стоит отметить, что вопросы, связанные с хранилищем, здесь обсуждаться не будут.

Краткое описание платформы

Основными элементами текущего стека нашей DP являются HDFS и YARN. В YARN мы запускаем исключительно Spark приложения, кластер настроен для выполнения Spark-приложений различных версий (от 2.0.2 до 3.2.1). Дни, когда на платформе выполняется более 50000 приложений, считаются нормой. Эти приложения имеют разную природу, часть из них запускаются регулярно, и каждый запуск потребляет примерно одинаковое количество ресурсов. Часть приложений запускается по запросу пользователей или внешних (по отношению к DP) систем. Для некоторых из них стабильность выполнения гораздо важнее скорости, но часто наоборот. Кроме того, часть ресурсов кластера необходимо аллоцировать для RnD активностей, в которые входят запросы от коллег из аналитического департамента и активности от дата-инженеров. Для того, чтобы все эти разношерстные приложения могли быть выполнены в прогнозируемый период времени, используя ограниченный набор ресурсов, в YARN мы использовали большое количество предоставляемых возможностей по настройке очередей, таких как:

Weights ­— для приоритизации выделения ресурсов.

Max Running Apps —  для управления нагрузкой на upstream сервисы. Например, когда с помощью Spark загружаем данные во внешнее хранилище и нам необходимо ограничить нагрузку, которую мы на него создаем. Или для случаев, когда с владельцем продукта согласовано максимальное количество одновременно запущенных приложений.

Иерархичность — чтобы объединить потребляемые ресурсы между несколькими похожими классами задач, но к которым предъявляются немного разные требования.

Preemption — чтобы дать возможность приложениям со строгим SLA получать обещанные ресурсы за счет принудительной остановки YARN контейнеров в менее важных очередях.

Max Resources (Virtual Core и Memory) —  чтобы не позволить одному классу приложений захватить все ресурсы кластера.

Платформа установлена на собственных серверах в ЦОДе одного из российских cloud provider'ов.

Почему Kubernetes?

Наш текущий Hadoop кластер построен на версии Hadoop 2. Основная задача, которую мы сейчас решаем, это проработка будущего стека. После его выбора мы сможем сформировать roadmap для миграции. Рассматриваются следующие варианты:

  1. Переезд на коммерческие решения (такие как Cloudera Data Platform). Данный вариант отпадает, т.к. стоимость владения становится очень высокой, а отсутствие бесшовной миграции с нашей версией Hadoop не позволяет нам выполнить миграцию в короткие сроки.

  2. Миграция на Hadoop от другого вендора. Те же самые «за» и «против», как в предыдущем пункте.

  3. Миграция на Apache Hadoop 3. На ряду с Kubernetes рассматривается как один из возможных вариантов.

  4. Kubernetes. Герой нашего повествования, так же, как и Apache Hadoop 3, входит в наш short-лист кандидатов.

Итак, главная причина, которая драйвит нас к переходу на Kubernetes —  это обновление нашего технологического стека, элементы которого уже находятся в End Of Life (EOL). В дополнение к этому мы планируем получить следующие преимущества:

  • Более простая схема масштабирования.

  • Решение проблемы эластичности, когда возникает резкий и высокий спрос на ресурсы с помощью динамического использования ресурсов cloud provider'а по модели pay as you go.

  • Переносимость платформы. Kubernetes набирает популярность, услугу Managed Kubernetes кластера предоставляют все крупные облачные поставщики.

  • В случае Managed Kubernetes сокращаются трудозатраты на поддержку инфраструктуры.

Spark on Kubernetes

Поддержка Kubernetes в Spark заявлена начиная с версии 2.3, а с версии Spark 3.0 поддержка Kubernetes перешла в фазу production ready. Для более удобного запуска Spark-приложений разработан Spark Operator. Пример из официального репозитория ниже:

apiVersion: "sparkoperator.k8s.io/v1beta2"
kind: SparkApplication
metadata:
  name: spark-pi
  namespace: default
spec:
  type: Scala
  mode: cluster
  image: "gcr.io/spark-operator/spark:v3.1.1"
  imagePullPolicy: Always
  mainClass: org.apache.spark.examples.SparkPi
  mainApplicationFile: "local:///opt/spark/examples/jars/spark-examples_2.12-3.1.1.jar"
  sparkVersion: "3.1.1"
  restartPolicy:
    type: Never
  volumes:
    - name: "test-volume"
      hostPath:
        path: "/tmp"
        type: Directory
  driver:
    cores: 1
    coreLimit: "1200m"
    memory: "512m"
    labels:
      version: 3.1.1
    serviceAccount: spark
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp"
  executor:
    cores: 1
    instances: 1
    memory: "512m"
    labels:
      version: 3.1.1
    volumeMounts:
      - name: "test-volume"
        mountPath: "/tmp" 

Кроме привычного для Kubernetes оформления в Yaml формате, Spark Operator предоставляет некоторый набор «сахарных» конструкций, которые облегчают работу со Spark. Например, объявление spec.hadoopConfigMap и spec.sparkConfigMap упрощают монтирование соответствующих конфигурационных файлов в нужные места, освобождая нас от необходимости объявления volume’ов в Yaml файле в явном виде.

Kubernetes и batch

Пожалуй, это один из самых интересных разделов в статье, и он сам по себе заслуживает отдельной статьи, но постараюсь быть кратким. Выше, в части «Краткое описание платформы», есть функционал очередей, которым мы пользуемся на платформе, для того, чтобы в условиях ограниченных ресурсов удовлетворить потребность разных типов приложений. Планировщик Kubernetes сам по себе данного функционала не предоставляет, но API Kubernetes позволят разработать собственный планировщик и использовать его.

Мы рассматривали две open-source реализации планировщика для Kubernetes, заточенного для выполнения batch-приложений:

  • Apache YuniKorn —  основной контрибьютор Cloudera, судя по найденным презентациям в Интернете, он активно используется в Apple.

  • Volcano —  основной контрибьютор Huawei.

В экспериментальную часть своих исследований, в силу ограниченного количества времени, мы выбрали Volcano. Основная причина — поддержка Volcano в Spark 3.3.

Вот так выглядит кусочек нашего Yaml, в котором мы указывали на использование Volcano:

spec:
  type: Scala
  mode: cluster
  image: custom-build-spark3.3.0-on-k8s:0.0.28
  mainClass: com.onefactor.spark33.App
  mainApplicationFile: "hdfs://nameservice1/spark/spark3-mvp/app.jar"
  sparkVersion: 3.3.0
  restartPolicy:
    type: Never
  hadoopConfigMap: hadoop-conf
  sparkConfigMap: spark-conf-k8s
  batchScheduler: volcano
  sparkConf:
    spark.kubernetes.driver.pod.featureSteps: org.apache.spark.deploy.k8s.features.VolcanoFeatureStep
    spark.kubernetes.scheduler.name: volcano  

В этом отрывке ключевыми являются batchScheduler и пара параметров в секции sparkConf. Также есть возможность задать spark.kubernetes.scheduler.volcano.podGroupTemplateFile

Основные моменты, которые здесь хочется подсветить:

  • В настоящий момент community Spark готовит новый релиз Spark 3.3.0 (т.к. официального релиза пока нет, то в своих экспериментах мы использовали собственную сборку).

  • Spark 3.3.0 будет иметь поддержку Volcano (может быть, к релизу успеют добавить и поддержку YuniKorn).

  • В Kubernetes community осознают важность batch-приложений, и на только что прошедшей конференции KubeCon этой теме был посвящен выделенный стрим.

    Интеграция с HDFS

    Т.к. наш текущий стек основан на Hadoop, то для чтения и записи данных во время экспериментов приходилось взаимодействовать с HDFS. В нашем Hadoop кластере включена аутентификация через Kerberos.

    Для того, чтобы выполнить аутентификацию, использовался side-car контейнер. Связка HDFS, Kerberos, K8s через Spark Operator выглядит следующим образом:

spec:
  hadoopConfigMap: hadoop-conf
  volumes:
  - name: krb5-conf
    configMap:
      defaultMode: 420
      name: krb5-client
  - name: keytab
    secret:
      defaultMode: 420
      secretName: client-keytab
  - name: ccache
    emptyDir:
      medium: Memory
  imagePullSecrets:
  - our-registry
  driver:
    initContainers:
      - name: kinit-sidecar
        image: kinit-sidecar:0.1.0
        volumeMounts:
        - mountPath: /dev/shm
          name: ccache
        - mountPath: /etc/krb5.conf
          name: krb5-conf
          subPath: krb5.conf
        - mountPath: /krb5/krb5.keytab
          name: keytab
          subPath: client.keytab

Более подробно о реализации krb аутентификации с помощью side-car контейнера можно посмотреть на форуме openshift.

Наши результаты

В качестве экспериментов мы выбрали Spark приложение, которое у нас работает в Yarn’е. И без внесения изменений в его кодовую базу выполняли запуск в Kubernetes.

Особенности приложения:

  • количество входных записей ~1 500 000 000;

  • приложение использует операцию кэширования;

  • имеется shuffle;

  • количество выходных записей чуть больше, чем входных;

  • YARN: для выполнения приложения за 10 минут необходимо 40 серверов с 64GB памяти и 10vCores;

  • Kubernetes: для выполнения приложения за 10 минут необходимо 24 сервера с 64GB памяти и 10vCores (на 40% быстрее). Прежде всего, это связано с тем, что для хранения spill и shuffle промежуточных данных использовались PVC на SSD дисках (в то время как в Yarn использовались обычные диски). После завершения Spark-приложения SSD-диски автоматически удаляются.

Планы

Во время наших тестов Kubernetes показал себя ожидаемо, и мы получили подтверждение, что он может быть внедрен в нашу платформу. Следующий этап — нагрузочное тестирование и проверка того, что работа очередей соответствует нашим требованиям. Также для нас пока что остается нерешенным вопрос о использовании внешнего shuffle сервиса (несмотря на его отсутствие в наших экспериментах, мы использовали динамическое аллоцирование executors, в этом нам помогли конфигурационные параметры spark.dynamicAllocation.shuffleTracking.enabled и spark.dynamicAllocation.shuffleTracking.timeout).

На этом всё. Спасибо, что прочитали статью до конца. Делитесь вашим опытом и рекомендациями в комментариях.

Комментарии (0)