Сегодня на тематических зарубежных сайтах о Big Data можно встретить упоминание такого относительно нового для экосистемы Hadoop инструмента как Apache NiFi. Это современный open source ETL-инструмент. Распределенная архитектура для быстрой параллельной загрузки и обработки данных, большое количество плагинов для источников и преобразований, версионирование конфигураций – это только часть его преимуществ. При всей своей мощи NiFi остается достаточно простым в использовании.

image

Мы в «Ростелекоме» стремимся развивать работу с Hadoop, так что уже попробовали и оценили преимущества Apache NiFi по сравнению с другими решениями. В этой статье я расскажу, чем нас привлек этот инструмент и как мы его используем.

Предыстория


Не так давно мы столкнулись с выбором решения для загрузки данных из внешних источников в кластер Hadoop. Продолжительное время для решения подобных задач у нас использовался Apache Flume. К Flume в целом не было никаких нареканий, кроме нескольких моментов, которые нас не устраивали.

Первое, что нам, как администраторам, не нравилось – это то, что написание конфига Flume для выполнения очередной тривиальной загрузки нельзя было доверить разработчику или аналитику, не погруженному в тонкости работы этого инструмента. Подключение каждого нового источника требовало обязательного вмешательства со стороны команды администраторов.
Вторым моментом были отказоустойчивость и масштабирование. Для тяжелых загрузок, например, по syslog, нужно было настраивать несколько агентов Flume и ставить перед ними балансировщик. Все это затем нужно было как-то мониторить и восстанавливать в случае сбоя.
В-третьих, Flume не позволял загружать данные из различных СУБД и работать с некоторыми другими протоколами «из коробки». Конечно, на просторах сети можно было найти способы заставить работать Flume с Oracle или с SFTP, но поддержка таких «велосипедов» — занятие совсем не из приятных. Для загрузки данных из того же Oracle приходилось брать на вооружение еще один инструмент — Apache Sqoop.
Откровенно говоря, я по своей натуре являюсь человеком ленивым, и мне совсем не хотелось поддерживать зоопарк решений. А еще не нравилось, что всю эту работу приходится выполнять самому.

Есть, разумеется, достаточно мощные решения на рынке ETL-инструментов, которые умеют работать с Hadoop. К ним можно отнести Informatica, IBM Datastage, SAS и Pentaho Data Integration. Это те, о которых чаще всего можно услышать от коллег по цеху и те, что первыми приходят на ум. К слову, у нас используется IBM DataStage для ETL на решениях класса Data Warehouse. Но так уж исторически сложилось, что использовать DataStage для загрузок в Hadoop наша команда не имела возможности. Опять же, нам не нужна была вся мощь решений такого уровня для выполнения достаточно простых преобразований и загрузок данных. Что нам требовалось, так это решение с хорошей динамикой развития, умеющее работать со множеством протоколов и обладающее удобным и понятным интерфейсом, с которым способен справиться не только администратор, разобравшийся во всех его тонкостях, но и разработчик с аналитиком, которые зачастую и являются для нас заказчиками самих данных.

Как вы могли понять из заголовка, мы решили перечисленные проблемы с помощью Apache NiFi.

Что такое Apache NiFi


Название NiFi происходит от «Niagara Files». Проект в течение восьми лет разрабатывался агентством национальной безопасности США, а в ноябре 2014 года его исходный код был открыт и передан Apache Software Foundation в рамках программы по передаче технологий (NSA Technology Transfer Program).

NiFi — это open source ETL/ELT-инструмент, который умеет работать со множеством систем, причем не только класса Big Data и Data Warehouse. Вот некоторые из них: HDFS, Hive, HBase, Solr, Cassandra, MongoDB, ElastcSearch, Kafka, RabbitMQ, Syslog, HTTPS, SFTP. Ознакомиться с полным списком можно в официальной документации.

