Автор статьи: Андрей Поляков

Старший разработчик в Unlimint

Хабр, добрый день! Что первое приходит в голову, когда вы слышите “ETL”? Скорее всего airflow?

По сути airflow - это стандарт де-факто в мире обработки и трансформации данных. Но в случае если в разработка ведется на java, то тогда придется либо переучивать разработчиков на python, либо нанимать еще python разработчиков. В большинстве случаев хочется вносить минимум изменений в процесс разработки и тех. стек. Решение есть! Фреймворк Spring Cloud предоставляет DataFlow - фреймворк для организации ETL в spring среде.

Архитектура

Фреймворк DataFlow состоит из двух основных компонентов:

  • Data Flow Server

  • Skipper Server  (шкипер? ????)

Высокоуровневая архитектура показана на схеме ниже:

 

Основная точка входа для взаимодействия с DataFlow - это REST API сервиса Data Flow Server. Web Dashboard и Data Flow Shell также взаимодействуют с Data Flow Server. Система может работать на различных платформах: Cloud Foundry, Kubernetes или на вашем локальном компьютере.

Обратите внимание, что и Data Flow Server, и Skipper Server хранят данные (и свое внутреннее состояние) в своей собственной РСУБД. Можно подключить одну физическую СУБД к обоим сервисам, так как каждый из них создает свой собственный namespace/schema внутри СУБД. По умолчанию каждый из сервисов использует встроенную базу данных H2.

Давайте познакомимся поближе с Data Flow Server. Он отвечает практически за все ????

  • парсинг задач (job-ов) для стриминга и батчинга на основе своего собственного DSL (о нем чуть позже);

  • валидация, сохранение описаний задач стриминга и батчинга;

  • регистрация артефактов (таких как jar файлы и докер образы), которые испльзуются в шагах пайплайнов;

  • деплой задач;

  • планирование работы задач;

  • получение истории выполнения стриминговых и батч-задач;

  • конфигурирование свойств задач (inputs/outputs, начальное количество инстансов, разделение данных и т.д.);

  • делегирование деплоя стриминговых задач сервису Skipper;

  • аудит (таких событий как создание стрима, деплой и уничтожение батч-задач);

  • и многое друге.

Впечатляющий список, не правда ли?

Другой компонент системы - Skipper Server отвечает в основном за выполнение стриминговых задач.

  • Развертывание потоков на одной или нескольких платформах.

  • Обновление и откат потоков на одной или нескольких платформах с использованием стратегии синего/зеленого деплоя.

  • Хранение истории файла манифеста каждого потока (который представляет окончательное описание того, какие приложения были развернуты).

Строим ETL

Обычно ETL пайплайны в SCDF (Spring Cloud DataFlow) состоят из 3-х компонентов:

  • Source - извлечение данных;

  • Processor - преобразование данных;

  • Sink - загрузка.

Давайте рассмотрим пример ETL-пайплайна:

  1. Данные получаем по HTTP (source).

  2. Преобразуем и насыщаем данные (processor).

    1. Сохраняем данные в лог (sink).

    2. Проведение анализа данных (processor).

  3. Сохраняем данные в файл  (sink).

Для передачи данных между блоками ETL‑пайплайна используется так называемый messaging middleware — промежуточный слой брокера сообщений. Spring Cloud Data Flow поддерживает два брокера сообщений в качестве messaging middleware — Apache Kafka and RabbitMQ.

Для запуска Spring Cloud Data Flow удобнее всего использовать docker‑compose (хотя это далеко не единственный способ!). Обратимся к официальной документации проекта SCDF и получим ссылки для скачивания файлов docker-compose:

curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/main/src/docker-compose/docker-compose.yml -o docker-compose.yml
curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/main/src/docker-compose/docker-compose-<broker>.yml -o docker-compose-<broker>.yml
curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/main/src/docker-compose/docker-compose-<database>.yml -o docker-compose-<database>.yml

Здесь:

<broker> — это брокер сообщений (либо kafka, либо rabbitmq).

<database> — база данных, которая будет использоваться Data Flow Server и Skipper Server (может быть postgres, mariadb или mysql).

Запускаем скачанные docker-compose, перед запуском необходимо установить через переменные окружения версии сервисов SCDF, которые будут использоваться. Например:

set DATAFLOW_VERSION=2.10.1&
set SKIPPER_VERSION=2.9.1

После того как сервисы в docker‑compose запустились, они будут доступны по следующим портам:

Host ports

Container ports

Description

9393

9393

На этом порту работает Data Flow server. Можно открыть дашборд по адресу http://localhost:9393/dashboard  или взаимодействовать с  REST API по адрес http://localhost:9393

7577

7577

На этом порту работает Skipper server. Можно повзаимодействовать с Skipper REST API по адресу http://localhost:7577/api

Важное примечание:

Сейчас Spring Cloud Data Flow по умолчанию использует Java 8 для запуска приложений. Если вы хотите запускать приложения, собранные с Java 17, нужно установить значение переменной окружения BP_JVM_VERSION:

export BP_JVM_VERSION=-jdk17

Также вам скорее всего понадобится возможность отладки приложений и пайплайнов. К сожалению, это не такая простая задача, но в целом очень подробно описана в документации: Customizing Docker Compose. Также там же описывается, как добавить zipkin для распределенной трассировки ваших пайплайнов.

