Привет! Меня зовут Владимир Ходак, я работаю инженером данных в компании "ДЮК Технологии". В статье расскажу о практических аспектах использования Apache NiFi, опишу преимущества и проблемы, с которыми я столкнулся.

На Хабре есть статьи, которые подробно описывают работу этого инструмента. Я хотел бы выделить две из них:

Для наглядности я собрал "песочницу" в Docker, в которой представлены упрощенные примеры пайплайнов, аналогичные тем, которые были использованы в реальном проекте. Вы можете найти ее по ссылке: GitHub репозиторий.

Подсказка: Чтобы развернуть песочницу, необходимо установить Docker и скачать весь репозиторий целиком. Затем в командной строке, находясь в папке с скопированным репозиторием, выполните команду "docker-compose up". В папке "GLOBAL_SHARE/NiFi/conf/" содержится конфигурация Apache NiFi со всеми настройками, которая будет применена к образу в Docker.

Описание задачи

Изначально заказчик использовал BI-платформу Qlik Sense, однако для обеспечения независимости от внешних факторов принял решение перейти на Apache Superset. У заказчика имелись специалисты, способные разрабатывать отчеты в Apache Superset. 

Однако отсутствовала собственная команда разработчиков для интеграции данных. Было необходимо найти гибкое решение с открытым исходным кодом, обеспечивающее возможность масштабирования. После анализа всех факторов для реализации проекта был выбран Apache NiFi.

Помимо вышеперечисленного, проект имел ряд особенностей:

  • Большое количество источников: Существовало множество источников данных, требующих интеграции.

  • Ограниченный доступ к источникам: Некоторые источники располагались на стороне партнеров и имели ограниченный доступ.

  • Низкая отказоустойчивость источников: Некоторые источники данных имели низкую отказоустойчивость, что могло привести к проблемам при загрузке.

  • Необходимость потоковой загрузки данных: Требовалась возможность загрузки данных в реальном времени для оперативного анализа.

  • Наличие источников большого объема: Некоторые источники данных генерировали большие объемы информации, что требовало эффективной обработки.

Подготовка данных для отчетов осуществлялась по следующей логике:

  1. Apache NiFi загружал данные из множества различных источников в хранилище GreenPlum.

  2. Обработка данных выполнялась в GreenPlum и оркестрировалась через Apache Airflow.

  3. Apache Superset взаимодействовал с GreenPlum и визуализировал подготовленные витрины данных.

Может возникнуть вопрос, почему на проекте использовались два ETL инструмента и почему нельзя было ограничиться только Apache Airflow. Это связано с уникальными особенностями Apache NiFi.

Особенности Apache NiFi

Apache NiFi - это мощный конструктор для создания пайплайнов загрузки данных, обладающий следующими особенностями:

  • Повторное использование пайплайнов: Инструмент позволяет легко подготовить типовые пайплайны загрузки данных и многократно переиспользовать их с минимальными изменениями, что существенно ускоряет процесс разработки.

  • Группировка пайплайнов в процессные группы: Пайплайны могут быть логически сгруппированы для удобства управления и масштабирования. Этот подход позволяет легко копировать пайплайны с незначительными корректировками через удобный пользовательский интерфейс.

  • Простота использования интерфейса: Для пользователей это означает более дружелюбный опыт работы по сравнению с написанием кода в Apache Airflow. Это позволяет упростить передачу разработок на сопровождение заказчику.

  • Легкая локализация и решение проблем: Даже пользователь с минимальными навыками может быстро определить и решить проблему в пайплайне самостоятельно или передать разработчику, благодаря интуитивно понятному интерфейсу и возможностям быстрой диагностики.

  • Защита от случайных действий пользователя: Инструмент обладает высокой степенью защиты от ошибок пользователей, что снижает риск непреднамеренных сбоев и повреждения данных.

  • Отслеживание изменений и их последствий: С помощью Apache NiFi можно быстро отследить внесенные изменения и их последствия, что обеспечивает прозрачность процесса разработки и управления данными.

Хотя Apache NiFi имеет множество преимуществ, он не идеально подходит для оркестрации запуска функций расчета витрин. Поэтому на проекте было принято решение использовать оба инструмента в соответствующих областях, что позволило получить оптимальные типовые пайплайны в Apache NiFi и Apache Airflow, подробно описанные и легко управляемые как конструкторы.

Пример решения задачи клиента

В собранной мной “песочнице” представлен упрощенный пример процесса ETL из реального проекта. Вот как он работает:

Источником данных является база данных source_db. В схеме source_schema содержатся две таблицы: flights и seats. Необходимо загрузить данные в базу данных dwh_postgres в схему dwh.

На рисунке два пайплайна:

  • Слева находится пайплайн для загрузки всего содержимого таблицы источника.

  • Справа представлен пайплайн для инкрементальной загрузки данных.

Каждый блок представляет собой процессор, выполняющий определенные функции.

Где без штриха отмечена полная загрузка, с штрихом - инкрементальная загрузка.

Для работы с JOLT рекомендую очень удобный сайт JOLT Demo

Архитектурные решения

Одним из основных критериев было быстрое и качественное подключение новых источников с минимальной экспертизой. Для этого были применены следующие решения.

Один источник — одна процессная группа

Чтобы не путаться в источниках, было решено выстроить следующую логику:

  • В головной процессной группе создается группа проекта.

  • В проектной группе для каждого источника создается отдельная дочерняя группа.

  • В группах источников для каждой таблицы создается группа с пайплайном загрузки данных.

Это решение позволяет при подключении нового источника копировать ранее созданные группы целиком с минимальными изменениями. Это упрощает отслеживание показателей процессоров и очередей как с помощью системы мониторинга, так и через пользовательский интерфейс NiFi.

Такой подход представлен в песочнице:

NiFi Flow → головная процессная группа

For_habr → проектная группа

source_db → группа источника

Flights_table и seats_table → группы таблиц

Parameter Contexts

Параметры контекста позволяют задать переменные, которые используются как переменные в процессной группе.

Редактирование параметров ограничено специальными правами. На проекте для всех источников параметры контекста были унифицированы. При необходимости подключить новый источник применялся уже созданный ранее параметр или создавался новый с согласованными именами переменных.

Это позволило копировать процессные группы целиком и значительно снизило ошибки пользователей.

В песочнице приведен пример, когда для источника source_db создается свой параметр контекста, где хранится информация о настройках баз данных и авторизации.

Variables

Для обеспечения универсальности пайплайнов кроме переменных из параметров контекста также используются переменные для групп таблиц.

В зависимости от особенностей могут быть добавлены дополнительные переменные, что позволяет расширить возможности для модификации пайплайнов. Хороший пример использования переменных - процессор "PutFile". Путь для сохранения CSV файла указывается только с использованием переменных.

#{NiFi_share_path}/#{db_source_name}/${source_table_name}

Для каждого источника и таблицы будет создана отдельная папка для хранения временных файлов, что улучшит отслеживаемость потенциальных проблем и объемов загрузки.

Controller services

Контроллеры создаются на уровне группы источника. Это позволяет переиспользовать ранее созданные контроллеры при копировании во всех пайплайнах загрузки таблиц.

Проблемы и решения

Кроме тривиальных советов об установке максимального времени ожидания при запросах к базам данных хочу поделиться решениями для других, менее очевидных проблем.

Зависания процессоров

Источники на проекте работали крайне нестабильно. Одной из проблем было подвисание процессоров без появления ошибок. Эта проблема возникала не регулярно и казалась случайной. Однако выяснилось, что проблема заключалась в SQL сессиях на стороне источника. Решением стало установка проверочного запроса в контроллерах.

Такая настройка потенциально может снизить производительность, однако при тестировании не было обнаружено каких-либо замедлений.

Потерянные CSV файлы с данными

В песочнице данные загружаются через промежуточную выгрузку в CSV файл. При этом имя файла соответствует атрибуту filename. Для подобных операций лучше использовать атрибут UUID, так как он уникален для каждого объекта в NiFi. Например, если будет использован процессор SplitText для разделения содержимого FlowFile на фрагменты, то у всех файлов после деления будет одинаковый атрибут filename. Это может привести к проблемам при сохранении, загрузке и удалении файлов.

Промежуточные запросы к БД

В процессоре 1 с типом ExecuteSQLRecord есть возможность настроить предварительный запрос, основной запрос и последующий запрос. Однако эта настройка не всегда оказывается оптимальной, так как запросы выполняются последовательно, и при этом ошибки в процессоре иногда не фиксируются. В связи с этим было принято решение использовать только поле с основным запросом и, при необходимости, указывать запросы последовательно через точку с запятой.

Ошибочные очереди

Рекомендую всегда создавать очереди для ошибок. Это поможет избежать потери данных при возникновении ошибок и обнаружить проблемы через систему мониторинга или пользовательский интерфейс NiFi.