Работа с конкретной СУБД реализуется за счет добавление соответствующего JDBC-драйвера. Есть API для написания своего модуля в качестве дополнительного приемника или преобразователя данных. Примеры можно найти здесь и здесь.

Основные возможности


В NiFi используется веб-интерфейс для создания DataFlow. С ним справится и аналитик, который совсем недавно начал работать с Hadoop, и разработчик, и бородатый админ. Последние двое могут взаимодействовать не только с «прямоугольниками и стрелочками», но и с REST API для сбора статистики, мониторинга и управления компонентами DataFlow.

image
Веб-интерфейс управления NiFi

Ниже я покажу несколько примеров DataFlow для выполнения некоторых обыденных операций.

image
Пример загрузки файлов с SFTP-сервера в HDFS

В этом примере процессор «ListSFTP» делает листинг файлов на удаленном сервере. Результат этого листинга используется для параллельной загрузки файлов всеми нодами кластера процессором «FetchSFTP». После этого, каждому файлу добавляются атрибуты, полученные путем парсинга его имени, которые затем используются процессором «PutHDFS» при записи файла в конечную директорию.

image
Пример загрузки данных по syslog в Kafka и HDFS

Здесь с помощью процессора «ListenSyslog» мы получаем входной поток сообщений. После этого каждой группе сообщений добавляются атрибуты о времени их поступления в NiFi и название схемы в Avro Schema Registry. Далее первая ветвь направляется на вход процессору «QueryRecord», который на основе указанной схемы читает данные и выполняет их парсинг с помощью SQL, а затем отправляет их в Kafka. Вторая ветвь направляется процессору «MergeContent», который агрегирует данные в течение 10 минут, после чего отдает их следующему процессору для преобразования в формат Parquet и записи в HDFS.

Вот пример того, как еще можно оформить DataFlow:
image
Загрузка данных по syslog в Kafka и HDFS. Очистка данных в Hive

Теперь о преобразовании данных. NiFi позволяет парсить данные регуляркой, выполнять по ним SQL, фильтровать и добавлять поля, конвертировать один формат данных в другой. Еще в нем есть собственный язык выражений, богатый различными операторами и встроенными функциями. С его помощью можно добавлять переменные и атрибуты к данным, сравнивать и вычислять значения, использовать их в дальнейшем при формировании различных параметров, таких как путь для записи в HDFS или SQL-запрос в Hive. Подробнее можно прочитать тут.

image
Пример использования переменных и функций в процессоре UpdateAttribute

Пользователь может отслеживать полный путь следования данных, наблюдать за изменением их содержимого и атрибутов.


Визуализация цепочки DataFlow

image
Просмотр содержимого и атрибутов данных

Для версионирования DataFlow есть отдельный сервис NiFi Registry. Настроив его, вы получаете возможность управлять изменениями. Можно запушить локальные изменения, откатиться назад или загрузить любую предыдущую версию.

image
Меню Version Control

В NiFi можно управлять доступом к веб-интерфейсу и разделением прав пользователей. На текущий момент поддерживаются следующие механизмы аутентификации:

  • На основе сертификатов
  • На основе имени пользователя и пароля посредством LDAP и Kerberos
  • Через Apache Knox
  • Через OpenID Connect

Одновременное использование сразу нескольких механизмов не поддерживается. Для авторизации пользователей в системе используются FileUserGroupProvider и LdapUserGroupProvider. Подробнее про это можно прочитать здесь.

Как я уже говорил, NiFi умеет работать в режиме кластера. Это обеспечивает отказоустойчивость и дает возможность горизонтально масштабировать нагрузку. Статично зафиксированной мастер-ноды нет. Вместо этого Apache Zookeeper выбирает одну ноду в качестве координатора и одну в качестве primary. Координатор получает от других нод информацию об их состоянии и отвечает за их подключение и отключение от кластера.
Primary-нода служит для запуска изолированных процессоров, которые не должны запускаться на всех нодах одновременно.

image
Работа NiFi в кластере