Давайте начнем знакомство с веб‑ui системы SCDF.

Здесь мы видим несколько разделов:

  • приложения

  • потоковые пайплайны

  • задачи (tasks)

  • управление и аудит

Самая первая вкладка — приложения. Здесь мы можем загружать «приложения» — составные блоки нашего будущего ETL‑пайплайна.

Нажмем на кнопку «добавить приложение». Открывается окно с выбором способа добавления приложения:

В самом простом случае нас интересует вкладка «зарегистрировать одно или несколько приложений». Указываем название приложения (скорее всего будет совпадать с названием шага в ETL‑пайплайне, которому оно соответствует), выбираем тип приложения и URI. Обратите внимание, что также нужно указать Metadata URI с мета‑данными вашего приложения.

Добавили приложение? Ок, двигаемся дальше и приступаем к созданию пайплайна.

Для этого нужно перейти в раздел “Потоки” и нажимаем на кнопку “создать потоки”.

Есть два способа описать ETL пайплайн:

  • в визуальном редакторе на форме;

  • с помощью DSL‑языка.

Неважно, какой из способов вы выберете, второй подход будет автоматически добавлен тоже.

Например:

Поток был создан с помощью визуального редактора и при этом автоматически продублирован в виде DSL:

STREAM-1=http | header-enricher | log
:STREAM-1.header-enricher > transform | groovy | file

Нажимаем “создать потоки” и поток отобразится в таблице.

Обратите внимание: в таблице отобразились два отдельных потока. При этом конвейер данных у нас один (т.е. оба потока связаны друг с другом и данные из одного передаются в другой):

Дальше можно запускать сам процесс!

Предположим, что поток данных пошел и наш поток может начать их принимать и обрабатывать.

Переходим к нашему потоку и видим, что он еще не задеплоен.

Первое, что нужно сделать — это задеплоить поток. Нажимаем кнопку «задеплоить» и переходим на форму, на которой необходимо указать параметры развертывания для потока целиком и для каждого из его компонентов:

Отлично! Ваш поток развернут и готов к работе.

Как проверить работу пайплайна?

Для созданного примера достаточно, чтобы по указанному http адресу появились данные, которые наш ETL пайплайн сможет получить и обработать. В конце можем проверить успешность пайплайна, если создался файл с преобразованными данными и создалась запись в логе.

Пару слов про свои собственные приложения

В рассмотренном выше примере мы использовали стандартные приложения, которые уже сделаны за нас и их можно скачать. А что делать, если нужно выполнить логику, для которой не подходят уже существующие приложения? Написать свою.

Давайте посмотрим на примере приложения‑processor (у которого есть ендпойнт для получения входных данных и для проброса данных дальше).

Такое приложение можно уместить в одном файле. Например:

@EnableBinding(Processor.class)
@SpringBootApplication
public class MyProcessorApplication {

 @Autowired
SomeComponent someComponent;

 public static void main(String[] args) {
  SpringApplication.run(MyProcessorApplication.class, args);
 }

 @StreamListener(Processor.INPUT)
 @SendTo(Processor.OUTPUT)
 public Object processMessage(Message<?> message) {
  System.out.println(message);
  MessageHeaders header = message.getHeaders();
   byte[] body = (byte[]) message.getPayload();
   // do some logic and transformations
   someComponent.doSomething(header, body);
  return message;
 }
}

После того, как код приложения написан, можно зарегистрировать его в Data Flow Server через веб‑ui или через shell. Так как через ui мы уже видели интерфейс, приведу пример, как зарегистрировать через shell:

app register --name myProcessor --type processor --uri maven://com.example:my-processor:jar:0.0.1-SNAPSHOT

После этого при создании потоков можно использовать наше только что зарегистрированное приложение наравне с другими.

Подведем итоги: для java‑мира есть довольно удобная, мощная и гибкая альтернатива для airflow и можно ее использовать для построения ETL‑пайплайнов, конвейеров потоковой обработки данных и процессов обработки тяжелых единоразовых задач.

Ну и напоследок приглашаю всех на бесплатный урок, где мы рассмотрим такой принцип проектирования программного обеспечения, как «Соглашения по конфигурации», Поговорим про «Скриптовый Spring» и затронем не менее важную тему «Безопасного программирования».

Комментарии (2)


  1. maxzh83
    00.00.0000 00:00

    Ради интереса, пробовал поиграться с этой штукой. Впечатление она оставила не самое приятное. Если коротко, то те примеры (довольно простые), которые описаны в околомаркетинговых статьях, работают. Но стоит залезть чуть в сторону или написать свое приложение, то начинается странное поведение. Постоянно что-то глючит, не обновляется, работает не с первого раза. Например, бывает, что загружаешь свое приложение через UI и ничего не происходит, висит установка приложения, никаких ошибок нет. В дебрях логов можно найти какую-то невнятную ошибку в kafka. Комьюнити практически отсутствует, хоть это вроде и часть Spring. Короче, поигрался и для себя сделал вывод, что пользоваться всерьез этой штукой пока рано.


    1. Asinrus
      00.00.0000 00:00

      Поддержу автора комментария.

      Также не нашел в SCDF возможность перезапуска отдельной таски, если она упала при пайплайне.

      Изменения, кстати, вносят относительно быстро. На мое issue среагировали и внесли изменения где-то через 3 неделю, но там была просьба добавить пару параметров в конфигурацию деплоймента для k8s