Привет, Хабр!

Потоки данных между системами стабильно увеличиваются, и в обозримом будущем эта тенденция вряд ли изменится, что создает постоянную потребность в инструментах для работы с данными.

Apache NiFi — программный продукт с открытым исходным кодом, написанный на языке Java, предназначенный для автоматизации потоков данных между системами. Главная его задача: организовывать ETL‑процессы. На GitHub у Apache NiFi имеется 5.9 тысяч звезд.

Для тех, кто не знает, что такое Apache NiFi советую прочитать отличную статью.

Моя статья посвящена написанию кастомного процессора для Apache NiFi на Java и требует базовых знаний в области Apache NiFi, опыт программирования на Java и IDE на борту компьютера.

Ну, начнем!

Для тех, кто немного подзабыл, что такое процессор в Apache NiFi

Процессор — это механизм, с помощью которого NiFi предоставляет доступ к FlowFiles, их атрибутам и содержимому. В Apache NiFi процессоры являются основными компоновочными блоками и используются для построения пайплайнов обработки.

В качестве примера я покажу, как создать процессор, выполняющий базовую задачу Word Count на основе существующего атрибута. Разумеется, в реальной практике такая задача вам вряд ли встретится, но в качестве обучающего примера этого более чем достаточно.

Мы будем разрабатывать процессор под Apache NiFi 2.4.0. Это практически самая свежая и стабильная версия на данный момент.

Первым делом стоит создать проект на основе Maven Archetype. В Intellij Idea для этого нужно выбрать New Project в верхней панели инструментов. Откроется окно, в котором в области Generators следует выбрать Maven Archetype.

В нашем случае в поле Name мы записываем название нашего проекта. Оно влияет лишь на имя директории, созданной в IDE.

В качестве JDK выбран Amazon Corretto 21, так как Apache NiFi рекомендует использовать Java 21 для работы с этой версией. В поле Catalog выбираем Maven Central, чтобы увидеть архетип для NiFi. После этого выбираем: org.apache.nifi:nifi-processor-bundle-archetype.

Версию архетипа выбираем в соответствии с версией Apache NiFi — в нашем случае это 2.4.0.

Также в additionalProperties следует обязательно указать artifactBaseName. Иначе там по умолчанию установится значение true, что сломает структуру пакетов. В artifactId мы устанавливаем идентичное значение. При желании можно также поправить package, если вы понимаете, как работает этот параметр. В поле Version изначально указано значение 1.0-SNAPSHOT. Однако при работе со SNAPSHOT‑версией у вас просто не пройдет сборка, потому что это не релизная версия вашего процессора.

Не собирающийся модуль
Можно игнорировать enforce, но это будет не совсем правильно
Можно игнорировать enforce, но это будет не совсем правильно

После этого жмем Create и получаем готовую структуру проекта. Выглядит она следующим образом:

Таким образом, мы получаем два модуля: один для сборки NAR и JAR. Для добавления модуля в NiFi будет использоваться как раз таки NAR.

Откроем основной файл с главный нашим классом, а именно MyProcessor (при желании его можно переименовать, это название влияет на нейминг в UI)

Код шаблонного класса
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package ru.kotletkin.dev.processors.demohabrprocessor;

import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.List;
import java.util.Set;

@Tags({"example"})
@CapabilityDescription("Provide a description")
@SeeAlso({})
@ReadsAttributes({@ReadsAttribute(attribute="", description="")})
@WritesAttributes({@WritesAttribute(attribute="", description="")})
public class MyProcessor extends AbstractProcessor {

    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder()
            .name("My Property")
            .displayName("My Property")
            .description("Example Property")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("Example success relationship")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        descriptors = List.of(MY_PROPERTY);

        relationships = Set.of(REL_SUCCESS);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) {
        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }
        // TODO implement

        session.transfer(flowFile, REL_SUCCESS);
    }
}

