Сегодня концепция витрин данных является стандартом и используется повсеместно. Поэтому даже небольшим компаниям важно определиться с помощью каких инструментов они будут решать проблему оркестрации процессов построения витрин. Какой инструмент в условиях относительно небольшого бюджета позволит достигать поставленных целей? Этот вопрос мы и постараемся раскрыть в статье. Для этого рассмотрим два известных инструмента: Airflow и NiFi, а также постараемся выявить их сильные и слабые стороны.

Под витриной данных мы будем иметь в виду таблицу, которая является совокупностью информации по одному направлению или теме. Потребителями витрин в основном являются аналитические отделы, которые способны делать выводы о дальнейших действиях, составлять отчеты и прогнозировать будущее состояние компании в результате полученных данных. Использование витрин данных позволяет получить быстрый доступ к необходимым пользователю сведениям, а также ускорить анализ данных благодаря относительно небольшому объему. За счет простой структуры витрину можно быстро перестроить в случае изменения модели или задач пользователя.

Исходные данные и описание инструментов

Для начала опишем все вводные. Витрина, которую мы будем создавать, содержит в себе полную информацию о заказах компании и клиентах. Витрина будет находиться в слое DM хранилища на базе PostgreSQL.

Используемые слои данных:

  • ODS — Operation data storage. Данный слой предназначен для накопления данных из систем‑источников. Сущности этого слоя будут использоваться в качестве источников данных в наших etl‑процессах;

  • SDL — stage data layer — промежуточный слой. Здесь мы будем хранить временные таблицы при захвате данных из слоя ODS и выполнении преобразований. Далее на основании данных,сформированных в stage слое, будет производиться расчет итоговой витрины;

  • DM — data mart — слой хранения витрин. Здесь будут находится все наши витрины. Именно к этому слою будут иметь доступ потребители.

Для требуемой витрины потребуется 4 источника из слоя ODS:

  • sales.Store: 4 поля, 701 строка;

  • sales.Customer: 5 полей, 19 820 строк;

  • sales.SalesOrderHeader: 24 поля, 31 465 строк;

  • person.Address: 7 полей, 19 614 строк.

Наибольший из них содержит порядка 30 000 строк. Объем маленький, но для текущей задачи вполне годится. Обновление витрины будет происходить с помощью инкремента. При каждом запуске мы вычисляем максимальное загруженное значение поля Primary Key и получаем все строки больше этого значения. В используемых нами источниках, значения PK генерируются последовательно. История в нашей витрине отсутствует. Схема требуемой витрины:

В таблице SalesOrderHeader содержатся записи с общей информацией о заказах. Во-первых, мы присоединяем таблицу Customer, используя поле customerid, чтобы получить поле storeid. По этому полю мы будем получать информацию о магазинах из таблицы Store. Во-вторых, по полю shiptoaddressid присоединяем источник Address для получения адреса доставки заказа.

Полный код прототипа витрины

with salesorderheader as (
select s.salesorderid
,s.customerid
,s.salespersonid
,s.shiptoaddressid
,cast(s.orderdate as date)
,cast(s.shipdate as date)
,cast(s.duedate as date)
,case
when s.status = 1 then 'In progress'
when s.status = 2 then 'Approved'
when s.status = 3 then 'Backordered'
when s.status = 4 then 'Rejected'
when s.status = 5 then 'Shipped'
when s.status = 6 then 'Canceled'
else 'Unknown'
end as status
,s.subtotal
from sales.salesorderheader s
), customer as (
select customerid
,storeid
from sales.customer
), store as (
select businessentityid
,st."name" as store_name
from sales.store
), address as (
select addressid
,concat(addressline1, ', ', city) as ship_to_address
from person.address
)
select s.salesorderid
,s.customerid
,c.storeid
,s.salespersonid
,s.shiptoaddressid
,a.ship_to_address
,st.store_name
,s.orderdate
,s.shipdate
,s.duedate
,s.status
,s.subtotal
from salesorderheader s
left join customer c on s.customerid = c.customerid
left join store st on c.storeid = st.businessentityid
left join address a on s.shiptoaddressid = a.addressid

