Привет, Хабр!
Потоки данных между системами стабильно увеличиваются, и в обозримом будущем эта тенденция вряд ли изменится, что создает постоянную потребность в инструментах для работы с данными.
Apache NiFi — программный продукт с открытым исходным кодом, написанный на языке Java, предназначенный для автоматизации потоков данных между системами. Главная его задача: организовывать ETL‑процессы. На GitHub у Apache NiFi имеется 5.9 тысяч звезд.
Для тех, кто не знает, что такое Apache NiFi советую прочитать отличную статью.
Моя статья посвящена написанию кастомного процессора для Apache NiFi на Java и требует базовых знаний в области Apache NiFi, опыт программирования на Java и IDE на борту компьютера.
Ну, начнем!
Для тех, кто немного подзабыл, что такое процессор в Apache NiFi
Процессор — это механизм, с помощью которого NiFi предоставляет доступ к FlowFiles, их атрибутам и содержимому. В Apache NiFi процессоры являются основными компоновочными блоками и используются для построения пайплайнов обработки.
В качестве примера я покажу, как создать процессор, выполняющий базовую задачу Word Count на основе существующего атрибута. Разумеется, в реальной практике такая задача вам вряд ли встретится, но в качестве обучающего примера этого более чем достаточно.
Мы будем разрабатывать процессор под Apache NiFi 2.4.0. Это практически самая свежая и стабильная версия на данный момент.
Первым делом стоит создать проект на основе Maven Archetype. В Intellij Idea для этого нужно выбрать New Project в верхней панели инструментов. Откроется окно, в котором в области Generators следует выбрать Maven Archetype.
В нашем случае в поле Name мы записываем название нашего проекта. Оно влияет лишь на имя директории, созданной в IDE.
В качестве JDK выбран Amazon Corretto 21, так как Apache NiFi рекомендует использовать Java 21 для работы с этой версией. В поле Catalog выбираем Maven Central, чтобы увидеть архетип для NiFi. После этого выбираем: org.apache.nifi:nifi-processor-bundle-archetype.
Версию архетипа выбираем в соответствии с версией Apache NiFi — в нашем случае это 2.4.0.
Также в additionalProperties следует обязательно указать artifactBaseName. Иначе там по умолчанию установится значение true, что сломает структуру пакетов. В artifactId мы устанавливаем идентичное значение. При желании можно также поправить package, если вы понимаете, как работает этот параметр. В поле Version изначально указано значение 1.0-SNAPSHOT. Однако при работе со SNAPSHOT‑версией у вас просто не пройдет сборка, потому что это не релизная версия вашего процессора.

Не собирающийся модуль

После этого жмем Create и получаем готовую структуру проекта. Выглядит она следующим образом:

Таким образом, мы получаем два модуля: один для сборки NAR и JAR. Для добавления модуля в NiFi будет использоваться как раз таки NAR.
Откроем основной файл с главный нашим классом, а именно MyProcessor (при желании его можно переименовать, это название влияет на нейминг в UI)
Код шаблонного класса
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ru.kotletkin.dev.processors.demohabrprocessor;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.List;
import java.util.Set;
@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {
public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
.Builder()
.name("My Property")
.displayName("My Property")
.description("Example Property")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("Example success relationship")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
descriptors = List.of(MY_PROPERTY);
relationships = Set.of(REL_SUCCESS);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
// TODO implement
session.transfer(flowFile, REL_SUCCESS);
}
}
Видно, что у класса присутствует много аннотаций и методов, которые переопределяют родительский AbstractProcessor. AbstractProcessor реализует интерфейс Processor и предоставляет готовые реализации большинства методов, а также позволяет сосредоточиться на бизнес‑логике, а не на написании boilerplate‑кода.
Про все существующие аннотации и методы в рамках одной статьи очень тяжело уложиться, поэтому про них лучше почитать в Developer Guide. Описание тех аннотаций, которые уже существуют в нашем коде, мы поговорим. Их описание в таблице ниже:
Название аннотации |
Описание аннотации |
@Tags |
Используется для добавление тэгов для поиска и категоризации процессора в UI |
@CapabilityDescription |
Используется для описания процессора и производимого действия им. Помогает понять назначение без чтения документации |
@ReadsAttributes |
Сообщает системе и пользователям, какие атрибуты FlowFile читает процессор |
@WritesAttributes |
Сообщает системе и пользователям какие новые атрибуты FlowFile добавляет процессор |
Для нашего тестового процессора, определим следующие параметры для этих аннотаций:
@Tags({"word", "count", "statistics", "habr"})
@CapabilityDescription("Counts the number of words in a specified FlowFile attribute")
@ReadsAttributes({
@ReadsAttribute(attribute = "habr.wc.target", description = "The attribute that will be analyzed for word count")
})
@WritesAttributes({
@WritesAttribute(attribute = "habr.wc.result", description = "The number of words found in the specified attribute")
})
Мы добавили тэги, описание, указали, что значение целевого атрибута по‑умолчанию для подсчета слов будет habr.wc.target, а результат запишется в атрибут habr.wc.result
Пример отображения значений аннотаций