Видно, что у класса присутствует много аннотаций и методов, которые переопределяют родительский AbstractProcessor. AbstractProcessor реализует интерфейс Processor и предоставляет готовые реализации большинства методов, а также позволяет сосредоточиться на бизнес‑логике, а не на написании boilerplate‑кода.

Про все существующие аннотации и методы в рамках одной статьи очень тяжело уложиться, поэтому про них лучше почитать в Developer Guide. Описание тех аннотаций, которые уже существуют в нашем коде, мы поговорим. Их описание в таблице ниже:

Название аннотации

Описание аннотации

@Tags

Используется для добавление тэгов для поиска и категоризации процессора в UI

@CapabilityDescription

Используется для описания процессора и производимого действия им. Помогает понять назначение без чтения документации

@ReadsAttributes

Сообщает системе и пользователям, какие атрибуты FlowFile читает процессор

@WritesAttributes

Сообщает системе и пользователям какие новые атрибуты FlowFile добавляет процессор

Для нашего тестового процессора, определим следующие параметры для этих аннотаций:

@Tags({"word", "count", "statistics", "habr"})
@CapabilityDescription("Counts the number of words in a specified FlowFile attribute")
@ReadsAttributes({
        @ReadsAttribute(attribute = "habr.wc.target", description = "The attribute that will be analyzed for word count")
})
@WritesAttributes({
        @WritesAttribute(attribute = "habr.wc.result", description = "The number of words found in the specified attribute")
})

Мы добавили тэги, описание, указали, что значение целевого атрибута по‑умолчанию для подсчета слов будет habr.wc.target, а результат запишется в атрибут habr.wc.result

Пример отображения значений аннотаций
Пример отображения заполненных аннотаций: тэги, описание
Пример отображения заполненных аннотаций: тэги, описание

Далее, рассмотрим настройки процессора. Примером в нашем коде в данном случае является параметр MY_PROPERTY с типом PropertyDescriptor. PropertyDescriptor определяет свойство, которое будет использоваться в Processor. Объект такого типа включает его имя, описание свойства, необязательное значение по умолчанию, логику проверки (валидатор) и указание на то, является ли свойство обязательным для корректной работы Processor. PropertyDescriptors создаются путем создания экземпляра класса PropertyDescriptor.Builder, вызова соответствующих методов для заполнения сведений о свойстве и, наконец, вызова метода build, то есть реализуют классический паттерн «Строитель».

Часть кода из шаблона
    public static final PropertyDescriptor MY_PROPERTY = new PropertyDescriptor
            .Builder()
            .name("My Property")
            .displayName("My Property")
            .description("Example Property")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .build();

Мы добавим в наш пример три атрибута: исходный атрибут (для перезаписи нашего дефолтного), результирующий, а также разделитель для слов. Примеры описания структуры практически идентичны в базовом шаблоне. Относительно валидаторов - они бывают разные и перечислены в классе StandardValidators.

   public static final PropertyDescriptor SOURCE_ATTRIBUTE = new PropertyDescriptor.Builder()
            .name("Source Attribute")
            .displayName("Source Attribute")
            .description("Name of the attribute that contains text for word counting")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("habr.wc.target")
            .build();

    public static final PropertyDescriptor OUTPUT_ATTRIBUTE = new PropertyDescriptor.Builder()
            .name("Output Attribute")
            .displayName("Output Attribute")
            .description("Name of the attribute to store the word count result")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("habr.wc.result")
            .build();

    public static final PropertyDescriptor WORD_DELIMITER = new PropertyDescriptor.Builder()
            .name("Word Delimiter")
            .displayName("Word Delimiter")
            .description("Regular expression pattern used to split text into words (default: whitespace characters)")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("\\s+")
            .build();

Геттеры getRelationships и getSupportedPropertyDescriptors в нашем примере, как и в большинстве случаев изменять вам не придется.