Для оценки и сравнения двух подходов мы будем использовать определенный перечень критериев:

  1. Производительность — замеряем время на построение витрины;

  2. Скорость разработки — оцениваем количество трудочасов для реализации одной витрины в условиях, когда логика процесса известна и необходимо адаптировать ее под конкретный случай;

  3. Минимальный уровень знаний — посмотрим, какие знания будут необходимы для успешной работы тем или иным инструментом;

  4. Трудоемкость развертывания и настройки.

Давайте еще раз взглянем на используемые инструменты и поподробнее на них остановимся:

  • Airflow — инструмент, предназначенный для запуска цепочки задач. Данные цепочки реализовываются в виде DAG» — ориентированного ациклического графа, в котором задачи запускаются последовательно, либо параллельно. Сама задача в Airflow представлена в виде Operator. Существуют как стандартные, встроенные операторы, так и дополнительные, которыми можно дополнить встроенный функционал. Помимо этого, присутствует возможность создавать и собственные операторы, используя встроенные возможности Airflow, сторонних библиотек и т. д. Отличительная черта в том, что для создания потока (цепочки задач) нам необходимо описать все шаги в виде операторов и правильно настроить зависимости для каждой задачи. Благодаря тому, что мы сразу имеем дело непосредственно с кодом, у нас есть возможность очень гибко настраивать поведение нашего потока, описывать свои собственные операторы;

  • NiFi — ETL/ELT‑инструмент, который обладает встроенной поддержкой большого количества систем хранения данных. Чтобы создать наш первый поток, знание конкретного языка программирования не потребуется. Его создание происходит в веб‑интерфейсе NiFi. Потоки представлены в виде связки процессоров, которые передают от первого к последнему Flow Files. Flow File — сущность, с помощью которой процессоры обмениваются данными. Помимо самих данных у каждого Flow File есть набор атрибутов, с которым можно взаимодействовать различными способами. Processor — базовый элемент NiFi, который выполняет определенное действие. «Из коробки» нам доступно большое количество процессоров для выполнения подключений к системам, преобразования Flow Files, их атрибутов и многое другое. Благодаря этому разнообразию у нас не возникает необходимости выполнять преобразования вручную. Мы просто настраиваем нужный процессор под конкретную задачу.

 

Airflow

NiFi

Создание пайплайна

Создание потока происходит с применением кода на языке Python

Создание потока происходит с помощью веб-интерфейса, добавления готовых блоков (процессоров) и соединения их соответствующим образом

Принцип работы

Не хранит в себе данные, а только лишь запускает требуемые задачи. Является оркестратором задач: работа по расчету витрины производится на стороне БД, управление логикой этапов расчета витрины производится на стороне Airflow

NiFi хранит на диске объекты Flow File, атрибуты и содержание хранятся в соответствующих папках. Когда объекты находятся в очереди, содержимое Flow File не хранится в памяти, только сами объекты. Как только процессор начинает читать содержимое Flow File, данные попадают в память.

Дополнительные возможности

Собственные операторы создаются с помощью Python

Кастомные процессоры создаются с помощью Java. Есть процессоры, способные запускать внешние программы (например, написанные на python или java)

По какому же пути стоит пойти. Использовать инструмент с возможностью построения потоков при помощи графического интерфейса с разнообразными процессорами, доступными из коробки, но не слишком гибким для решения таких специфических задач как, например,объединение полученных данных, используя SQL Joins? Или выбрать инструмент, который позволит вручную описывать любые процессы, настроить их под конкретные случаи при помощи написания кастомных операторов на языке Python?

Описание процесса в NiFi

Наша задача — обратиться к базе данных, забрать данные из слоя хранения источников, преобразовать их, если необходимо, собрать данные согласно прототипу витрины и вставить получившиеся данные в таргет‑таблицу.