Можно комбинировать ошибочные очереди с процессом добавления задержки. Например, после возникновения ошибки в процессоре PutFile FlowFile может быть направлен в процессор UpdateAttribute, где устанавливается расписание запуска каждые 1-5 секунд и добавляется атрибут с датой ошибки. Таким образом, при возникновении ошибок FlowFile будет отправляться на переразбор с задержкой, что снизит нагрузку.

Однако у этого метода есть серьезный недостаток. Если очередь ошибок из процессора PutFile переполнится, даже после решения проблемы процессор не будет обрабатывать новые FlowFile, поскольку при возникновении ошибок он не сможет переместить их в ошибочную очередь. Можно увеличить лимит очереди, чтобы избежать переполнения, но это может потребовать ручной настройки и могут возникнуть проблемы с масштабированием.

Параметры запуска процессоров после восстановления

Было принято решение изменить настройку по умолчанию, чтобы в случае падения NiFi всем процессорам устанавливался статус Stop, вместо сохранения последнего статуса до падения. Это решение было принято после нескольких ситуаций, когда пользователи не указывали условия в SQL запросе, что приводило к нехватке памяти и падению сервиса. После восстановления процессоры с некорректной настройкой снова запускались, что приводило к повторному падению сервиса.

Защищенные поля в переменных при создании из шаблона

В NiFi есть возможность сделать поля в параметрах контекста и переменных защищенными. Поля с паролями в контроллерах всегда защищены по умолчанию.

Значение защищенных полей не сохраняется в шаблон для безопасности.

Поэтому при создании процессной группы из шаблона в защищенных поля будут очищены. Например, если потребуется загрузить из шаблона группу источника, то пароль в контроллерах в виде параметра контекста #{db_source_password} будет очищен. Пользователю придется заново ввести переменную в поле с паролем.

Цветовая дифференциация процессоров

Для удобства пользователей процессоры могут быть выделены цветом

В нашем проекте мы решили выделять цветом процессоры для загрузки и выгрузки данных, что позволяло пользователям быстрее ориентироваться в процессе работы.

Результаты внедрения

Этот случай демонстрирует, как NiFi может стать надежным и долгосрочным инструментом в процессе ETL на стороне заказчика. За более чем год использования не было необходимости в значительных изменениях или модификациях, что свидетельствует о стабильной и эффективной работе платформы.

Благодаря значительному запасу по отказоустойчивости и доступности, NiFi стал ключевым инструментом для решения всех задач по загрузке данных заказчика. Заказчик успешно добавлял новые источники данных в систему, несмотря на возможные сбои в инфраструктуре или ошибки со стороны пользователей. Это свидетельствует о гибкости и надежности NiFi, который успешно справлялся с задачами загрузки и обработки данных, обеспечивая стабильную работу всей системы.

Если заказчику требовалась разработка новых пайплайнов для решения новых задач, то он передавал конкретные требования на разработку. Новые типовые пайплайны интегрировались в уже существующую систему и позволяли значительно расширить уже имеющуюся функциональность

Важным преимуществом является масштабируемость NiFi, которая позволяет уверенно добавлять новые источники данных, не оказывая негативного влияния на скорость загрузки и обработки информации. Это обеспечивает заказчику возможность дальнейшего развития и роста без необходимости в поиске и внедрении альтернативных решений.

P.S. Бонусом добавил контейнер с Airflow, — теперь у вас есть полноценный тестовый стенд для практики и реализации своих идей.

