Автор статьи: Андрей Поляков
Старший разработчик в Unlimint
Хабр, добрый день! Что первое приходит в голову, когда вы слышите “ETL”? Скорее всего airflow?
По сути airflow - это стандарт де-факто в мире обработки и трансформации данных. Но в случае если в разработка ведется на java, то тогда придется либо переучивать разработчиков на python, либо нанимать еще python разработчиков. В большинстве случаев хочется вносить минимум изменений в процесс разработки и тех. стек. Решение есть! Фреймворк Spring Cloud предоставляет DataFlow - фреймворк для организации ETL в spring среде.
Архитектура
Фреймворк DataFlow состоит из двух основных компонентов:
Data Flow Server
Skipper Server (шкипер? ????)
Высокоуровневая архитектура показана на схеме ниже:
Основная точка входа для взаимодействия с DataFlow - это REST API сервиса Data Flow Server. Web Dashboard и Data Flow Shell также взаимодействуют с Data Flow Server. Система может работать на различных платформах: Cloud Foundry, Kubernetes или на вашем локальном компьютере.
Обратите внимание, что и Data Flow Server, и Skipper Server хранят данные (и свое внутреннее состояние) в своей собственной РСУБД. Можно подключить одну физическую СУБД к обоим сервисам, так как каждый из них создает свой собственный namespace/schema внутри СУБД. По умолчанию каждый из сервисов использует встроенную базу данных H2.
Давайте познакомимся поближе с Data Flow Server. Он отвечает практически за все ????
парсинг задач (job-ов) для стриминга и батчинга на основе своего собственного DSL (о нем чуть позже);
валидация, сохранение описаний задач стриминга и батчинга;
регистрация артефактов (таких как jar файлы и докер образы), которые испльзуются в шагах пайплайнов;
деплой задач;
планирование работы задач;
получение истории выполнения стриминговых и батч-задач;
конфигурирование свойств задач (inputs/outputs, начальное количество инстансов, разделение данных и т.д.);
делегирование деплоя стриминговых задач сервису Skipper;
аудит (таких событий как создание стрима, деплой и уничтожение батч-задач);
и многое друге.
Впечатляющий список, не правда ли?
Другой компонент системы - Skipper Server отвечает в основном за выполнение стриминговых задач.
Развертывание потоков на одной или нескольких платформах.
Обновление и откат потоков на одной или нескольких платформах с использованием стратегии синего/зеленого деплоя.
Хранение истории файла манифеста каждого потока (который представляет окончательное описание того, какие приложения были развернуты).
Строим ETL
Обычно ETL пайплайны в SCDF (Spring Cloud DataFlow) состоят из 3-х компонентов:
Source - извлечение данных;
Processor - преобразование данных;
Sink - загрузка.
Давайте рассмотрим пример ETL-пайплайна:
Данные получаем по HTTP (source).
Преобразуем и насыщаем данные (processor).
-
Сохраняем данные в лог (sink).
Проведение анализа данных (processor).
Сохраняем данные в файл (sink).
Для передачи данных между блоками ETL‑пайплайна используется так называемый messaging middleware — промежуточный слой брокера сообщений. Spring Cloud Data Flow поддерживает два брокера сообщений в качестве messaging middleware — Apache Kafka and RabbitMQ.
Для запуска Spring Cloud Data Flow удобнее всего использовать docker‑compose (хотя это далеко не единственный способ!). Обратимся к официальной документации проекта SCDF и получим ссылки для скачивания файлов docker-compose
:
curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/main/src/docker-compose/docker-compose.yml -o docker-compose.yml
curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/main/src/docker-compose/docker-compose-<broker>.yml -o docker-compose-<broker>.yml
curl https://raw.githubusercontent.com/spring-cloud/spring-cloud-dataflow/main/src/docker-compose/docker-compose-<database>.yml -o docker-compose-<database>.yml
Здесь:
<broker>
— это брокер сообщений (либо kafka, либо rabbitmq).
<database>
— база данных, которая будет использоваться Data Flow Server и Skipper Server (может быть postgres, mariadb или mysql).
Запускаем скачанные docker-compose
, перед запуском необходимо установить через переменные окружения версии сервисов SCDF, которые будут использоваться. Например:
set DATAFLOW_VERSION=2.10.1&
set SKIPPER_VERSION=2.9.1
После того как сервисы в docker‑compose запустились, они будут доступны по следующим портам:
Host ports |
Container ports |
Description |
9393 |
9393 |
На этом порту работает Data Flow server. Можно открыть дашборд по адресу http://localhost:9393/dashboard или взаимодействовать с REST API по адрес http://localhost:9393 |
7577 |
7577 |
На этом порту работает Skipper server. Можно повзаимодействовать с Skipper REST API по адресу http://localhost:7577/api |
Важное примечание:
Сейчас Spring Cloud Data Flow по умолчанию использует Java 8 для запуска приложений. Если вы хотите запускать приложения, собранные с Java 17, нужно установить значение переменной окружения BP_JVM_VERSION
:
export BP_JVM_VERSION=-jdk17
Также вам скорее всего понадобится возможность отладки приложений и пайплайнов. К сожалению, это не такая простая задача, но в целом очень подробно описана в документации: Customizing Docker Compose. Также там же описывается, как добавить zipkin для распределенной трассировки ваших пайплайнов.
Давайте начнем знакомство с веб‑ui системы SCDF.
Здесь мы видим несколько разделов:
приложения
потоковые пайплайны
задачи (tasks)
управление и аудит
Самая первая вкладка — приложения. Здесь мы можем загружать «приложения» — составные блоки нашего будущего ETL‑пайплайна.
Нажмем на кнопку «добавить приложение». Открывается окно с выбором способа добавления приложения:
В самом простом случае нас интересует вкладка «зарегистрировать одно или несколько приложений». Указываем название приложения (скорее всего будет совпадать с названием шага в ETL‑пайплайне, которому оно соответствует), выбираем тип приложения и URI. Обратите внимание, что также нужно указать Metadata URI с мета‑данными вашего приложения.
Добавили приложение? Ок, двигаемся дальше и приступаем к созданию пайплайна.
Для этого нужно перейти в раздел “Потоки” и нажимаем на кнопку “создать потоки”.
Есть два способа описать ETL пайплайн:
в визуальном редакторе на форме;
с помощью DSL‑языка.
Неважно, какой из способов вы выберете, второй подход будет автоматически добавлен тоже.
Например:
Поток был создан с помощью визуального редактора и при этом автоматически продублирован в виде DSL:
STREAM-1=http | header-enricher | log
:STREAM-1.header-enricher > transform | groovy | file
Нажимаем “создать потоки” и поток отобразится в таблице.
Обратите внимание: в таблице отобразились два отдельных потока. При этом конвейер данных у нас один (т.е. оба потока связаны друг с другом и данные из одного передаются в другой):
Дальше можно запускать сам процесс!
Предположим, что поток данных пошел и наш поток может начать их принимать и обрабатывать.
Переходим к нашему потоку и видим, что он еще не задеплоен.
Первое, что нужно сделать — это задеплоить поток. Нажимаем кнопку «задеплоить» и переходим на форму, на которой необходимо указать параметры развертывания для потока целиком и для каждого из его компонентов:
Отлично! Ваш поток развернут и готов к работе.
Как проверить работу пайплайна?
Для созданного примера достаточно, чтобы по указанному http адресу появились данные, которые наш ETL пайплайн сможет получить и обработать. В конце можем проверить успешность пайплайна, если создался файл с преобразованными данными и создалась запись в логе.
Пару слов про свои собственные приложения
В рассмотренном выше примере мы использовали стандартные приложения, которые уже сделаны за нас и их можно скачать. А что делать, если нужно выполнить логику, для которой не подходят уже существующие приложения? Написать свою.
Давайте посмотрим на примере приложения‑processor (у которого есть ендпойнт для получения входных данных и для проброса данных дальше).
Такое приложение можно уместить в одном файле. Например:
@EnableBinding(Processor.class)
@SpringBootApplication
public class MyProcessorApplication {
@Autowired
SomeComponent someComponent;
public static void main(String[] args) {
SpringApplication.run(MyProcessorApplication.class, args);
}
@StreamListener(Processor.INPUT)
@SendTo(Processor.OUTPUT)
public Object processMessage(Message<?> message) {
System.out.println(message);
MessageHeaders header = message.getHeaders();
byte[] body = (byte[]) message.getPayload();
// do some logic and transformations
someComponent.doSomething(header, body);
return message;
}
}
После того, как код приложения написан, можно зарегистрировать его в Data Flow Server через веб‑ui или через shell. Так как через ui мы уже видели интерфейс, приведу пример, как зарегистрировать через shell:
app register --name myProcessor --type processor --uri maven://com.example:my-processor:jar:0.0.1-SNAPSHOT
После этого при создании потоков можно использовать наше только что зарегистрированное приложение наравне с другими.
Подведем итоги: для java‑мира есть довольно удобная, мощная и гибкая альтернатива для airflow и можно ее использовать для построения ETL‑пайплайнов, конвейеров потоковой обработки данных и процессов обработки тяжелых единоразовых задач.
Ну и напоследок приглашаю всех на бесплатный урок, где мы рассмотрим такой принцип проектирования программного обеспечения, как «Соглашения по конфигурации», Поговорим про «Скриптовый Spring» и затронем не менее важную тему «Безопасного программирования».
maxzh83
Ради интереса, пробовал поиграться с этой штукой. Впечатление она оставила не самое приятное. Если коротко, то те примеры (довольно простые), которые описаны в околомаркетинговых статьях, работают. Но стоит залезть чуть в сторону или написать свое приложение, то начинается странное поведение. Постоянно что-то глючит, не обновляется, работает не с первого раза. Например, бывает, что загружаешь свое приложение через UI и ничего не происходит, висит установка приложения, никаких ошибок нет. В дебрях логов можно найти какую-то невнятную ошибку в kafka. Комьюнити практически отсутствует, хоть это вроде и часть Spring. Короче, поигрался и для себя сделал вывод, что пользоваться всерьез этой штукой пока рано.
Asinrus
Поддержу автора комментария.
Также не нашел в SCDF возможность перезапуска отдельной таски, если она упала при пайплайне.
Изменения, кстати, вносят относительно быстро. На мое issue среагировали и внесли изменения где-то через 3 неделю, но там была просьба добавить пару параметров в конфигурацию деплоймента для k8s