Рис. 2 - Схема работы потока
Рис. 2 - Схема работы потока

Первый шаг: забираем данные

 

Рис. 3 – шаги захвата данных.
Рис. 3 – шаги захвата данных.

Для захвата данных используем процессор QueryDatabaseTable. Данный процессор обращается к определенной базе данных, используя подключение к БД, описанное пользователем. Далее производим захват данных из требуемой таблицы, применяя SQL‑запрос. NiFi получает эти строки, записывает их в Flow File и передает следующему процессору. В настройках мы можем указать максимальное количество строк в одном Flow File. Огромным плюсом этого процессора является то, что мы можем указать поле, по которому будет определяться максимально загруженное значение. Это позволяет сразу решить проблему с реализацией инкрементальной загрузки.

 

Рис. 4 – настройка процессора захвата данных.
Рис. 4 – настройка процессора захвата данных.

Для того, чтобы правильно настроить данный процессор, необходимо:

  • Добавить подключение к нашей БД;

  • Выбрать тип БД: PostgreSQL;

  • Заполнить поле «Custom Query». Можно воспользоваться полями Table Name, где мы указываем таблицу в виде [схема.имя_таблицы], Columns to Return. Но в данном случае запрос слишком громоздкий и проще добавить его как полноценный запрос.

select s.salesorderid 

              ,s.customerid 

              ,s.salespersonid 

              ,s.shiptoaddressid

              ,cast(s.orderdate as date) 

              ,cast(s.shipdate as date)

              ,cast(s.duedate as date)

              ,case 

                        when s.status = 1 then 'In progress'

                        when s.status = 2 then 'Approved'

                        when s.status = 3 then 'Backordered'

                        when s.status = 4 then 'Rejected'

                        when s.status = 5 then 'Shipped'

                        when s.status = 6 then 'Canceled'

                        else 'Unknown'

              end as status

              ,s.subtotal

from sales.salesorderheader s 
  • В поле «Maximum‑value columns» добавить название столбца, по которому считаем максимально загруженное значение. Можно использовать как числовые поля типа id, так и временные поля, имеющие тип timestamp — смотря по какому из этих вариантов возможно точно отследить изменения. Стоит обратить внимание, что в данном случае источники Customer, Store и Address являются справочниками, поэтому в них мы это поле не заполняем;

  • Остальные пункты оставляем по умолчанию. Настройка расписания запуска происходит достаточно просто. У процессора есть отдельная вкладка «Scheduling», в которой мы можем настроить запуск через выбранный временной промежуток, либо использовать cron‑выражение — в зависимости от того, что будет удобнее.

Рис. 4.1 – настройка расписания
Рис. 4.1 – настройка расписания

Работа со справочниками будет немного отличаться. Здесь мы не будем использовать инкрементальную загрузку, а каждый раз будем получать весь справочник целиком.

Перед запуском потока разработчик создает таблицу для каждого источника в схеме stg, которая будет использоваться как промежуточная в процессе сборки витрины. Получается, что у витрины у нас будет своя таблица для каждого источника, в которую будут поступать нужные данные с необходимыми фильтрами и преобразованиями.

Второй шаг: очистка промежуточной таблицы

Так как после выполнения загрузки данных в промежуточных таблицах будут находится ранее добавленные строки, нам необходимо удалить их и только после этого добавить новые. Выполняется процессором PutSQL, который передает БД необходимый запрос для выполнения (см. поле SQL Statement).

Рис. 5 – настройка процессора для очистки промежуточной таблицы.
Рис. 5 – настройка процессора для очистки промежуточной таблицы.

Запуск производится после того, как процессор получает на вход FlowFile с захваченными данными из источника. Выбираем необходимое подключение и заполняем запрос. Теперь, после подготовки промежуточной таблицы, можно записать новые данные. Это действие будем выполнять процессором PutDatabaseRecord.