Однако, в конструкторе стоит поменять список descriptors и relationships, если они отличаются от базовых. В нашем случае это выглядит так:

  @Override
  protected void init(final ProcessorInitializationContext context) {
     descriptors = List.of(SOURCE_ATTRIBUTE, WORD_DELIMITER, OUTPUT_ATTRIBUTE);
     relationships = Set.of(REL_SUCCESS, REL_FAILURE);
  }

ProcessorInitializationContext в нашем случае не нужен, он предназначен для задач, когда в моменте инициализации процессора требуется выполнить какую‑либо задачу (логирование, досту�� к ControllerServices). Например, вывести идентификатор процессора при инициализации можно следующим образом:

@Override
protected void init(final ProcessorInitializationContext context) {
    String processorId = context.getIdentifier();
    getLogger().info("Processor ID: {}", processorId);
    // Выведет что-то вроде: "Processor ID: a1b2c3d4-1234-5678-90ab-cdef12345678"
}

Метод помеченный аннотацией@OnScheduled используется для реализации логики, которая будет выполняться при запуске процессора.

Теперь перейдем к самому главному, реализацию метода onTrigger, который отвечает за работу, выполняемую с каждым FlowFile.

Сразу же, нас встречает небольшая часть кода, который позволяет прекратить выполнение, если FlowFile не существует. Выглядит это как простейшая защита от NPE:

FlowFile flowFile = session.get();
if (flowFile == null) {
    return;
}

Далее, нам уже необходимо реализовать наш код по подсчету слов.

Объяснять алгоритм по подсчету слов выглядит как не самая интересная часть статьи, поэтому подсчет будет вынесен в отдельный метод кода. Разберем код метода onTrigger:

@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

    FlowFile flowFile = session.get();
    if (flowFile == null) {
        return;
    }

    try {

        // Получаем значения свойств
        final String sourceAttribute = context.getProperty(SOURCE_ATTRIBUTE).getValue();
        final String wordDelimiter = context.getProperty(WORD_DELIMITER).getValue();
        final String outputAttribute = context.getProperty(OUTPUT_ATTRIBUTE).getValue();

        // Получаем значение атрибута, в котором нужно совершить Word Count
        final String attributeValue = flowFile.getAttribute(sourceAttribute);

        // Простейшие проверки
        if (attributeValue == null || attributeValue.trim().isEmpty()) {
            getLogger().warn("Attribute '{}' does not exist or is empty for FlowFile {}",
                    sourceAttribute, flowFile.getId());
            // Отправки файла в нужный RelationShip
            session.transfer(flowFile, REL_FAILURE);
            return;
        }

        // Подсчитываем слова
        final int wordCount = countWords(attributeValue, wordDelimiter);

        getLogger().debug("Found {} words in attribute '{}'", wordCount, sourceAttribute);

        // Добавляем новый атрибут
        final Map<String, String> attributes = new HashMap<>();
        attributes.put(outputAttribute, String.valueOf(wordCount));

        // Кладем все атрибуты
        flowFile = session.putAllAttributes(flowFile, attributes);

        // Перенаправляем в Success
        session.transfer(flowFile, REL_SUCCESS);
        session.getProvenanceReporter().modifyAttributes(flowFile);

        getLogger().info("Successfully counted {} words for FlowFile {}", wordCount, flowFile.getId());

    } catch (final Exception e) {
        getLogger().error("Error processing FlowFile {}: {}", flowFile.getId(), e.getMessage(), e);
        session.transfer(flowFile, REL_FAILURE);
    }
}

private int countWords(final String text, final String delimiterPattern) {

    if (text == null || text.trim().isEmpty()) {
        return 0;
    }

    try {
        Pattern pattern;
        if (delimiterPattern != null && !delimiterPattern.trim().isEmpty()) {
            pattern = Pattern.compile(delimiterPattern);
        } else {
            pattern = Pattern.compile("\\s+");
        }

        String[] words = pattern.split(text.trim());
        return (int) Arrays.stream(words)
                .filter(word -> !word.trim().isEmpty())
                .count();

    } catch (PatternSyntaxException e) {
        getLogger().warn("Invalid regex pattern '{}', using default whitespace pattern", delimiterPattern);
        Pattern defaultPattern = Pattern.compile("\\s+");
        String[] words = defaultPattern.split(text.trim());
        return (int) Arrays.stream(words)
                .filter(word -> !word.trim().isEmpty())
                .count();
    }
}

