Доброго дня. Меня зовут Иван Клименко, я разработчик потоков обработки данных в компании Аскона. В этом цикле статей я расскажу опыт внедрения инструмента Apache Nifi для формирования DWH. 

Данная статья посвящена первому этапу внедрения Apache NIFI - начальным потокам выгрузки, внедрению инкрементальной загрузки, и описанию существующей архитектуры.

Цели, задачи

Я вхожу в группу обработки данных подразделения “Платформа данных”. Пришёл в команду в начале прошлого года с задачей организовать сбор данных от источников и заполнение DWH с применением Apache NIFI. Так как некая структура уже существовала, то появились обозначения:

  1. Старая архитектура - сбор данных от источников выполняется раз в сутки, ночью, инструмент - SSIS на MsSqlServer (да, в качестве хранилища применяется MSSql, но мы планируем переход на GreenPlum), все инициализируется джобами через Sql Server Agent.

  2. Новая архитектура - сбор данных от источников выполняется раз в сутки с помощью Apache NIFI, заполняются стейджинговые таблицы, потом все по старому - джоб, пакеты, SSIS…

Какие были трудности в старой архитектуре:

  1. Загрузка осуществлялась раз в сутки и в случае большого объема данных SSIS не справлялся с выгрузкой данных от источника, и джоб падал.

  2. С увеличением объема данных и количества выгружаемых сущностей времени на выгрузку и перестройку витрин стало не хватать.

  3. Не выполнялась инкрементальная загрузка.

Задачи новой архитектуры:

  1. Обеспечить загрузку данных по мере их изменения на источнике.

  2. Масштабировать нагрузку на большое количество таблиц.

  3. Обеспечить формирование витринного слоя на основе загружаемого инкремента.

  4. Разделить процессы загрузки и пересчета витрин.

  5. Обеспечить легкую миграцию на GreenPlum. 

Поток ETL

Основным источником данных является ЕРП “Галактика”, СУБД - Oracle. У нашей команды есть доступ к источнику и есть возможность разработки View. Так как в NIFI  есть встроенный механизм сохранения инкрементального поля при запросе (числовое поле или дата-время), решено было для каждой таблицы источника создавать два View - исторический и инкрементальный. Структура View одинаковая, отличие только в том, что для инкрементального представления данные извлекаются из журналов, срок журналирования от 2 недель до 1 месяца. В каждой таблице есть два числовых поля - дата и время изменения записи. Так как NIFI умеет хранить только возрастающие поля, а время является циклично зависимым, решили сделать инкрементальный ключ, объединив в одном поле дату и время изменения записи в числовом представлении.

Был разработан поток загрузки данных, который извлекает данные из таблицы источника, выполняет ряд преобразований и выполняет загрузку в целевую таблицу. Отмечу, что в основном поток разработал мой коллега Фархад, я подключился после решения инфраструктурных задач.

Рис. 1. Общий вид потока. Дает представление о направлении движении данных.
Рис. 1. Общий вид потока. Дает представление о направлении движении данных.

Рассмотрим подробнее, от запуска и до завершения.

На начальном этапе происходит извлечение данных с помощью процессора QueryDatabaseTable.

Рис. 2. Процессор QueryDatabaseTable - порождает поток и выполняет запрос к источнику.
Рис. 2. Процессор QueryDatabaseTable - порождает поток и выполняет запрос к источнику.
Рис. 3. Натсройки  QueryDatabaseTable
Рис. 3. Натсройки QueryDatabaseTable

В настройках указываем соединение, схему и таблицу, перечисляем поля (либо оставляем поле пустым, тогда будут выбраны все поля), и указываем имя поля в настройке Maximum-value Columns. NIFI при запросе вычислит максимум и сохранит его в своем состоянии. При следующей генерации запроса NIFI добавит условие “WHERE  DT_KEY > 1923929292929”, то есть подставит и имя поля и его предыдущее значение. Таким образом реализуется инкрменетальная выгрузка.