Рис. 6 – настройка процессора записи данных в промежуточную таблицу.
Рис. 6 – настройка процессора записи данных в промежуточную таблицу.

Для настройки процессора:

  • Предварительно необходимо создать AvroReader — служба, которая позволяет процессору читать данные в Avro формате;

  • Выбираем тип БД, тип операции и указываем нужное подключение;

  • В поле «Table Name» получаем значение из атрибута файла с названием таблицы, используя выражение ${tablename}. Этот атрибут всегда будет присутствовать в атрибутах Flow File, так как он добавляется на этапе захвата оператором QueryDatabase;

  • Остальные параметры оставляем по умолчанию.

Все эти действия мы совершаем для каждого источника. Кроме того, не забываем предварительно создавать промежуточные таблицы.

Третий шаг: сборка витрины и запись данных в таргет-таблицу

 

Рис. 7 – финальные шаги формирования витрины.
Рис. 7 – финальные шаги формирования витрины.

Процессор ExecuteScript используем для обработки FlowFiles, которые поступают из процессоров PutDatabaseRecord. Нам необходимо передать один FlowFile, который будет инициализировать выполнение запроса, содержащего скрипт витрины.

Рис.8 – настройка процессора обработки Flow Files.
Рис.8 – настройка процессора обработки Flow Files.

Указываем язык выполняемого скрипта и в «Script Body» пишем сам скрипт. Суть выполняемого кода: получает количество файлов в очереди, если это число равно 4, как и должно быть (так как 4 источника) — удаляем эти файлы из очереди и отправляем новый FlowFile на выход, в канал SUCCESS. Если файлов меньше 4 в течение 5 минут, Flow File удаляются.

Далее используются те же процессоры, что и ранее. Выполняем скрипт витрины процессором ExecuteSQL и записываем получившиеся данные в таргет‑таблицу.

 

Рис. 9 – формирование витрины и запись в конечную таблицу
Рис. 9 – формирование витрины и запись в конечную таблицу

Так как скрипт достаточно простой, выполняется он одним процессором. Имея дело с более сложными витринами, мы можем разбивать этот процесс на несколько этапов для облегчения нагрузки на БД.

Описание процесса в Airflow

Итак, теперь поговорим про Airflow. Основная задача такая же — получить данные, преобразовать их и записать в таргет‑таблицу витрины. Однако, реализация по логике немного отличается от NiFi. Рассмотрим сразу на примере.

Первый этап — подготовка параметров загрузки источников. Так как в Airflow в отличие от NiFi нет механизма инкрементальной загрузки «из коробки», нам нужно решить этот вопрос самостоятельно. Для этого мы создаем таблицу в базе данных, которая используется Airflow для записи различных метаданных. В этой таблице будут содержаться записи с последними датами изменения строки в источнике. Эта дата является полем, по которому мы будем определять инкремент и подгружать только новые данные. Рассмотрим оператор подготовки параметров загрузки:

class LoadParameters(BaseOperator):

    def __init__(self, sources: str,  database: str = 'dwh', pg_conn: str = 'dwh_pg_conn', **kwargs) -> None:

        super().__init__(**kwargs)

        self.sources = sources

        self.database = database

        self.pg_conn = pg_conn

    

    # Функция захвата последней даты изменения источника

    def get_max_moddate(self, source: str, airflow_pg_conn: str = 'airflow_pg_conn') -> str:

        sql = "select coalesce(max(moddate), '1900-01-01') from public.source_moddate_log where source_name = '{}'"

        hook = PostgresHook(postgres_conn_id=airflow_pg_conn)

        last_moddate = str(hook.get_first(sql.format(source))[0])

        return last_moddate

 

    def get_hash_from_run_id(self, context):

        run_id = context['run_id']

        hash_object = hashlib.md5(f'{run_id}'.encode()).hexdigest()

        return hash_object

 

    def execute(self, context):

        for source_dict in self.sources:

            if  not source_dict['dict_flg']:

                lmd = self.get_max_moddate(source_dict['source_name'])

                context['ti'].xcom_push(key=f'{source_dict["source_name"]}_lmd', value=lmd)

        

        context['ti'].xcom_push(key='hash', value=self.get_hash_from_run_id(context))
  • На вход получаем список источников, которые будут использованы для построения витрины в формате словаря с ключами: имя источника, флаг справочника. Тут мы используем тот же принцип, что и в NiFi, и справочники грузим полностью;

  • Также получаем имя базы данных и название подключения, которое заполняется в Airflow, в поле Connections;

     

    Рис. 10 – настроенные подключения в веб-интерфейсе Airflow
    Рис. 10 – настроенные подключения в веб-интерфейсе Airflow
  • Каждый новый запуск захвата источников будет создавать новую таблицу, в названии которой будет присутствовать хэш-значение. Это значение будет добавляться к именам временных таблиц, чтобы каждый запуск потока имел собственные временные таблицы. Из контекста, который содержит информацию о запущенном даге, получаем ‘run_id’, имеющий вид “manual__2023-07-24T11:23:43.125703+00:00” и генерируем из него хеш. Далее передаем его в XCom, чтобы другие Task’и даги могли иметь к нему доступ;

  • Если источник не является справочником, то мы обращаемся к нашей лог‑таблице и получаем последнюю загруженную дату, которая будет использована как условие при захвате. Если записей для источника нет, устанавливаем значение по умолчанию равное «1900–01–01». Далее, как и хэш, передаем в XCom для дальнейшего использование.

Следующий шаг: захват данных

def execute(self, context):

        hash_value = context['ti'].xcom_pull(

            task_ids='get_load_parameters', key='hash')

        

        if not self.dict_flg:

            lmd = context['ti'].xcom_pull(

                task_ids='get_load_parameters', key=f'{self.source_schema}.{self.source_table}_lmd')

        create_temp_table = f"create table {self.stg_schema}.{self.target_table}_{hash_value} as select {','.join(self.column_list)} " + \

                            f"from {self.source_schema}.{self.source_table} where "

        get_new_lmd_sql = f"select max(cast(modifieddate as date)) from {self.source_schema}.{self.source_table}"

        if self.where_clause and not self.dict_flg:

            create_temp_table += f"{self.where_clause} and modifieddate > '{lmd}'"

        elif not self.where_clause and not self.dict_flg:

            create_temp_table += f"modifieddate > '{lmd}'"

        elif self.where_clause and self.dict_flg:

            create_temp_table += f'{self.where_clause}'

        else:

            create_temp_table += '1 = 1'

 

        hook_dwh = PostgresHook(postgres_conn_id=self.pg_conn)

        hook_dwh.run(create_temp_table)  # Запускаем скрипт создания временной таблицы

        new_lmd = hook_dwh.get_first(get_new_lmd_sql)[0]

 

        # Загружаем новую дату изменения в лог-таблицу, если она != существующей записи

        if not self.dict_flg:

            if str(new_lmd) != str(lmd):

                log_lmd_sql = "insert into public.source_moddate_log (source_name, moddate) values ('{}', '{}')"

                hook_airflow = PostgresHook(postgres_conn_id='airflow_pg_conn')

                hook_airflow.run(log_lmd_sql.format(f'{self.source_schema}.{self.source_table}', new_lmd))

            else:

                print('Новая дата загрузки источника не была добавлена в лог, так как совпадает с предыдущим значением')

Получаем на вход все требуемые значения: такие, как имя источника и его схема, флаг справочника, список полей, которые необходимо вывести, условие where, схему и название промежуточной таблицы.

Сам процесс достаточно прост. Получаем из XCom хэш и последнюю дату изменения, формируем SQL‑запрос и выполняем его в необходимой БД. Далее обращаемся к созданной промежуточной таблице, получаем новое значение даты загрузки и записываем его в лог‑таблицу. В случае, когда новые данные отсутствуют, выводим сообщение в лог Task»и о том, что новые данные отсутствуют и новая дата не была записана.