В самом начале метод принимает два параметра с типами ProcessContext и ProcessSession.

ProcessContext предоставляет доступ к настройкам и конфигурации процессора. Позволяет получать значения свойств, идентификатор процессора или любую другую информацию о нем.

ProcessSession предоставляет методы для манипуляции FlowFiles и управления транзакциями. Позволяет получать FlowFile из входной очереди, создавать новые, читать содержимые, добавлять атрибуты и так далее

Так, в следующем участке кода мы получаем значения параметров, которые определяли ранее:

final String sourceAttribute = context.getProperty(SOURCE_ATTRIBUTE).getValue();
final String wordDelimiter = context.getProperty(WORD_DELIMITER).getValue();
final String outputAttribute = context.getProperty(OUTPUT_ATTRIBUTE).getValue();

В UI значения параметров выглядят так:

Атрибуты которые мы задали, отображаются в UI
Атрибуты которые мы задали, отображаются в UI

Затем, получаем текст, который содержится в нашем атрибуте, название которого содержится в параметре Source Attribute

final String attributeValue = flowFile.getAttribute(sourceAttribute);

Далее, проводим проверки в коде и в случае, если какая‑либо из них не проходит (обычное условие if), отправляем файл в Failure Relation. За это отвечает session.transfer()

// Простейшие проверки
if (attributeValue == null || attributeValue.trim().isEmpty()) {
    getLogger().warn("Attribute '{}' does not exist or is empty for FlowFile {}",
            sourceAttribute, flowFile.getId());
    // Отправки файла в нужный RelationShip
    session.transfer(flowFile, REL_FAILURE);
    return;
}

В случае, если ошибок, то продолжаем выполнять логику и отправляем FlowFile в REL_SUCCESS:

// Подсчитываем слова
final int wordCount = countWords(attributeValue, wordDelimiter);

getLogger().debug("Found {} words in attribute '{}'", wordCount, sourceAttribute);

// Добавляем новый атрибут
final Map<String, String> attributes = new HashMap<>();
attributes.put(outputAttribute, String.valueOf(wordCount));

// Кладем все атрибуты
flowFile = session.putAllAttributes(flowFile, attributes);

// Перенаправляем в Success
session.transfer(flowFile, REL_SUCCESS);
// Зафиксировать в журнале аудита, что атрибуты этого FlowFile были изменены
session.getProvenanceReporter().modifyAttributes(flowFile);

getLogger().info("Successfully counted {} words for FlowFile {}", wordCount, flowFile.getId());

Таким образом, у нас получается полноценный код для нашего процессора. Подведем итог готового кода.

Результат кода для процессора
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package ru.kotletkin.dev.processors.demohabrprocessor;

import org.apache.nifi.annotation.behavior.ReadsAttribute;
import org.apache.nifi.annotation.behavior.ReadsAttributes;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.*;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;

import java.util.*;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

@Tags({"word", "count", "statistics", "habr"})
@CapabilityDescription("Counts the number of words in a specified FlowFile attribute")
@ReadsAttributes({
        @ReadsAttribute(attribute = "habr.wc.target", description = "The attribute that will be analyzed for word count")
})
@WritesAttributes({
        @WritesAttribute(attribute = "habr.wc.result", description = "The number of words found in the specified attribute")
})
public class MyProcessor extends AbstractProcessor {

    public static final PropertyDescriptor SOURCE_ATTRIBUTE = new PropertyDescriptor.Builder()
            .name("Source Attribute")
            .displayName("Source Attribute")
            .description("Name of the attribute that contains text for word counting")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("habr.wc.target")
            .build();