image
Распределение нагрузки по нодам кластера на примере процессора PutHDFS

Краткое описание архитектуры и компонентов NiFi



Архитектура NiFi-инстанса

NiFi опирается на концепцию «Flow Based Programming» (FBP). Вот основные понятия и компоненты, с которыми сталкивается каждый его пользователь:

FlowFile — сущность, представляющая собой объект с содержимым от нуля и более байт и соответствующих ему атрибутов. Это могут быть как сами данные (например, поток Kafka сообщений), так и результат работы процессора (PutSQL, например), который не содержит данных как таковых, а лишь атрибуты сгенерированные в результате выполнения запроса. Атрибуты представляют собой метаданные FlowFile.

FlowFile Processor — это именно та сущность, которая выполняет основную работу в NiFi. Процессор, как правило, имеет одну или несколько функций по работе с FlowFile: создание, чтение/запись и изменение содержимого, чтение/запись/изменение атрибутов, маршрутизация. Например, процессор «ListenSyslog» принимает данные по syslog-протоколу, на выходе создавая FlowFile’ы с атрибутами syslog.version, syslog.hostname, syslog.sender и другими. Процессор «RouteOnAttribute» читает атрибуты входного FlowFile и принимает решение о его перенаправлении в соответствующее подключение с другим процессором в зависимости от значений атрибутов.

Connection — обеспечивает подключение и передачу FlowFile между различными процессорами и некоторыми другими сущностями NiFi. Connection помещает FlowFile в очередь, после чего передает его далее по цепочке. Можно настроить, как FlowFile’ы выбираются из очереди, их время жизни, максимальное количество и максимальный размер всех объектов в очереди.

Process Group — набор процессоров, их подключений и прочих элементов DataFlow. Представляет собой механизм организации множества компонентов в одну логическую структуру. Позволяет упростить понимание DataFlow. Для получения и отправки данных из Process Groups используются Input/Output Ports. Подробнее об их использовании можно прочитать здесь.

FlowFile Repository — это то место, в котором NiFi хранит всю известную ему информацию о каждом существующем в данный момент FlowFile в системе.

Content Repository — репозиторий, в котором находится содержимое всех FlowFile, т.е. сами передаваемые данные.

Provenance Repository — содержит историю о каждом FlowFile. Каждый раз, когда с FlowFile происходит какое-либо событие (создание, изменение и т.д.), соответствующая информация заносится в этот репозиторий.

Web Server — предоставляет веб-интерфейс и REST API.

Заключение


С помощью NiFi «Ростелеком» смог улучшить механизм доставки данных в Data Lake на Hadoop. В целом, весь процесс стал удобнее и надежнее. Сегодня я могу с уверенностью сказать, что NiFi отлично подходит для выполнения загрузок в Hadoop. Проблем в его эксплуатации у нас не возникает.

К слову, NiFi входит в дистрибутив Hortonworks Data Flow и активно развивается самим Hortonworks. А еще у него есть интересный подпроект Apache MiNiFi, который позволяет собирать данные с различных устройств и интегрировать их в DataFlow внутри NiFi.

Дополнительная информация о NiFi



Пожалуй, на этом все. Спасибо всем за внимание. Пишите в комментариях, если у вас есть вопросы. С удовольствием на них отвечу.