Далее, рассмотрим настройки процессора. Примером в нашем коде в данном случае является параметр MY_PROPERTY с типом PropertyDescriptor. PropertyDescriptor определяет свойство, которое будет использоваться в Processor. Объект такого типа включает его имя, описание свойства, необязательное значение по умолчанию, логику проверки (валидатор) и указание на то, является ли свойство обязательным для корректной работы Processor. PropertyDescriptors создаются путем создания экземпляра класса PropertyDescriptor.Builder, вызова соответствующих методов для заполнения сведений о свойстве и, наконец, вызова метода build, то есть реализуют классический паттерн «Строитель».
Часть кода из шаблона
public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
.Builder()
.name("My Property")
.displayName("My Property")
.description("Example Property")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();Мы добавим в наш пример три атрибута: исходный атрибут (для перезаписи нашего дефолтного), результирующий, а также разделитель для слов. Примеры описания структуры практически идентичны в базовом шаблоне. Относительно валидаторов - они бывают разные и перечислены в классе StandardValidators.
public static final PropertyDescriptor SOURCE_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Source Attribute")
.displayName("Source Attribute")
.description("Name of the attribute that contains text for word counting")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("habr.wc.target")
.build();
public static final PropertyDescriptor OUTPUT_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Output Attribute")
.displayName("Output Attribute")
.description("Name of the attribute to store the word count result")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("habr.wc.result")
.build();
public static final PropertyDescriptor WORD_DELIMITER = new PropertyDescriptor.Builder()
.name("Word Delimiter")
.displayName("Word Delimiter")
.description("Regular expression pattern used to split text into words (default: whitespace characters)")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("\\s+")
.build();
Геттеры getRelationships и getSupportedPropertyDescriptors в нашем примере, как и в большинстве случаев изменять вам не придется.
Однако, в конструкторе стоит поменять список descriptors и relationships, если они отличаются от базовых. В нашем случае это выглядит так:
@Override
protected void init(final ProcessorInitializationContext context) {
descriptors = List.of(SOURCE_ATTRIBUTE, WORD_DELIMITER, OUTPUT_ATTRIBUTE);
relationships = Set.of(REL_SUCCESS, REL_FAILURE);
}
ProcessorInitializationContext в нашем случае не нужен, он предназначен для задач, когда в моменте инициализации процессора требуется выполнить какую‑либо задачу (логирование, досту�� к ControllerServices). Например, вывести идентификатор процессора при инициализации можно следующим образом:
@Override
protected void init(final ProcessorInitializationContext context) {
String processorId = context.getIdentifier();
getLogger().info("Processor ID: {}", processorId);
// Выведет что-то вроде: "Processor ID: a1b2c3d4-1234-5678-90ab-cdef12345678"
}
Метод помеченный аннотацией@OnScheduled используется для реализации логики, которая будет выполняться при запуске процессора.
Теперь перейдем к самому главному, реализацию метода onTrigger, который отвечает за работу, выполняемую с каждым FlowFile.
Сразу же, нас встречает небольшая часть кода, который позволяет прекратить выполнение, если FlowFile не существует. Выглядит это как простейшая защита от NPE:
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
Далее, нам уже необходимо реализовать наш код по подсчету слов.
Объяснять алгоритм по подсчету слов выглядит как не самая интересная часть статьи, поэтому подсчет будет вынесен в отдельный метод кода. Разберем код метода onTrigger:
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
try {
// Получаем значения свойств
final String sourceAttribute = context.getProperty(SOURCE_ATTRIBUTE).getValue();
final String wordDelimiter = context.getProperty(WORD_DELIMITER).getValue();
final String outputAttribute = context.getProperty(OUTPUT_ATTRIBUTE).getValue();
// Получаем значение атрибута, в котором нужно совершить Word Count
final String attributeValue = flowFile.getAttribute(sourceAttribute);
// Простейшие проверки
if (attributeValue == null || attributeValue.trim().isEmpty()) {
getLogger().warn("Attribute '{}' does not exist or is empty for FlowFile {}",
sourceAttribute, flowFile.getId());
// Отправки файла в нужный RelationShip
session.transfer(flowFile, REL_FAILURE);
return;
}
// Подсчитываем слова
final int wordCount = countWords(attributeValue, wordDelimiter);
getLogger().debug("Found {} words in attribute '{}'", wordCount, sourceAttribute);
// Добавляем новый атрибут
final Map<String, String> attributes = new HashMap<>();
attributes.put(outputAttribute, String.valueOf(wordCount));
// Кладем все атрибуты
flowFile = session.putAllAttributes(flowFile, attributes);
// Перенаправляем в Success
session.transfer(flowFile, REL_SUCCESS);
session.getProvenanceReporter().modifyAttributes(flowFile);
getLogger().info("Successfully counted {} words for FlowFile {}", wordCount, flowFile.getId());
} catch (final Exception e) {
getLogger().error("Error processing FlowFile {}: {}", flowFile.getId(), e.getMessage(), e);
session.transfer(flowFile, REL_FAILURE);
}
}
private int countWords(final String text, final String delimiterPattern) {
if (text == null || text.trim().isEmpty()) {
return 0;
}
try {
Pattern pattern;
if (delimiterPattern != null && !delimiterPattern.trim().isEmpty()) {
pattern = Pattern.compile(delimiterPattern);
} else {
pattern = Pattern.compile("\\s+");
}
String[] words = pattern.split(text.trim());
return (int) Arrays.stream(words)
.filter(word -> !word.trim().isEmpty())
.count();
} catch (PatternSyntaxException e) {
getLogger().warn("Invalid regex pattern '{}', using default whitespace pattern", delimiterPattern);
Pattern defaultPattern = Pattern.compile("\\s+");
String[] words = defaultPattern.split(text.trim());
return (int) Arrays.stream(words)
.filter(word -> !word.trim().isEmpty())
.count();
}
}
В самом начале метод принимает два параметра с типами ProcessContext и ProcessSession.
ProcessContext предоставляет доступ к настройкам и конфигурации процессора. Позволяет получать значения свойств, идентификатор процессора или любую другую информацию о нем.
ProcessSession предоставляет методы для манипуляции FlowFiles и управления транзакциями. Позволяет получать FlowFile из входной очереди, создавать новые, читать содержимые, добавлять атрибуты и так далее
Так, в следующем участке кода мы получаем значения параметров, которые определяли ранее:
final String sourceAttribute = context.getProperty(SOURCE_ATTRIBUTE).getValue();
final String wordDelimiter = context.getProperty(WORD_DELIMITER).getValue();
final String outputAttribute = context.getProperty(OUTPUT_ATTRIBUTE).getValue();
В UI значения параметров выглядят так:

Затем, получаем текст, который содержится в нашем атрибуте, название которого содержится в параметре Source Attribute
final String attributeValue = flowFile.getAttribute(sourceAttribute);
Далее, проводим проверки в коде и в случае, если какая‑либо из них не проходит (обычное условие if), отправляем файл в Failure Relation. За это отвечает session.transfer()
// Простейшие проверки
if (attributeValue == null || attributeValue.trim().isEmpty()) {
getLogger().warn("Attribute '{}' does not exist or is empty for FlowFile {}",
sourceAttribute, flowFile.getId());
// Отправки файла в нужный RelationShip
session.transfer(flowFile, REL_FAILURE);
return;
}
В случае, если ошибок, то продолжаем выполнять логику и отправляем FlowFile в REL_SUCCESS:
// Подсчитываем слова
final int wordCount = countWords(attributeValue, wordDelimiter);
getLogger().debug("Found {} words in attribute '{}'", wordCount, sourceAttribute);
// Добавляем новый атрибут
final Map<String, String> attributes = new HashMap<>();
attributes.put(outputAttribute, String.valueOf(wordCount));
// Кладем все атрибуты
flowFile = session.putAllAttributes(flowFile, attributes);
// Перенаправляем в Success
session.transfer(flowFile, REL_SUCCESS);
// Зафиксировать в журнале аудита, что атрибуты этого FlowFile были изменены
session.getProvenanceReporter().modifyAttributes(flowFile);
getLogger().info("Successfully counted {} words for FlowFile {}", wordCount, flowFile.getId());
Таким образом, у нас получается полноценный код для нашего процессора. Подведем итог готового кода.
Результат кода для процессора
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ru.kotletkin.dev.processors.demohabrprocessor;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import java.util.*;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;
@Tags({"word", "count", "statistics", "habr"})
@CapabilityDescription("Counts the number of words in a specified FlowFile attribute")
@ReadsAttributes({
@ReadsAttribute(attribute = "habr.wc.target", description = "The attribute that will be analyzed for word count")
})
@WritesAttributes({
@WritesAttribute(attribute = "habr.wc.result", description = "The number of words found in the specified attribute")
})
public class MyProcessor extends AbstractProcessor {
public static final PropertyDescriptor SOURCE_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Source Attribute")
.displayName("Source Attribute")
.description("Name of the attribute that contains text for word counting")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("habr.wc.target")
.build();
public static final PropertyDescriptor OUTPUT_ATTRIBUTE = new PropertyDescriptor.Builder()
.name("Output Attribute")
.displayName("Output Attribute")
.description("Name of the attribute to store the word count result")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("habr.wc.result")
.build();
public static final PropertyDescriptor WORD_DELIMITER = new PropertyDescriptor.Builder()
.name("Word Delimiter")
.displayName("Word Delimiter")
.description("Regular expression pattern used to split text into words (default: whitespace characters)")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.defaultValue("\\s+")
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
.name("success")
.description("FlowFiles with successfully word count")
.build();
public static final Relationship REL_FAILURE = new Relationship.Builder()
.name("failure")
.description("FlowFiles with not successfully word count")
.build();
private List<PropertyDescriptor> descriptors;
private Set<Relationship> relationships;
@Override
protected void init(final ProcessorInitializationContext context) {
descriptors = List.of(SOURCE_ATTRIBUTE, WORD_DELIMITER, OUTPUT_ATTRIBUTE);
relationships = Set.of(REL_SUCCESS, REL_FAILURE);
}
@Override
public Set<Relationship> getRelationships() {
return this.relationships;
}
@Override
public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
return descriptors;
}
@OnScheduled
public void onScheduled(final ProcessContext context) {
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
FlowFile flowFile = session.get();
if (flowFile == null) {
return;
}
try {
// Получаем значения свойств
final String sourceAttribute = context.getProperty(SOURCE_ATTRIBUTE).getValue();
final String wordDelimiter = context.getProperty(WORD_DELIMITER).getValue();
final String outputAttribute = context.getProperty(OUTPUT_ATTRIBUTE).getValue();
// Получаем значение атрибута, в котором нужно совершить Word Count
final String attributeValue = flowFile.getAttribute(sourceAttribute);
// Простейшие проверки
if (attributeValue == null || attributeValue.trim().isEmpty()) {
getLogger().warn("Attribute '{}' does not exist or is empty for FlowFile {}",
sourceAttribute, flowFile.getId());
// Отправки файла в нужный RelationShip
session.transfer(flowFile, REL_FAILURE);
return;
}
// Подсчитываем слова
final int wordCount = countWords(attributeValue, wordDelimiter);
getLogger().debug("Found {} words in attribute '{}'", wordCount, sourceAttribute);
// Добавляем новый атрибут
final Map<String, String> attributes = new HashMap<>();
attributes.put(outputAttribute, String.valueOf(wordCount));
// Кладем все атрибуты
flowFile = session.putAllAttributes(flowFile, attributes);
// Перенаправляем в Success
session.transfer(flowFile, REL_SUCCESS);
// Зафиксировать в журнале аудита, что атрибуты этого FlowFile были изменены
session.getProvenanceReporter().modifyAttributes(flowFile);
getLogger().info("Successfully counted {} words for FlowFile {}", wordCount, flowFile.getId());
} catch (final Exception e) {
getLogger().error("Error processing FlowFile {}: {}", flowFile.getId(), e.getMessage(), e);
session.transfer(flowFile, REL_FAILURE);
}
}
private int countWords(final String text, final String delimiterPattern) {
if (text == null || text.trim().isEmpty()) {
return 0;
}
try {
Pattern pattern;
if (delimiterPattern != null && !delimiterPattern.trim().isEmpty()) {
pattern = Pattern.compile(delimiterPattern);
} else {
pattern = Pattern.compile("\\s+");
}
String[] words = pattern.split(text.trim());
return (int) Arrays.stream(words)
.filter(word -> !word.trim().isEmpty())
.count();
} catch (PatternSyntaxException e) {
getLogger().warn("Invalid regex pattern '{}', using default whitespace pattern", delimiterPattern);
Pattern defaultPattern = Pattern.compile("\\s+");
String[] words = defaultPattern.split(text.trim());
return (int) Arrays.stream(words)
.filter(word -> !word.trim().isEmpty())
.count();
}
}
}
Помимо основного кода для процессора, Apache NiFi предлагает решение в виде Mock Framework, а именно TestRunner Class. С помощью него, можно не запуская Apache NiFi (добавление нового NAR требует перезагрузки) тестировать различные кейсы и покрывать свой код тестами как в привычном бэкенде или другой области разработки. Пример кода для тестирования представлен ниже:
Пример тестов для процессора
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package ru.kotletkin.dev.processors.demohabrprocessor;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import java.util.HashMap;
import java.util.Map;
class MyProcessorTest {
private TestRunner testRunner;
@BeforeEach
void init() {
testRunner = TestRunners.newTestRunner(MyProcessor.class);
}
@Test
void testProcessorWithDefaultAttribute() {
// Устанавливаем свойства
testRunner.setProperty(MyProcessor.SOURCE_ATTRIBUTE, "text");
testRunner.setProperty(MyProcessor.OUTPUT_ATTRIBUTE, "word.count");
// СОЗДАЕМ АТРИБУТЫ
Map<String, String> attributes = new HashMap<>();
attributes.put("text", "Hello world this is Apache NiFi for Habr");
// Добавляем FlowFile с атрибутами
testRunner.enqueue("dummy content", attributes);
// Запускаем процессор
testRunner.run(1);
// Проверяем результаты
testRunner.assertTransferCount(MyProcessor.REL_SUCCESS, 1);
testRunner.assertTransferCount(MyProcessor.REL_FAILURE, 0);
// Проверяем атрибут 'word.count'
MockFlowFile result = testRunner.getFlowFilesForRelationship(MyProcessor.REL_SUCCESS).getFirst();
result.assertAttributeEquals("word.count", "8");
result.assertAttributeExists("filename"); // Стандартный атрибут
}
}
Далее, выполним mvn clean install в консоли.


Нам необходим собранный файл с расширением nar. Именно его мы и добавим в директорию lib.
После того, как вы добавили в директорию lib вновь собранный NAR, запустите Apache NiFi (или перезапустите его).
Далее, среди процессоров должен появиться наш новый процессор.

И в нем будут видны наши заранее реализованные параметры:

Создадим небольшой пайплайн для теста процессора:

Настроим процессор GenerateFlowFile, чтобы он генерировал файлы с уже имеющимся атрибутом:

Запустим все процессоры через Run Once и увидим результат в одной из очередей перед Funnel:

Проверим атрибуты:

Таким образом, видно, что процессор штатно отработал и мы прошли все этапы, от написания кода, до сборки и добавления нашего процессора в Apache NiFi.
Результат статьи в виде кодовой базы можно найти тут.
Итоги
Мы рассмотрели, как создать простейший процессор в Apache NiFi, реализовать логику, как использовать основные методы различных классов. Естественно, этой статьей разработка модулей не ограничивается ни в коем случае, и есть много различных направлений в этой области. Для полного погружения рекомендуется изучать документацию и тестировать на практике, желательно на большой нагрузке.
Комьюнити Apache NiFi в РФ: telegram