И, наконец, третий шаг: выполнение скрипта витрины

Сразу стоит сказать, что скрипт достаточно простой, содержит небольшое количество данных и мы можем его выполнить в одно действие. При наличии сложных скриптов, нужно разбивать выполнение на несколько шагов.

class CustomSqlOperator(BaseOperator):

    def __init__(self,

                 sql: str,

                 out_schema: bool,

                 out_table: str,

                 pg_conn: str = 'dwh_pg_conn',

                 **kwargs) -> None:

        super().__init__(**kwargs)

        self.sql = sql

        self.out_schema = out_schema

        self.out_table = out_table

        self.pg_conn = pg_conn

 

    # Оператор выполнения кастомного sql-запроса.

    # Получаем схему и название таргет таблицы, sql запрос, который необходимо выполнить

    # Передаем в запрос create table as с указанием таргета и сам sql запрос

 

    def execute(self, context):

        hash_value = context['ti'].xcom_pull(

            task_ids='get_load_parameters', key='hash')

        create_target = f"create table {self.out_schema}.{self.out_table}_{hash_value} as select * from ({self.sql.format(hash=hash_value)}) t"

 

        hook_dwh = PostgresHook(postgres_conn_id=self.pg_conn)

        hook_dwh.run(create_target)

Для выполнения скрипта используем оператор CustomSqlOperator, в который передаем имя и схему промежуточной таблицы и сам SQL‑запрос. Внутри оператора мы получаем хэш‑значение из XCom и подставляем его в название.

Использование PostgresHook внутри операторов для выполнения запросов удобно тем, что они логируются в лог Task»и. В случае возникновения проблем во время выполнения мы может с легкостью их выявить..

Для настройки расписания, необходимо заполнить аргумент дага «schedule_interval» при его создании. Есть возможность использовать cron‑выражение в виде строки, специальный объект datetime.timedelta или шаблоны. Примеры шаблонов:

Рис. 11 – примеры шаблонов для настройки расписания запусков
Рис. 11 – примеры шаблонов для настройки расписания запусков

Сравнение

 

Airflow

NiFi

Производительность

8 секунд

10 секунд

Скорость разработки

Python, кастомные операторы

Java, Встроенные операторы, подробная документация

Минимальные требования

SQL, Python

SQL

Производительность

После успешного запуска расчета витрин мы можем оценить скорость, с который представленные инструменты справились с поставленной задачей. Количество строк при инициализирующей загрузке в витрину — порядка 30,000.

Даг в Airflow отработал за 8 секунд. Мы можем сравнить время начала первой Task»и и окончания последней. В то же время NiFi показал чуть больше — 10 секунд. Разница можно объяснить тем, что NiFi требуется дополнительное время, чтобы обработать поступающие FlowFile'ы, составить необходимые запросы к БД (в процессорах PutSQL). С другой стороны, Airflow просто обрабатывает поступающие запросы и передает их в работу — ему не требуется выполнять дополнительные манипуляции с результатами захвата или трансформаций. Если учесть, что текущие условия являются наиболее комфортными для запуска расчетов (нет большого количества подключений к БД, одновременно работает только одни процесс) можно с уверенностью сказать, что при равных параметрах кластера Airflow способен показать более оптимальное время.

Стоить учитывать тот момент, что реализация загрузки выполнена двумя разными способами. Сравнивая временные показатели, мы убеждаемся, что использование Flow File для передачи информации в NiFi негативно влияет на производительность. В ситуации, когда процесс в NiFi был бы реализован также через SQL‑команды «create table as …», результаты могли бы отличаться, причем в пользу NiFi.

Скорость разработки

Трудность при разработке на NiFi изначально заключалась в продумывании общей логики работы, так как существуют несколько вариантов исполнения. При разработке Airflow таких серьезных трудностей не возникло: процесс был понятен и основное время ушло на подготовку кастомных операторов.