    public static final PropertyDescriptor OUTPUT_ATTRIBUTE = new PropertyDescriptor.Builder()
            .name("Output Attribute")
            .displayName("Output Attribute")
            .description("Name of the attribute to store the word count result")
            .required(true)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("habr.wc.result")
            .build();


    public static final PropertyDescriptor WORD_DELIMITER = new PropertyDescriptor.Builder()
            .name("Word Delimiter")
            .displayName("Word Delimiter")
            .description("Regular expression pattern used to split text into words (default: whitespace characters)")
            .required(false)
            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
            .defaultValue("\\s+")
            .build();

    public static final Relationship REL_SUCCESS = new Relationship.Builder()
            .name("success")
            .description("FlowFiles with successfully word count")
            .build();

    public static final Relationship REL_FAILURE = new Relationship.Builder()
            .name("failure")
            .description("FlowFiles with not successfully word count")
            .build();

    private List<PropertyDescriptor> descriptors;

    private Set<Relationship> relationships;

    @Override
    protected void init(final ProcessorInitializationContext context) {
        descriptors = List.of(SOURCE_ATTRIBUTE, WORD_DELIMITER, OUTPUT_ATTRIBUTE);
        relationships = Set.of(REL_SUCCESS, REL_FAILURE);
    }

    @Override
    public Set<Relationship> getRelationships() {
        return this.relationships;
    }

    @Override
    public final List<PropertyDescriptor> getSupportedPropertyDescriptors() {
        return descriptors;
    }

    @OnScheduled
    public void onScheduled(final ProcessContext context) {

    }

    @Override
    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {

        FlowFile flowFile = session.get();
        if (flowFile == null) {
            return;
        }

        try {

            // Получаем значения свойств
            final String sourceAttribute = context.getProperty(SOURCE_ATTRIBUTE).getValue();
            final String wordDelimiter = context.getProperty(WORD_DELIMITER).getValue();
            final String outputAttribute = context.getProperty(OUTPUT_ATTRIBUTE).getValue();

            // Получаем значение атрибута, в котором нужно совершить Word Count
            final String attributeValue = flowFile.getAttribute(sourceAttribute);

            // Простейшие проверки
            if (attributeValue == null || attributeValue.trim().isEmpty()) {
                getLogger().warn("Attribute '{}' does not exist or is empty for FlowFile {}",
                        sourceAttribute, flowFile.getId());
                // Отправки файла в нужный RelationShip
                session.transfer(flowFile, REL_FAILURE);
                return;
            }

            // Подсчитываем слова
            final int wordCount = countWords(attributeValue, wordDelimiter);

            getLogger().debug("Found {} words in attribute '{}'", wordCount, sourceAttribute);

            // Добавляем новый атрибут
            final Map<String, String> attributes = new HashMap<>();
            attributes.put(outputAttribute, String.valueOf(wordCount));

            // Кладем все атрибуты
            flowFile = session.putAllAttributes(flowFile, attributes);

            // Перенаправляем в Success
            session.transfer(flowFile, REL_SUCCESS);
            // Зафиксировать в журнале аудита, что атрибуты этого FlowFile были изменены
            session.getProvenanceReporter().modifyAttributes(flowFile);

            getLogger().info("Successfully counted {} words for FlowFile {}", wordCount, flowFile.getId());

        } catch (final Exception e) {
            getLogger().error("Error processing FlowFile {}: {}", flowFile.getId(), e.getMessage(), e);
            session.transfer(flowFile, REL_FAILURE);
        }
    }

    private int countWords(final String text, final String delimiterPattern) {

        if (text == null || text.trim().isEmpty()) {
            return 0;
        }

        try {
            Pattern pattern;
            if (delimiterPattern != null && !delimiterPattern.trim().isEmpty()) {
                pattern = Pattern.compile(delimiterPattern);
            } else {
                pattern = Pattern.compile("\\s+");
            }

            String[] words = pattern.split(text.trim());
            return (int) Arrays.stream(words)
                    .filter(word -> !word.trim().isEmpty())
                    .count();

        } catch (PatternSyntaxException e) {
            getLogger().warn("Invalid regex pattern '{}', using default whitespace pattern", delimiterPattern);
            Pattern defaultPattern = Pattern.compile("\\s+");
            String[] words = defaultPattern.split(text.trim());
            return (int) Arrays.stream(words)
                    .filter(word -> !word.trim().isEmpty())
                    .count();
        }
    }
}