Комментарии (24)


  1. zQQrra
    06.12.2018 12:20

    r3former Когда разбирался с NiFi так и не понял, можно ли там делать что-то типа «проектов»/«разных рабочих областей». Когда в рабочей области становится много различных DataFlow еще и не связанных между собой, навигация между ними неудобна.
    Как вы с этим боретесь? Или я что то не доглядел?


    1. r3former Автор
      06.12.2018 13:06

      Все делается на уровне process groups. Разные проекты выделяются в отдельные PGs, внутри которых подпроекты и задачи в свою очередь разделяются на другие PGs. Разделение прав делается также на уровне PGs.


  1. dmitry_dvm
    06.12.2018 12:57

    Так это из-за этого у вас так чудовищно тормозит ЛК? Вчера не подгружались ни счета на оплату, ни проведенные платежи, ни информация о тарифе.

    Что это за дерьмо, ребята?
    На загрузку этой формы уходит МИНУТА!

    image


    1. r3former Автор
      06.12.2018 13:11

      Мне кажется, Вы не ту статью выбрали для Вашего комментария. NiFi используется в Ростелеком для загрузок данных в Hadoop, а сам Hadoop никоим образом не связан с работой интернет/IPTV/мобильной связи и веб-ресурсов Ростелеком. Hadoop для построения рекомендательных моделей, машинного обучения и всякой разной аналитики. По работе ЛК рекомендую Вам обратиться в техподдержку компании.


      1. dmitry_dvm
        06.12.2018 14:20

        А вы к ним в техподдержку обращались? Сначала прорваться через их сверхразум на входе, потом подождать полчаса, пока кто-нибудь соизволит ответить. У меня есть на что это время потратить. Конечно, мой камент здесь офтоп, но другого способа докричаться до их отделов разработки я не знаю.


    1. tvr
      06.12.2018 13:35

      На загрузку этой формы уходит МИНУТА!


      Ну так это, снежинки, ёлочные украшения, всё для создания новогоднего настроения и атмосферы у клиента, а вам всё не так, всё не нравится.


      1. dmitry_dvm
        06.12.2018 14:35

        Причем снежинка, как и вся статика грузятся быстро. А вот нужная инфа либо минуту, либо вообще не.


  1. barloc
    06.12.2018 20:14

    Статья неплохая. И если бы не знал всех потрохов найфая в работе даже сошло бы :)
    Но:

    Первое, что нам, как администраторам, не нравилось – это то, что написание конфига Flume для выполнения очередной тривиальной загрузки нельзя было доверить разработчику или аналитику, не погруженному в тонкости работы этого инструмента.

    Это все сохраняется и для найфая. Вернее как, бестпрактис и стандартные шаблоны выстраданы квалифицированной командой дев+опс, но никак не первым встречным, который в первый раз увидел найфай. Он просто уронит сервер, а вместе с ним и все остальные таски на нем. Ведь у найфай нет изоляции заданий, все крутится вместе.
    Вторым моментом были отказоустойчивость и масштабирование. Для тяжелых загрузок, например, по syslog, нужно было настраивать несколько агентов Flume и ставить перед ними балансировщик. Все это затем нужно было как-то мониторить и восстанавливать в случае сбоя.

    Ситуация с балансировщиком остается и в случае найфая. Если у вас эндпойнт внутри кластера, то кто же будет клиентов перенаправлять на него в случае падения/обслуживания ноды?
    И масштабирование у найфая весьма специфичное, только через РПГ, которые не работают внутри ПГ.
    В-третьих, Flume не позволял загружать данные из различных СУБД и работать с некоторыми другими протоколами «из коробки». Конечно, на просторах сети можно было найти способы заставить работать Flume с Oracle или с SFTP, но поддержка таких «велосипедов» — занятие совсем не из приятных. Для загрузки данных из того же Oracle приходилось брать на вооружение еще один инструмент — Apache Sqoop.

    Процессоров куча, а в итоге нет-нет да и предется расчехлять скрипты на груви. И не дай бог влезть в житон, более багованной штуки еще не встречал.

    А так приятно видеть, что найфай набирает обороты. Не так активно конечно, как эйрфлоу, но все же. Особенно отрадно видеть сетевых ребят в первом ряду (есть у вас коллеги с ним).

    Ну и добро пожаловать в чатик в телеге: @nifiusers


    1. r3former Автор
      06.12.2018 20:43

      Спасибо за комментарий! Приятно, что нашлись те, с кем можно поделиться опытом. Приглашение в тг-канал принял, уже там)

      Немного поспорю с Вами:

      Это все сохраняется и для найфая. Вернее как, бестпрактис и стандартные шаблоны выстраданы квалифицированной командой дев+опс, но никак не первым встречным, который в первый раз увидел найфай. Он просто уронит сервер, а вместе с ним и все остальные таски на нем. Ведь у найфай нет изоляции заданий, все крутится вместе

      Согласен, но создать темплейты в nifi на все случаи жизни и описать бестпрактисы на вики, чтобы «новичок» смог сделать простые загрузки самостоятельно — это куда проще, чем объяснять flume/sqoop, линукс и написание скриптов на bash/python. Так то и про промышленные ETL-системы можно сказать, что бестпрактисы сложных задач выстраданы и человек «с улицы» все поломает.

      Ситуация с балансировщиком остается и в случае найфая. Если у вас эндпойнт внутри кластера, то кто же будет клиентов перенаправлять на него в случае падения/обслуживания ноды?
      И масштабирование у найфая весьма специфичное, только через РПГ, которые не работают внутри ПГ.

      1. Если процессор предоставляет именно эндпоинт, то балансировщик нужен как и в случае flume. Если мы говорим про загрузку данных из kafka/hdfs/чего угодно, то все это как правило масштабируется в nifi.
      2. РПГ уже не нужен для балансировки для случая, когда трафик идет с primary node. Это пофиксили в версии 1.8 и теперь балансировка работает на уровне connection, пруф. Про то, что РПГ не работают внутри ПГ — не очень понял. У меня в ПГ работает РПГ сейчас, что не так я делаю?


      1. barloc
        07.12.2018 08:09

        Согласен, но создать темплейты в nifi на все случаи жизни и описать бестпрактисы на вики, чтобы «новичок» смог сделать простые загрузки самостоятельно — это куда проще, чем объяснять flume/sqoop, линукс и написание скриптов на bash/python.

        И тут мы приходим к Apache Kylo :)
        Многое, конечно, зависит от. Я наблюдал как опытный девопс делал первый етл на Nifi и получилось достаточно плохо. Оно работало, но без погружения именно в тонкости nifi — работало плохо и завешивало все серверы кластера.
        И все равно внутри обмазано житонами, кафками и всякими секьюрными штуками, чтобы разобраться в которых нужна опсовская практика :)
        Если мы говорим про загрузку данных из kafka/hdfs/чего угодно, то все это как правило масштабируется в nifi.

        Не понял слова масштабируется в этом контексте. У меня опыт версии 1.2 — но масштабирования там особо нет, или все на примари, или РПГ.
        РПГ уже не нужен для балансировки для случая, когда трафик идет с primary node. Это пофиксили в версии 1.8 и теперь балансировка работает на уровне connection, пруф.

        А вот это круто. Как раз готовимся к миграции на 1.8. Значимый бонус, если это так.


        1. r3former Автор
          07.12.2018 10:07

          Не понял слова масштабируется в этом контексте. У меня опыт версии 1.2 — но масштабирования там особо нет, или все на примари, или РПГ.

          Kafka всеми нодами забирается без проблем, там же консамер группы. А hdfs/sftp, если говорить про чтение, с primary-ноды, потом через РПГ все разлетается на другие ноды. Если про запись говорить, то это из коробки работает, считай что пишется в несколько потоков в дестинейшн.


          1. barloc
            07.12.2018 11:06

            Kafka всеми нодами забирается без проблем, там же консамер группы. А hdfs/sftp, если говорить про чтение, с primary-ноды, потом через РПГ все разлетается на другие ноды. Если про запись говорить, то это из коробки работает, считай что пишется в несколько потоков в дестинейшн.

            Понятно, я то надеялся какой-то интересный механизм впилили, а все осталось примерно по разному :)


        1. r3former Автор
          07.12.2018 10:47

          За Kylo отдельное спасибо. Интересный проект и совсем молодой в этой индустрии. Я так понял, что он работает поверх NiFi и включает его в себя.


          1. barloc
            07.12.2018 11:08

            Нашлепка поверх найфая, которую можно отдать пользователям и в которую ты забиваешь как раз свои шаблоны и бестпрактис.
            То есть позволяет ее дать малоопытным пользователям или ДС, которые сами смогут строить свои флоу. И при этом будет не так страшно, как на продовом кластере найфая.
            В чатике дата инженеров есть Антон из Терадаты, у него хороший опыт с kylo.


            1. Wayfarer15
              07.12.2018 19:52

              «чатик дата инженеров» — это где, в телеграмме?


              1. r3former Автор
                07.12.2018 20:01

                Да, этот: @hadoopusers


  1. Wayfarer15
    06.12.2018 20:59

    И ни слова о том, что процессоры для NiFi можно самим написать. Правда документалки по этому делу маловато. Т.е. она как бы есть, но когда начинаешь реально делать, то оказывается, что многие моменты просто не описаны. Например, нужен ли AtomicReference для объектов и как правильно его сделать, чтобы все копии процесса по кластерам использовали только один объект, а не создавали его заново и т.п.


    1. r3former Автор
      06.12.2018 21:32

      В статье об этом есть информация:

      Работа с конкретной СУБД реализуется за счет добавление соответствующего JDBC-драйвера. Есть API для написания своего модуля в качестве дополнительного приемника или преобразователя данных. Примеры можно найти здесь и здесь.

      Предположу, что место для этого абзаца получилось не самым лучшим, и Вы не заметили. Подробно эту тему я не раскрывал, так как это все же краткий обзор. Информации местами действительно не хватает, но многие вещи описаны на портале Hortonworks и в mailing lists.


      1. Wayfarer15
        06.12.2018 21:37

        Я видел эти «здесь и здесь». Для очень простого процессора это пойдёт, для сложного прихдится копаться в документации, но она, как я уже упомянул, оставляет желать лучшего по некоторым специфическим вопросам.

        PS. У меня процессоры для HL7 и FHIR — конвертация из одного формата в другой, валидация, динамические аттрибуты и прочее подобное.


    1. barloc
      07.12.2018 08:12

      Ради интереса подскажтие, большой выигрыш в скорости по сравнению с скриптованием внутри ExecuteScript скажем на груви?
      С документацией полный швах, по апи скриптов раньше было 3 заметки на сайте хортона и все.


  1. sshikov
    06.12.2018 21:46

    >мне совсем не хотелось поддерживать зоопарк решений
    Хм. Не вижу в sqoop ничего сложного. Если у вас много баз данных — то это вполне себе несложное решение (со своими ограничениями, разумеется).


    1. r3former Автор
      06.12.2018 21:51

      Речь не про сложность sqoop, а про совокупность решений. NiFi заменяет flume + sqoop + скрипты. Ведь куда приятнее работать с однородной инфраструктурой, а не когда у Вас на каждый чих отдельное решение.


      1. sshikov
        06.12.2018 21:55

        Да это понятно, но это все все равно до определенного момента. Нам уж как информатику хвалили, как универсальное решение, но постепенно народ нахлебался, и понял, что как только ты выходишь за пределы, которые авторы предусмотрели — так практически сразу и ку-ку. В этом смысле более узкоспециализированные инструменты были и будут лучше, и собрать решение из них можно более подходящее для себя (хотя возможно и большими усилиями).

        P.S. Ни в коем случае не имел в виду агитировать за Sqoop. Он сам не без недостатков.


        1. r3former Автор
          06.12.2018 22:15

          Вы не первый, от кого слышу такой отзыв про informatica) С NiFi у нас хронология несколько иная. Сначала долго тестировали и игрались, а уже потом начали переносить все продакшн-флоу на него. И ожидания от NiFi совершенно не те были, что от informatica. Для нас NiFi — это больше EL, чем ETL. Поэтому пока довольны.