Для более детальной оценкирассмотрим плюсы и минусы обоих решений:

Airflow.

Плюсы:

  • Встроенные операторы, которые покрывают основные потребности;

  • Основной язык для работы — Python;

  • При знании Python написание кастомных операторов не занимает много времени и зависит от сложности выполняемого действия.

Минусы:

  • Встроенных операторов точно будет недостаточно, поэтому необходимо тратить время на написание собственных операторов.

NiFi.

Плюсы:

  • Большое количество встроенных процессоров, благодаря которым покрываются все основные потребности в функционале;

  • Понятный и интуитивный веб‑интерфейс, с помощью которого формируются пайплайны;

  • Объемная документация;

  • Возможность разработать расширения функционала процессоров на языке groovy.

Минусы:

  • Для решения одной проблемы может существовать несколько решений, что приводит к дополнительному анализу наилучшего выбора;

  • Необходимо тщательно разобраться с системой Flow Files и Connections для бесперебойной работы пайплайнов.

Минимальные требования для подготовки специалиста

Для работы с Airflow нам потребуется: SQL, Python и знания самого Airflow. Для реализации дополнительного функционала или для того, чтобы вникнуть в код существующих операторов, необходим хотя бы минимальный уровень знания Python. Конечно, тут все упирается в сложность решаемых проблем. Изучение самого Airflow — не слишком простая задача. Внутренние механики, такие как XCOM, связка ваших задач внутри DAG»а, заставят покопаться в поисках нужной информации. Зато интерфейс графической части интуитивно понятен и не потребует времени для его изучения.

Для работы с NiFi потребуютсяSQL и NiFi. Тут нам не нужно знать какой‑либо язык программирования, но необходимо изучить многие функции и возможности NiFi.

Выводы

Учитывая полученные результаты в рассмотренном кейсе и возможности обоих инструментов, мы отдали свое предпочтение NiFi. Написание дополнительного кода на языках программирования не потребовалось, все процессоры были доступны «из коробки» и управлялись через графический интерфейс, в то время как в Airflow нам потребовалось затратить время на разработку необходимых операторов на языке Python.

Но при увеличении количества потоков расчета витрин, усложнения их зависимостей, на наш взгляд, более лаконичный вид и удобство в дальнейшей поддержке будет у Airflow. Важно отметить, что Nifi хоть и может использоваться как оркестратор потоков расчета витрин между различными слоями данных одного хранилища, но основное назначение этого инструмента — это потоковая обработка данных между различными типами источников и приемников.

Коротко опишем представленные инструменты:

  • Airflow — удобный, многофункциональный и очень гибкий в решении поставленных задач оркестратор, позволяющий конструировать и управлять различными процессами. Интуитивно понятный графический интерфейс позволяет с легкостью взаимодействовать с вашими дагами, запускать или останавливать их и ставить на расписание. Благодаря тому, что Airflow использует Python, масштабирование возможностей не составляет труда;

  • NiFi — готовый инструмент, в котором нам не нужно придумывать дополнительный функционал, так как широкий спектр задач с легкостью решается набором процессоров, доступных «из коробки». Но стоит сказать, что NiFi очень удобен для связки нескольких систем: для потоковой обработки данных и для их трансформации. Поэтому, хотя мы и можем решить поставленную проблему с помощью NiFi, имеется определенный уровень сложности, после которого данный способ становится неоптимальным в использовании.

Литература

