Сегодня я вам расскажу, как нам удалось решить задачу портирования Spark Structured Streaming Applications на Kubernetes (K8s) и реализовать CI стриминга.

Как всё начиналось?


Стриминг выступает ключевым компонентом BI-платформы ФАСТЕН РУС. Real-time данные используются командой дата анализа для построения оперативных отчетов.

Стриминговые приложения реализованы средствами Spark Structured Streaming. Этот фрэймворк обеспечивает удобный API трансформации данных, что отвечает нашим потребностям в плане оперативности доработок.

Сами стримы поднимались на кластере AWS EMR. Таким образом при подъёме нового стрима на кластер выкладывался ssh-скрипт на сабмит Spark-джобы, после чего запускалось приложение. И сначала нас всё вроде бы устраивало. Но с ростом числа стримов становилась всё более очевидной потребность в реализации CI стриминга, что повысило бы автономность команды дата анализа при запуске приложений на доставку данных по новым сущностям.

И сейчас мы рассмотрим, как нам удалось решить эту проблему путём портирования стриминга на Kubernetes.

Почему Kubernetes?


Kubernetes как менеджер ресурсов в наибольшей степени отвечал нашим потребностям. Это и деплой без даунтайма, и широкий набор инструментов реализации CI на Kubernetes, включая Helm. Кроме того, наша команда обладала достаточной экспертизой по реализации CI-пайплайнов на K8s. Поэтому выбор был очевиден.

Как организована модель управления Spark-приложениями на базе Kubernetes?




Клиент запускает spark-submit на K8s. Cоздаётся pod драйвера приложения. Kubernetes Scheduler привязывает pod к ноде кластера. Затем драйвер отправляет запрос на создание pod’s для запуска экзекьютеров, поды создаются и привязываются к нодам кластера. После чего выполняется стандартный набор операций с последующей конвертацией кода приложения в DAG, декомпозицией на stages, разбивкой на таски и их запуском на экзекьютерах.

Эта модель вполне успешно работает при ручном старте Spark-приложений. Однако, подход c запуском spark-submit вне кластера не устраивал нас с точки зрения реализации CI. Требовалось найти решение, которое позволило бы запускать Spark-джобу (выполнять spark-submit) непосредственно на нодах кластера. И здесь нашим требованиям вполне отвечала модель Kubernetes Operator.

Kubernetes Operator как модель управления жизненным циклом Spark-приложения


Kubernetes Operator — концепция управления statefull-приложениями в Kubernetes, предложенная CoreOS, которая предполагает автоматизацию эксплуатационных задач, таких как деплой приложений, рестарт приложений в случае фэйлов, обновление конфигурации приложений. Одним из ключевых паттернов Kubernetes Operator выступает CRD (CustomResourceDefinitions), который предполагает добавление в кластер K8s кастомных ресурсов, что, в свою очередь, позволяет работать с этими ресурсами как с нативными объектами Kubernetes.

Operator представляет собой демон, который живет в pod’е кластера и реагирует на создание / изменения состояния кастомного ресурса.

Рассмотрим эту концепцию применительно к управлению жизненным циклом Spark-приложений.



Пользователь выполняет команду kubectl apply -f spark-application.yaml, где spark-application.yaml — спецификация Spark-приложения. Operator получает объект Spark-приложения и выполняет spark-submit.

Как мы можем видеть, модель Kubernetes Operator предполагает управление жизненным циклом Spark-приложения непосредственно в кластере Kubernetes, что выступало серьезным аргументом в пользу данной модели в контексте решения наших задач.

В качестве Kubernetes Operator для управления стриминговыми приложениями было принято решение использовать spark-on-k8s-operator. Этот оператор предлагает достаточно удобный API, а также отличается гибкостью в настройке политики рестартов Spark-приложений (что достаточно важно в контексте поддержки стриминговых приложений).

Реализация CI


Для реализации CI стриминга использовался GitLab CI/CD. Деплой Spark-приложений на K8s проводился средствами Helm.

Сам пайплайн предполагает 2 стадии:

  • test — проводится проверка синтаксиса, а также рендеринг Helm-шаблонов;
  • deploy — деплой стриминговых приложений на тестовую (dev) и продуктовую (prod) среды.

Рассмотрим подробнее эти стадии.

На стадии test проводится рендеринг Helm-шаблона Spark-приложений (CRD — SparkApplication) значениями, специфичными для сред.

Ключевыми разделами Helm-шаблона выступают:
  1. spark:
    • version — версия Apache Spark
    • image — используемый Docker-образ
  2. nodeSelector — содержит список (key > value), соответствующих лэйблам подов.
  3. tolerations — указывается список толерейшенов Spark-приложения.
  4. mainClass — класс Spark-приложения
  5. applicationFile — локальный путь, где находится jar-файл Spark-приложения
  6. restartPolicy — политика рестартов Spark-приложения
    • Never — завершенное Spark-приложение не рестартуется
    • Always — завершенное Spark-приложение рестартуется независимо от причины остановки
    • OnFailure — Spark-приложение рестартуется только в случае фэйла
  7. maxSubmissionRetries — максимальное число сабмитов Spark-приложения
  8. driver/executor:
    • cores — количество ядер, выделяемых драйверу/экзекьютору
    • instances (используется только для конфигурации экзекьютеров) — число экзекьюторов
    • memory — объём памяти, выделяемой процессу драйвера/экзекьютора
    • memoryOverhead — объём off-heap memory, выделяемой драйверу/экзекьютору
  9. streams:
    • name — наименование стримингового приложения
    • arguments — аргументы стримингового приложения
  10. sink — путь к датасетам Data Lake на S3

После выполнения рендеринга шаблона приложения деплоятся на тестовую среду dev средствами Helm.

Отработал CI-пайплайн.



После чего запускаем джобу deploy-prod — запуск приложений в продакшен.

Убеждаемся в успешном выполнении джобы.



Как мы можем видеть ниже, приложения запущены, поды — в статусе RUNNING.



Заключение


Портирование Spark Structured Streaming Applications на K8s и последующая реализация CI позволили автоматизировать запуск стримов на доставку данных по новым сущностям. Для подъёма очередного стрима достаточно подготовить Merge Request с описанием конфигурации Spark-приложения в yaml-файле значений и при запуске джобы deploy-prod будет инициирована доставка данных в DWH (Redshift) / Data Lake (S3). Это решение обеспечило автономность команды дата анализа при выполнении задач, связанных с добавлением в хранилище новых сущностей. Кроме того, портирование стриминга на K8s и, в частности, управление Spark-приложениями средствами Kubernetes Operator spark-on-k8s-operator существенно повысили отказоустойчивость стриминга. Но об этом уже в следующей статье.

Комментарии (3)


  1. Yo1
    27.03.2019 10:30

    Стриминг выступает ключевым компонентом BI-платформы ФАСТЕН РУС. Real-time данные используются командой дата анализа для построения оперативных отчетов.

    глупый вопрос — а как с S3 данные оперативно в BI платформы попадают?


    1. sergeysamsonov Автор
      27.03.2019 23:36
      -1

      данные стримятся как в S3 (для обучения нейросетей), так и в DWH (для построения отчетности)


      1. sergeysamsonov Автор
        28.03.2019 17:13

        Соответственно, в Data Lake — история, в DWH — данные по ивентам за последние 3 месяца.