Далее необходимо выполнить ряд преобразований. Так как могут поступать данные, которые уже были введены в систему (например, при редактировании), надо понять, произошли ли изменения данных. В целях отслеживания мы рассчитываем хеш каждой записи, и если он будет отличаться, то требуется обновить данные в витринном слое.

Рис. 4. Обновление данных и вычисление хэша
Рис. 4. Обновление данных и вычисление хэша

Сначала идет подготовка данных. Формируем новое поле с помощью Updaterecord, и вычисляем его хэш с помощью стороннего процессора HashColumn (спасибо автору за разработку, исходник в github). При дальнейшей разработки я решил максимально использовать штатные процессоры. В следующей части я покажу, как именно я заменил эту связку.

Рис. 5. Настройки UpdateRecord
Рис. 5. Настройки UpdateRecord

В настройках UpdateAttribute указываем имя нового поля - HASH_DIFF, и правило его формирования - конкатенация значимых полей. В результате формируется новое поле, в котором будет строка, содержащая все поля. И процессор HashColumn вычислит хеш по заданному алгоритму и поместит значение в это же поле.

 Рис. 6. Последовательность действия при преобразовании к CSV
Рис. 6. Последовательность действия при преобразовании к CSV

Сначала идет формирование атрибутов, где атрибуту $filename присваивается текущее имя файла с расширением ".csv". Этот атрибут понадобится при сохранении файла. Далее добавляется мета-информация - в каждую запись вносим время выгрузки. Запись преобразуется к CVS-формату с помощью процессора ConvertRecord. В результате контент из формата "Avro" будет преобразован к "CSV", экранирование происходит в зависимости от настройки сервиса записи.

Рис. 7. Запись файла в сетевую папку и очистка контента
Рис. 7. Запись файла в сетевую папку и очистка контента

Полученный файл сохраняется на сетевую папку на сервере, где крутится целевой MS Sql с помощью PutSmbFile, и выполняем очистку контента. Это необязательный шаг, однако при работе было замечено, что при наличии контента последующий запрос сильно тормозит.

После записи на сервере выполняется запрос для вставки данных в таблицу с помощью компонента ExecuteSQL.

Рис. 8. Последовательность действий для внесения и удаления данных на сервере
Рис. 8. Последовательность действий для внесения и удаления данных на сервере

Сам запрос имеет следующий вид:

BULK INSERT #{tgt.sql.schema}.#{tgt.sql.table.name.customer_orders}
FROM '#{local.folder.Bulk.Insert}${filename}'
WITH(FIRSTROW = 2, FIELDTERMINATOR = '~',ROWTERMINATOR = '0x0a',
     CODEPAGE = 65001, TABLOCK )

Как видно, на сервер передается имя файла для внесения в заданную таблицу, где схема и таблица определена в параметрах группы, а имя файла извлекается из атрибута.

В случае, если произошел сбой, поток отправляется на повтор, и в случае трех неудачных внесений формируется письмо об ошибке.

При удачном внесении выполняется хранимая процедура, удаляющая загруженный файл:

DECLARE 
@deletefile NVARCHAR(MAX) , @cmd NVARCHAR(MAX)

set @deletefile = '${filename}'
set @cmd = 
'xp_cmdshell ''del "#{local.folder.Bulk.Insert}\' + @deletefile  + '"''';
EXEC (@cmd)

Файл можно удалить и средствами NIFI, например через GetSmbFile, но в этом случае контент передается по сети обратно. Можно вызвать скрипт SSH или PowerShell, но это выдача дополнительных разрешений для пользователя NIFI.

Дальнейшие шаги являются служебными - формирование и запись лога, отправка уведомлений и т.д. 

Заключение.

Данный поток позволил нашей команде обкатать инкрементальную загрузку, исключить загрузку данных от источника средствами SSIS, разделить процесс загрузки данных от процесса расчета витрин.