Комментарии (10)


  1. atshaman
    10.10.2023 19:12
    +2

    Слушайте, ну это так плохо, что просто "плохо". Ну ладно общая оверэнженернутость подхода в данном случае, пример учебный, спишем на это. Но блин, вы запускаете 4 своих потока асинхронно, и надеетесь, что они всегда дадут 4 flow-file'а. А если нет? Если трафик обновления в таблицах разный? Пуф! Вы пролюбили данные.

    А если кто-то остановил процессоры после первого инсерта? Пуф, вы пролюбили данные - следующий флоуфайл запустит truncate.

    А если трафик изменений в таблицах действительно высокий? Пуф! Вы пролюбили данные - цепочка обработки не ждет прохождения каждого отдельного flow-file'а от начала и до конца.

    Если БЫ вы использовали запуск по крону - этих проблем бы не было, но чистить временные данные "in general" все равно лучше после того, как завершили запись, а не до.

    Если вы уверены, что на каждый запуск у вас будут данные по всем четырем потокам - зачем вам тут скрипты? Используйте merge с min files = max files = 4 и expiration на очереди.

    Если у вас четыре идентичных потока обработки - не надо их контрол-цэ контрол-вэ - извлекли данные из таблиц - запускайте их в общий поток с параметризацией - под нагрузкой окупится.

    Ну и опять же - обработка ошибок, алертинг... не-еет, "такая цифровая трансформация нам не нужен"(Цы).


  1. Ninil
    10.10.2023 19:12
    +2

    Сравнивать Airflow с NiFi??? Это же инструменты совсем разных классов! Красное с круглыми...


    1. denis-isaev
      10.10.2023 19:12

      cron vs awk: исследуем инструменты для парсинга логов


      1. Ninil
        10.10.2023 19:12

        именно!)


    1. atshaman
      10.10.2023 19:12

      А почему нет-то? И то, и другое - решения класса ETL - то, что подход для решения задач выбран ОЧЕНЬ разный, и, как следствие, "локальные оптимумы" в массиве решаемых проблем разложены сильно по разному - но они определенно пересекаются.

      Есть классы задач, которые одинаково эффективно решаются обоими инструментами (Злые языки говорят, что без них - даже быстрее) - и в эту сторону было бы интересно посравнивать. С моей т.з. тут проблема именно в самом сравнении.


      1. Ninil
        10.10.2023 19:12

        Вот как раз нет. Airflow - это оркестратор в первую очередь. A NiFi - да, ETL - инструмент. Из-за непонимании этого различия у автора по тексту и встречается дичь вида

        ... Airflow в отличие от NiFi нет механизма инкрементальной загрузки «из коробки»..

        Ну а так, в телегу можно запрячь и корову, а шурупы закручивать ножницами. И при этом даже результат приемлемый может получиться. Но статья "Корова или лошадь - кого лучше запрячь в телегу?" наверное все же не для "Сельского Весника", а для "Крокодила".


        1. atshaman
          10.10.2023 19:12

          Ну, в FAQ у них все еще осталось "Airflow was developed as a solution for ETL needs."

          Вот сравнивать NiFi и прости оссспыдя, Camunda'у - таки да, было бы странно (Хотя ETL проект на последней я видел, бррр!) - а тут не вижу прям криминала.


          1. Ninil
            10.10.2023 19:12

            "solution for ETL needs " <> "ETL-tool". А так cron - тоже можно использовать "for ETL needs" :)))
            Разница между хорошим специалистом и просто специалистом именно в понимании сути и нюансов. Никого не хочу обидеть, но прямое сравнение Airflow и NiFi допустимо для вчерашнего вупускника курса "войти в ИТ", но не в блоге уважаемой компании


            1. atshaman
              10.10.2023 19:12

              Ну, определенного рода казуистика уже :). Крон можно использовать "for ETL needs" - но он определенно не developed as solution for :)

              Если что, в документации NiFi аббревиатура "ETL" вообще не упоминается:

              "What is Apache NiFi?

              Put simply, NiFi was built to automate the flow of data between systems."

              Предлагаю зафиксировать common ground:

              1. NiFi и Airflow ДОХРЕНА разные инструменты и выбирать между ними _без учета специфики задач_ - ну прям очень такоЭ.

              2. Вне зависимости от корректности сравнения per se - сама статья - ну, этакоЭ


              1. Ninil
                10.10.2023 19:12

                agreed)