Комментарии (8)


  1. YAKOROLEVAZAMKA
    13.05.2024 08:17
    +1

    Для обработки ошибок есть Retry-процессор с настройкой penalty duration, а так же с настройкой retries_exceeded, после чего надо отправлять flowfile в порт error для анализа ошибки

    Не надо failure возвращать обратно в процессор

    А вообще не увидел зачем для таких задач NiFi нужен, почему просто через Airflow PostgresOperator это не сделать


    1. vkhdk Автор
      13.05.2024 08:17

      Спасибо за комментарий и особенно про Retry-процессор.

      На стороне заказчика не было своих DE, а аналитики и поддержка не были готовы работать с кодом DAGs и настройками соединений Airflow для новых источников.

      На практике Nifi стал намного дружелюбнее для пользователей. Подробнее описал под заголовком "Особенности Apache NiFi"


      1. YAKOROLEVAZAMKA
        13.05.2024 08:17

        1) Согласен
        2) Согласен
        3) Согласен
        4) Локализация проблемы - да, а вот насчет простоты решения.. Придёт какой-нибудь ответ по API {"error": "400"} или просто сетевая ошибка - тут пользователю надо всё-таки разбираться в инструменте и уметь читать Java-еггоги хотя бы минимально
        5) Защита? У Вас в примере все процессоры стопнуты, а если они (кроме первого) будут включены? Пока не понимаю до конца этого утверждения
        6) Тоже не понимаю - вот выгрузились данные и куда-то сохранились (или произошёл UPDATE), NiFi это же ETL-инструмент, а не мониторинг, Вы предлагаете в Data Provenance смотреть что ли?)

        И ещё заметил что в в примерах нет процессора GenerateFlowFile - это специально так и задумано?


        1. vkhdk Автор
          13.05.2024 08:17

          1. Про локализацию проблем.
            Пример кейса: Упала база приемник или снизилась производительность. Поддержка видит событие в системе мониторинга и может самостоятельно посмотреть на каком участке пайплайна копится очередь. Какой процессор выдает ошибки . Сколько данных перегрузилось за последние 5 минут и т.д. Мы предоставили поддержке алгоритмы диагностики которые позволяют передать проблему конкретной команде (ЦОД\БД источник\БД получатель)

          2. В Nifi есть множество ограничений для пользователя которые спасают от случайных ошибок
            Например:
            - Нельзя удалить очередь пока в ней есть FlowFiles
            - Нельзя удалить коннектор если он где-то используется
            - Нельзя удалить процессор если он связан с другим процессором и т.д.

          3. Допустим задача выгрузить данные из новой таблицы источника PG. Можно поправить запрос и разово запустить процессор. Тут же посмотреть в очереди получившийся CSV\JSON и т.д. Для аналитиков такая возможность оказалась очень полезной. Они смогли самостоятельно корректировать запросы и отслеживать преобразования данных поэтапно. Это оказалось намного прозрачнее и быстрее чем вычитывать логи задач Airflow

            не очень понял как Вы собираетесь использовать GenerateFlowFile и что хотите генерировать при перегрузе данных из одного источника в другой :)
            В песочнице развернуты две БД. БД источник и БД получатель. Первый процессор в пайплайнах обращается к БД источнику и перегружает данные дальше до БД получатель

            Собственно предлагаю каждому заинтересованному попробовать познакомится с Nifi самостоятельно. Можете обкатать свои идеи в песочнице. Попробовать улучшить мои примеры и т.д.
            Буду рад если кто-нибудь поделится своими шаблонами новых пайплайнов


          1. YAKOROLEVAZAMKA
            13.05.2024 08:17

            GenerateFlowFile создает flowfile который уже бегает по вашим процессорам и собирает данные \ выполняет загрузку данных, пока что для меня загадка как Вы без него работаете, ну да ладно, может у нас подходы к работе в NiFi разные

            4) Если есть процессорная группа под мониторинг - тогда нет вопросов

            5) По идее пользователь не должен трогать работающий NiFi-поток, если он загружает данные на прод и при этом вносит изменения в real-time режиме то это как-то странно

            6) Тут про мониторинг был пункт, но я понял про что Вы говорите, посмотреть данные действительно удобно, тут бесспорно :)


            1. vkhdk Автор
              13.05.2024 08:17
              +1

              Не обязательно использовать flowfile как "триггер" для работы с другими процессорами. Первый процессор выполняет SQL запрос по расписанию и генерирует flowfile в котором так же можно хранить как данные так и атрибуты.

              Могу только порекомендовать развернуть у себя песочницу и посмотреть самостоятельно. Возможно это натолкнет Вас на какие-то новые идеи :)
              Кроме самого Nifi в тестовом стенде есть базы данных PG, файловая шара и прикручен Airflow. Все должно работать "из коробки". Если вдруг обнаружите проблемы с тестовым стендом, то постараюсь оперативно исправить их


              1. YAKOROLEVAZAMKA
                13.05.2024 08:17
                +1

                К сожалению у меня регламент по работе с NiFi, спасибо за объяснение про ваш подход, но у меня такой не покатит :)

                Развернуть мне пока что негде, добавлю в закладки, не удивляйтесь если через полгода произойдёт "реанимация" треда)


  1. GeorgeNordic
    13.05.2024 08:17

    А, в целом, как ощущения от суперсета после Qlik? А то отзывы крайне противоречивые. И почему чистый суперсет, а не отечественный BI на суперсете? Их даже несколько..