Помимо основного кода для процессора, Apache NiFi предлагает решение в виде Mock Framework, а именно TestRunner Class. С помощью него, можно не запуская Apache NiFi (добавление нового NAR требует перезагрузки) тестировать различные кейсы и покрывать свой код тестами как в привычном бэкенде или другой области разработки. Пример кода для тестирования представлен ниже:

Пример тестов для процессора
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements.  See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package ru.kotletkin.dev.processors.demohabrprocessor;

import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;

class MyProcessorTest {

    private TestRunner testRunner;

    @BeforeEach
    void init() {
        testRunner = TestRunners.newTestRunner(MyProcessor.class);
    }

    @Test
    void testProcessorWithDefaultAttribute() {
        // Устанавливаем свойства
        testRunner.setProperty(MyProcessor.SOURCE_ATTRIBUTE, "text");
        testRunner.setProperty(MyProcessor.OUTPUT_ATTRIBUTE, "word.count");

        // СОЗДАЕМ АТРИБУТЫ
        Map<String, String> attributes = new HashMap<>();
        attributes.put("text", "Hello world this is Apache NiFi for Habr");

        // Добавляем FlowFile с атрибутами
        testRunner.enqueue("dummy content", attributes);

        // Запускаем процессор
        testRunner.run(1);

        // Проверяем результаты
        testRunner.assertTransferCount(MyProcessor.REL_SUCCESS, 1);
        testRunner.assertTransferCount(MyProcessor.REL_FAILURE, 0);

        // Проверяем атрибут 'word.count'
        MockFlowFile result = testRunner.getFlowFilesForRelationship(MyProcessor.REL_SUCCESS).getFirst();
        result.assertAttributeEquals("word.count", "8");
        result.assertAttributeExists("filename"); // Стандартный атрибут
    }
}

Далее, выполним mvn clean install в консоли.

Результат процесса сборки
Результат процесса сборки
Результат сборки
Результат сборки

Нам необходим собранный файл с расширением nar. Именно его мы и добавим в директорию lib.

После того, как вы добавили в директорию lib вновь собранный NAR, запустите Apache NiFi (или перезапустите его).

Далее, среди процессоров должен появиться наш новый процессор.

Самостоятельно реализованный процессор
Самостоятельно реализованный процессор

И в нем будут видны наши заранее реализованные параметры:

Создадим небольшой пайплайн для теста процессора:

Простейший пайплайн для тестирования
Простейший пайплайн для тестирования

Настроим процессор GenerateFlowFile, чтобы он генерировал файлы с уже имеющимся атрибутом:

Добавление атрибута для Apache NiFi
Добавление атрибута для Apache NiFi

Запустим все процессоры через Run Once и увидим результат в одной из очередей перед Funnel:

Очередь с тестовым FlowFile
Очередь с тестовым FlowFile

Проверим атрибуты:

Наши атрибуты с подсчитанным количеством слов
Наши атрибуты с подсчитанным количеством слов

Таким образом, видно, что процессор штатно отработал и мы прошли все этапы, от написания кода, до сборки и добавления нашего процессора в Apache NiFi.

Результат статьи в виде кодовой базы можно найти тут.

Итоги

Мы рассмотрели, как создать простейший процессор в Apache NiFi, реализовать логику, как использовать основные методы различных классов. Естественно, этой статьей разработка модулей не ограничивается ни в коем случае, и есть много различных направлений в этой области. Для полного погружения рекомендуется изучать документацию и тестировать на практике, желательно на большой нагрузке.

Комьюнити Apache NiFi в РФ: telegram

Комментарии (0)