Достоинства:

  1. Простой поток, в котором отражены все этапы ETL - выгрузка, преобразования, загрузка.

  2. Легко масштабировать - сохранив в виде шаблона или положив в Registry, меняя переменные и список полей можно легко добавить новую таблицу.

  3. Легко мониторить - видно, какой поток упал, где ошибка.

Недостатки:

  1. Когда количество таблиц превысило 20, стало неудобно поддерживать это решение.

  2. Все запуски выполнялись примерно в одно и то же время, создавая сильную нагрузку на источник.

  3. При необходимости модификации приходилось вносить изменения во все потоки, в одном и том же месте.

В следующих частях я расскажу о развитии потока выгрузки, его параметризации, как решалась задача информирования о завершении загрузки и уведомления об ошибках, и к чему мы пришли в итоге.

Комментарии (5)


  1. mulia
    10.01.2022 20:49

    имея в руках такой инструмент, как SSIS, к чему все и сводится в конце - зачем все остальное (в этом случае)? все 3 буквы выполнить можно без проблем: и Extract, и Transform, и Load.

    мне кажется, вы просто до конца не изучили все возможности SSIS.

    а насчет этого:

    "set @cmd = 'xp_cmdshell ''del "#{local.folder.Bulk.Insert}\' + @deletefile + '"''';

    EXEC (@cmd) "

    передайте привет тому, кто отвечает за допуски (у вас DBA есть?). что мешает (снова вспоминаем) сделать эту операцию с помощью SSIS?


    1. KlimenkoIv Автор
      10.01.2022 21:52

      Ошибся в интерфейсе.
      Ответил в комментарии


  1. demche
    10.01.2022 21:05
    +1

    Почему выбрали именно NiFi? Какие ещё альтернативы рассматривали?

    Кстати, с такой постановкой задачи ("обеспечить загрузку данных по мере их изменения на источнике") был смысл использовать CDC инструменты.


    1. KlimenkoIv Автор
      10.01.2022 21:59
      +1

      Почему выбрали именно NiFi?

      Решение принимал директор по направлению. Меня взяли на работу с задачей развернуть сервисы и сделать потоки.

      Преимущества NIFI просты - имеет внутренний сервер для запуска процессов, Open Source, стабильный продукт, горизонтальное масштабирование, позволяет хранить выгружаемые данные в себе, обеспечивает потоковую обработку, есть большое количество процессов, и легко написать свой.

      Какие ещё альтернативы рассматривали?

      Рассматривался Spark, Airflow (в плане рассказать о том, как мы его внедрили и для чего).

      Кстати, с такой постановкой задачи ("обеспечить загрузку данных по мере их изменения на источнике") был смысл использовать CDC инструменты

      Бюджет не позволяет использовать GoldenGate. Внедрение Debezium в компании не рассматривалось.


  1. KlimenkoIv Автор
    10.01.2022 21:12
    +1

    Не могу с вами согласится.

    имея в руках такой инструмент, как SSIS, к чему все и сводится в конце - зачем все остальное (в этом случае)? все 3 буквы выполнить можно без проблем: и Extract, и Transform, и Load.

    И это так и было. О возникших трудностях загрузки данных от источников с помощью SSIS я указал. И второй немаловажный момент - готовится миграция на GreenPlum, где нет инструмента, подобного SSIS.

    SSIS мощный инструмент, однако применять его для выгрузки данных нецелесообразно, ресурсы сервера использовались для пересчета витрин, отчетов, и нагружать его выгрузкой с увеличением объема данных стало невозможно. Он просто не успевал все выгрузить и пересчитать.

    передайте привет тому, кто отвечает за допуски (у вас DBA есть?). что мешает (снова вспоминаем) сделать эту операцию с помощью SSIS?

    Да, я в курсе, что это не является безопасным решением. Однако оно решало две задачи - убрать именно тот файл, который залился в стейджинг, и оставить остальные файлы в этой папке.

    После обкатки стейджингового слоя подобное решение было заменено. Об этом я напишу в следующей части.