Ремарка

Обратите внимание, что методы и подходы, описанные в этой статье, являются одним из способов решения возникших проблем и могут не являться наилучшим или рекомендуемым вариантом для всех ситуаций. Автор не призывает следовать описанным шагам без учета возможных альтернатив и не гарантирует, что данный метод подойдет для всех пользователей или случаев. Учитывайте, что использование нестандартных решений может повлечь за собой риски или потребовать дополнительных усилий для поддержки и обновлений. Рекомендуется всегда исследовать и оценивать возможные варианты решений в контексте вашего проекта и требований.

Как запустить Apache Atlas в Docker

Первое, с чем я столкнулась, — это то, что большинство (возможно, не все, я, конечно, не проверяла) образов Apache Atlas не работают из коробки.

Каждый из них падал с следующей ошибкой:

14: curl#6 - "Could not resolve host: mirrorlist.centos.org; Unknown error"


 One of the configured repositories failed (Unknown),
 and yum doesn't have enough cached data to continue. At this point the only
 safe thing yum can do is fail. There are a few ways to work "fix" this:

     1. Contact the upstream for the repository and get them to fix the problem.

     2. Reconfigure the baseurl/etc. for the repository, to point to a working
        upstream. This is most often useful if you are using a newer
        distribution release than is supported by the repository (and the
        packages for the previous distribution release still work).

     3. Run the command with the repository temporarily disabled
            yum --disablerepo=<repoid> ...

     4. Disable the repository permanently, so yum won't use it by default. Yum
        will then just ignore the repository until you permanently enable it
        again or use --enablerepo for temporary usage:

            yum-config-manager --disable <repoid>
        or
            subscription-manager repos --disable=<repoid>

     5. Configure the failing repository to be skipped, if it is unavailable.
        Note that yum will try to contact the repo. when it runs most commands,
        so will have to try and fail each time (and thus. yum will be be much
        slower). If it is a very temporary problem though, this is often a nice
        compromise:

            yum-config-manager --save --setopt=<repoid>.skip_if_unavailable=true

Об этой ошибке чуть позже.

Я начала искать готовые Docker-файлы на GitHub.

Однако они также сталкивались с той же ошибкой.


Ошибка связана с тем, что Docker не смог найти или разрешить хост для репозитория mirrorlist.centos.org. Это привело к сбою команды yum, используемой для установки пакетов внутри контейнера Docker. Подобные ошибки могут возникать, если репозиторий больше не поддерживается или временно недоступен.

Остается только написать свои Docker-файлы. Я взяла за основу файлы из данного репозитория (уточню, что я использовала не последнюю версию, а на один коммит назад от версии на 27.08.2024, так как последнюю версию не удалось запустить в принципе), поскольку на него ссылались в данной статье.

Первое, что мы делаем — убираем все лишнее из docker-compose.yml.

В оригинале, помимо Atlas и сервисов, необходимых для его работы, также поднимались Spark, Hive, Hadoop (DataNode, NameNode). Все это оказалось ненужным, поэтому мы просто вычеркиваем это из файла, а также удаляем папки Spark и Hive. Оставляем только необходимое: сам Apache Atlas, Apache Kafka и Apache Zookeeper.

Итого получаем:

version: "2"

services:

  atlas-server:
    build: ./atlas
    image: pico/apache-atlas
    volumes:
      - ./atlas/resources/1000-Hadoop:/opt/atlas/models/1000-Hadoop
    ports:
      - "21000:21000"
    depends_on:
      - "zookeeper"
      - "kafka" 

  zookeeper:
    image: wurstmeister/zookeeper
    hostname: zookeeper
    ports:
      - "2181:2181"  

  kafka:
    container_name: kafka
    image: wurstmeister/kafka
    ports:
      - "9092:9092"
    hostname: kafka
    environment:
      KAFKA_CREATE_TOPICS: "create_events:1:1,delete_events:1:1,ATLAS_HOOK:1:1"
      KAFKA_ADVERTISED_HOST_NAME: kafka
      KAFKA_ADVERTISED_PORT: 9092
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    depends_on:
      - zookeeper  

Далее мы переходим к Dockerfile, который собирает Apache Atlas в контейнере. Для устранения ошибки достаточно прописать архивное зеркало для CentOS сразу после строки FROM centos:7.

FROM centos:7

COPY --from=stage-atlas /apache-atlas.tar.gz /apache-atlas.tar.gz

#новые строчки с архивным зеркалом centos
RUN sed -i s/mirror.centos.org/vault.centos.org/g /etc/yum.repos.d/*.repo \
	&& sed -i s/^#.*baseurl=http/baseurl=http/g /etc/yum.repos.d/*.repo \
	&& sed -i s/^mirrorlist=http/#mirrorlist=http/g /etc/yum.repos.d/*.repo

И можно запускать docker-compose up -d. Однако возникает следующая ошибка:

Command "/usr/bin/python3 -u -c "import setuptools, tokenize;__file__='/tmp/pip-build-f8uu6b5l/typed-ast/setup.py';f=getattr(tokenize, 'open', open)(__file__);code=f.read().replace('\r\n', '\n');f.close();exec(compile(code, __file__, 'exec'))" install --record /tmp/pip-artr5er7-record/install-record.txt --single-version-externally-managed --compile" failed with error code 1 in /tmp/pip-build-f8uu6b5l/typed-ast/ ERROR: Service 'atlas-server' failed to build: The command '/bin/sh -c pip3 install amundsenatlastypes==1.1.0' returned a non-zero code: 1

Ошибка связана с неудачной установкой Python-пакета amundsenatlastypes. Решение еще проще: в Dockerfile в строке RUN pip3 install amundsenatlastypes==1.1.0 измените версию на 1.2.2, и снова запустите docker-compose up -d. На этот раз все будет работать. Можно перейти по адресу http://localhost:21000/ и открыть веб-интерфейс Apache Atlas. Отмечу, что запуск может занять несколько минут; в первый раз у меня это заняло около 10 минут, но последующие запуски происходят гораздо быстрее.

Apache Atlas Web UI
Apache Atlas Web UI

Apache Atlas заработал, готовые файлы для запуска можно взять тут

поднимает Apache NI-Fi

Здесь проблем нет. Создайте файл docker-compose.yml со следующим содержанием и запустите его командой docker-compose up -d:

version: '3'

services:

  nifi:
    image: apache/nifi:1.20.0
    container_name: nifi
    ports:
      - "8080:8080"
      - "8443:8443"
      - "8081:8081"
    volumes:
      - nifi-data:/opt/nifi/nifi-current/logs
    environment:
      - NIFI_WEB_HTTP_PORT=8080
      - NIFI_WEB_HTTPS_PORT=8443
      - NIFI_WEB_HTTP_HOST=0.0.0.0
      - NIFI_WEB_PROXY_HOST=localhost
    restart: unless-stopped

volumes:
  nifi-data:
    driver: local

После этого достаточно перейти по адресу http://localhost:8080, и должен открыться веб-интерфейс NiFi. Отмечу, что для запуска может понадобиться несколько минут.

Apache NI-Fi Web UI
Apache NI-Fi Web UI

Постановка задачи

Задача состоит в том, чтобы создать репортер между NiFi и Atlas. Для этой задачи существует стандартный репортер под названием ReportLineageToAtlas (в Docker-образе по умолчанию данного плагина нет; в конце будет небольшой абзац о том, как добавлять плагины). Создадим в NiFi небольшой DAG и настроим репортер в Atlas.

даг
даг
настроеный репортер
настроеный репортер
результат в apache Atlas
результат в apache Atlas

Как мы видим, информация о DAG отображается в Atlas, но проблема в том, что отображается вся информация. Мне нужно скрыть определенные стадии DAG, чтобы их не было в Atlas. Таким образом, моя задача — создать репортер, который будет отделять, что нужно отображать, а что нет в Atlas. Мы введем правило, что отображаться будут только те стадии, в названиях которых в конце присутствует _to_atlas.

Создание плагина

Создадим файл pom.xml со следующим содержимым:

    <groupId>Ni-Fi</groupId>
    <artifactId>pack</artifactId>
    <!-- Тип пакета для данного артефакта (в данном случае "pom" указывает на то, что это основной POM для сборки) -->
    <packaging>pom</packaging>
    <version>1.20.0</version>

    <!-- Свойства, используемые в этом POM-файле -->
    <properties>
        <!-- Версия Apache NiFi, которая будет использоваться в модулях -->
        <nifi.version>1.20.0</nifi.version>

        <!-- Версия InfluxDB, которая будет использоваться в модулях -->
        <influxdb.version>2.9</influxdb.version>
    </properties>


    <!-- Определение модулей, которые являются частью этого Maven-проекта -->
    <modules>
        <!-- Модуль, отвечающий за реализацию задачи отчетности в Apache Atlas непосредственно с кодом  -->
        <module>reporting-task-atlas</module>

        <!-- NAR (NiFi Archive) модуль для задачи отчетности в Apache Atlas, нужен только чтоб паковать в .nsr архив проет-->
        <module>reporting-task-atlas-nar</module>
    </modules>

Определим зависемости

    <!-- Раздел для управления зависимостями Maven. 
         В этом разделе определяются версии зависимостей, используемых в проекте. 
         Эти зависимости могут быть включены в модули без явного указания их версий.
    -->
    <dependencyManagement>
        <dependencies>
            <!-- Зависимость для использования SLF4J API (Simple Logging Facade for Java) в проекте.
                 SLF4J предоставляет абстракцию для различных логгеров, таких как Log4j и Logback.
            -->
            <dependency>
                <groupId>org.slf4j</groupId>
                <artifactId>slf4j-api</artifactId>
                <version>1.7.12</version>
            </dependency>

            <!-- Зависимость для использования API Apache NiFi.
                 Эта зависимость предоставляет доступ к API Apache NiFi, необходимому для разработки NiFi компонентов.
                 Поскольку зависимость имеет scope 'provided', она требуется только во время компиляции и не будет включена в конечный артефакт.
            -->
            <dependency>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-api</artifactId>
                <version>${nifi.version}</version>
                <scope>provided</scope>
            </dependency>

            <!-- Зависимость для использования утилит процессоров Apache NiFi.
                 Эти утилиты могут использоваться для создания пользовательских процессоров в NiFi.
            -->
            <dependency>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-processor-utils</artifactId>
                <version>1.15.3</version>
            </dependency>

            <!-- Зависимость для использования утилит отчетности Apache NiFi.
                 Эти утилиты предоставляют инструменты для разработки компонентов отчетности в NiFi.
            -->
            <dependency>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-reporting-utils</artifactId>
                <version>${nifi.version}</version>
            </dependency>

            <!-- Зависимость для использования Apache Commons IO.
                 Commons IO предоставляет утилиты для ввода-вывода (I/O), такие как операции с файлами и потоками.
            -->
            <dependency>
                <groupId>org.apache.commons</groupId>
                <artifactId>commons-io</artifactId>
                <version>1.3.2</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

Компоненты Apache NiFi следует упаковывать в .nar архивы, и для этого необходимо подключить несколько плагинов.

    <!-- Раздел сборки, где определяются плагины, используемые для процесса сборки проекта -->
    <build>
        <plugins>
            <!-- Плагин NiFi NAR Maven Plugin используется для создания NAR (NiFi Archive) файлов.
                 NAR файлы — это специальные архивы, используемые в Apache NiFi для упаковки расширений (например, процессоров, контроллеров и других компонентов).
                 Плагин необходим для построения и упаковки пользовательских компонентов NiFi в формате NAR.
            -->
            <plugin>
                <groupId>org.apache.nifi</groupId>
                <artifactId>nifi-nar-maven-plugin</artifactId>
                <version>1.5.0</version>
                <!-- Плагин используется как расширение Maven, добавляя функциональность для сборки NAR файлов. -->
                <extensions>true</extensions>
            </plugin>

            <!-- Плагин Maven Surefire Plugin используется для запуска юнит-тестов в фазе тестирования.
                 Этот плагин выполняет тесты, написанные с использованием фреймворков, таких как JUnit или TestNG, и генерирует отчет о тестах.
            -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-surefire-plugin</artifactId>
                <version>2.15</version>
            </plugin>

            <!-- Плагин Maven Compiler Plugin используется для компиляции исходного кода проекта.
                 В данном случае он настроен для использования версии Java 17 в качестве исходного и целевого уровня компиляции.
            -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <configuration>
                    <!-- Версия Java, используемая для компиляции исходного кода -->
                    <source>17</source>
                    <!-- Версия Java, на которую нацелен скомпилированный код -->
                    <target>17</target>
                </configuration>
            </plugin>
        </plugins>
    </build>

Теперь можно собирать .nar архивы. Переходим в модуль pico-reporting-task-atlas-nar и создаем там файл pom.xml со следующим содержимым:


<!-- Определение родительского POM для данного модуля.
     Родительский POM содержит общие настройки и зависимости, которые наследуются дочерними модулями.
     В данном случае родительский артефакт 'PicoReportLineageToAtlas' с группой 'pico.habr' и версией '1.20.0'.
-->
<parent>
    <groupId>pico.habr</groupId>
    <artifactId>PicoReportLineageToAtlas</artifactId>
    <version>1.20.0</version>
</parent>

<!-- Уникальный идентификатор артефакта для данного модуля в Maven.
     'artifactId' должен быть уникальным в рамках группы ('groupId') и представляет конкретный проект или модуль.
-->
<artifactId>pico-reporting-task-atlas-nar</artifactId>

<!-- Тип упаковки артефакта, который указывает на формат выходного файла.
     В данном случае используется 'nar' (NiFi Archive), специальный формат для расширений Apache NiFi.
-->
<packaging>nar</packaging>

<!-- Человекочитаемое имя модуля, используемое для идентификации проекта -->
<name>reporting-task-atlas-nar</name>

<!-- Определение свойств для проекта.
     Здесь задается кодировка исходных файлов проекта для обеспечения корректной обработки файлов с различными кодировками.
-->
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
</properties>

<!-- Определение зависимостей для данного модуля -->
<dependencies>
    <!-- Зависимость от другого модуля внутри той же группы 'pico.habr'.
         Этот модуль предоставляет основные компоненты и классы для задачи отчетности в Atlas.
         Данная зависимость используется на этапе компиляции ('scope' compile).
    -->
    <dependency>
        <groupId>pico.habr</groupId>
        <artifactId>reporting-task-atlas</artifactId>
        <version>1.20.0</version>
        <scope>compile</scope>
    </dependency>
    
    <!-- Зависимость от стандартных API сервисов Apache NiFi.
         Эта зависимость предоставляет NAR-файл, который содержит стандартные API сервисов, используемых в NiFi.
         Указан тип зависимости 'nar', поскольку это специфичный для NiFi формат архива.
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-standard-services-api-nar</artifactId>
        <version>1.20.0</version>
        <type>nar</type>
    </dependency>
</dependencies>

В этом модуле больше ничего трогать не нужно, и при запуске сборки проекта в этой директории появится .nar архив.

Переходим в модуль reporting-task-atlas и создаем там файл pom.xml со следующим содержимым:

<!-- Определение родительского POM для данного модуля.
     Родительский POM содержит общие настройки и зависимости, которые наследуются дочерними модулями.
     В данном случае родительский артефакт 'PicoReportLineageToAtlas' с группой 'pico.habr' и версией '1.20.0'.
-->
<parent>
    <groupId>pico.habr</groupId>
    <artifactId>PicoReportLineageToAtlas</artifactId>
    <version>1.20.0</version>
</parent>

<!-- Уникальный идентификатор артефакта для данного модуля в Maven.
     'artifactId' должен быть уникальным в рамках группы ('groupId') и представляет конкретный проект или модуль.
-->
<artifactId>reporting-task-atlas</artifactId>

<!-- Указание версии модуля. Обычно совпадает с версией родительского POM. -->
<version>1.20.0</version>

<!-- Тип упаковки артефакта. 'jar' означает, что выходной файл будет в формате JAR (Java ARchive). -->
<packaging>jar</packaging>

<!-- Определение зависимостей, необходимых для компиляции и выполнения проекта. -->
<dependencies>

    <!-- Зависимость от утилит для процессоров Apache NiFi.
         Содержит вспомогательные классы и методы, необходимые для создания пользовательских процессоров в NiFi.
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-processor-utils</artifactId>
    </dependency>

    <!-- Зависимость от SLF4J API (Simple Logging Facade for Java).
         Предоставляет абстракцию для различных логгеров, таких как Log4j и Logback.
    -->
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
    </dependency>

    <!-- Зависимость от API Apache NiFi.
         Предоставляет основные интерфейсы и классы для разработки компонентов NiFi.
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-api</artifactId>
    </dependency>

    <!-- Зависимость от утилит отчетности Apache NiFi.
         Предоставляет классы и методы для разработки компонентов отчетности в NiFi.
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-reporting-utils</artifactId>
    </dependency>

    <!-- Зависимость от компонента задачи отчетности в Atlas для NiFi.
         Позволяет интегрировать Apache NiFi с Apache Atlas для отправки метаданных о lineage.
         Версия зависит от свойства 'nifi.version'.
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-atlas-reporting-task</artifactId>
        <version>${nifi.version}</version>
    </dependency>

    <!-- Зависимость от API сервиса SSL контекста NiFi.
         Предоставляет API для работы с SSL контекстами, используемыми в NiFi.
         Версия зависит от свойства 'nifi.version'.
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-ssl-context-service-api</artifactId>
        <version>${nifi.version}</version>
    </dependency>

    <!-- Зависимость от API сервиса Kerberos для NiFi.
         Предоставляет API для управления учетными данными Kerberos в NiFi.
         Используется на этапе компиляции ('scope' compile).
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-kerberos-credentials-service-api</artifactId>
        <version>${nifi.version}</version>
        <scope>compile</scope>
    </dependency>

    <!-- Зависимость от репозитория постоянного происхождения NiFi.
         Используется для сохранения данных lineage в формате, который может быть восстановлен при перезапуске NiFi.
    -->
    <dependency>
        <groupId>org.apache.nifi</groupId>
        <artifactId>nifi-persistent-provenance-repository</artifactId>
        <version>${nifi.version}</version>
    </dependency>

    <!-- Зависимость от библиотеки JetBrains Annotations.
         Предоставляет аннотации, используемые для улучшения статического анализа и других задач компиляции.
         Используется на этапе компиляции ('scope' compile).
    -->
    <dependency>
        <groupId>org.jetbrains</groupId>
        <artifactId>annotations</artifactId>
        <version>RELEASE</version>
        <scope>compile</scope>
    </dependency>

    <!-- Зависимость от библиотеки Lombok.
         Позволяет упростить написание Java кода, автоматически генерируя геттеры, сеттеры и другие методы во время компиляции.
         Используется только на этапе компиляции и не включается в финальный артефакт ('scope' provided).
    -->
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.26</version>
        <scope>provided</scope>
    </dependency>
</dependencies>

Все pom.xml файлы готовы, и остается только написать код репортера.

Создаю класс PicoReportLineageToAtlas и копирую в него код класса org.apache.nifi.atlas.reporting.ReportLineageToAtlas.

Меняю в нем:

  • package org.apache.nifi.atlas.reporting на package pico.habr.nifi.atlas.reporting

  • Название самого репортера с ReportLineageToAtlas на PicoReportLineageToAtlas

  • Немного обновляю теги.

// было 
@Tags({"atlas", "lineage"})
@CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas." +
        " End-to-end lineages across NiFi environments and other systems can be reported if those are" +
        " connected by different protocols and data set, such as NiFi Site-to-Site, Kafka topic or Hive tables ... etc." +
        " Atlas lineage reported by this reporting task can be useful to grasp the high level relationships between processes and data sets," +
        " in addition to NiFi provenance events providing detailed event level lineage." +
        " See 'Additional Details' for further description and limitations.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@DynamicProperty(name = "hostnamePattern.<namespace>", value = "hostname Regex patterns",
                 description = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
// In order for each reporting task instance to have its own static objects such as KafkaNotification.
@RequiresInstanceClassLoading
public class ReportLineageToAtlas extends AbstractReportingTask {

  //...

}

// стало
@Tags({"pico", "atlas", "lineage"})
@CapabilityDescription("Report NiFi flow data set level lineage to Apache Atlas." +
        " End-to-end lineages across NiFi environments and other systems can be reported if those are" +
        " connected by different protocols and data set, such as NiFi Site-to-Site, Kafka topic or Hive tables ... etc." +
        " Atlas lineage reported by this reporting task can be useful to grasp the high level relationships between processes and data sets," +
        " in addition to NiFi provenance events providing detailed event level lineage." +
        " See 'Additional Details' for further description and limitations.")
@Stateful(scopes = Scope.LOCAL, description = "Stores the Reporting Task's last event Id so that on restart the task knows where it left off.")
@DynamicProperty(name = "hostnamePattern.<namespace>", value = "hostname Regex patterns",
                 description = RegexNamespaceResolver.PATTERN_PROPERTY_PREFIX_DESC, expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
// In order for each reporting task instance to have its own static objects such as KafkaNotification.
@RequiresInstanceClassLoading
public class PicoReportLineageToAtlas extends AbstractReportingTask {

  //...

}

Далее задача — найти, где в коде ставится фильтрация по имени компонентов дага.

Как оказалось, за это отвечает другой класс, а именно org.apache.nifi.atlas.NiFiFlowAnalyzer. Поэтому создаем еще один класс PicoNiFiFlowAnalyzer и аналогично копируем в него код. После этого вносим несколько правок.

//было
package org.apache.nifi.atlas;

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class NiFiFlowAnalyzer { 
  //...
  }

//меняем на 
package pico.habr.nifi.atlas; //замена групы

import org.apache.atlas.model.instance.AtlasEntity;
import org.apache.atlas.model.instance.AtlasObjectId;
import org.apache.nifi.atlas.NiFiFlow; //добовляю недостающию зависемость
import org.apache.nifi.atlas.NiFiFlowPath; //добовляю недостающию зависемость
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.util.Tuple;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class NiFiFlowAnalyzer { //меняю название класса
  //...
  }

Находим в коде метод analyzeProcessGroup и вносим правки.

// было
private void analyzeProcessGroup(final ProcessGroupStatus processGroupStatus, final NiFiFlow nifiFlow) {

        processGroupStatus.getConnectionStatus().forEach(c -> nifiFlow.addConnection(c));
        processGroupStatus.getProcessorStatus().forEach(p -> nifiFlow.addProcessor(p));
        processGroupStatus.getRemoteProcessGroupStatus().forEach(r -> nifiFlow.addRemoteProcessGroup(r));
        processGroupStatus.getInputPortStatus().forEach(p -> nifiFlow.addInputPort(p));
        processGroupStatus.getOutputPortStatus().forEach(p -> nifiFlow.addOutputPort(p));

        // Analyze child ProcessGroups recursively.
        for (ProcessGroupStatus child : processGroupStatus.getProcessGroupStatus()) {
            analyzeProcessGroup(child, nifiFlow);
        }

    }

// правки
private void analyzeProcessGroup(final ProcessGroupStatus processGroupStatus, final NiFiFlow nifiFlow) {

        processGroupStatus.getConnectionStatus().forEach(c -> nifiFlow.addConnection(c));
        processGroupStatus.getProcessorStatus()
                .stream().filter(c -> c.getName().toLowerCase().endsWith("_to_atlas")) // фильтрация по суффиксу
                .forEach(p -> nifiFlow.addProcessor(p));
        processGroupStatus.getRemoteProcessGroupStatus().forEach(r -> nifiFlow.addRemoteProcessGroup(r));
        processGroupStatus.getInputPortStatus()
                .stream().filter(c -> c.getName().toLowerCase().endsWith("_to_atlas")) // фильтрация по суффиксу
                .forEach(p -> nifiFlow.addInputPort(p));
        processGroupStatus.getOutputPortStatus()
                .stream().filter(c -> c.getName().toLowerCase().endsWith("_to_atlas")) // фильтрация по суффиксу
                .forEach(p -> nifiFlow.addOutputPort(p));

        // Analyze child ProcessGroups recursively.
        for (ProcessGroupStatus child : processGroupStatus.getProcessGroupStatus()) {
            analyzeProcessGroup(child, nifiFlow);
        }

    }

Остается только в PicoReportLineageToAtlas подменить один класс на другой.

import org.apache.nifi.atlas.NiFiFlowAnalyzer
//меняем на 
import pico.habr.nifi.atlas.PicoNiFiFlowAnalyzer;

//и

final NiFiFlowAnalyzer flowAnalyzer = new NiFiFlowAnalyzer();
//меняем на 
final PicoNiFiFlowAnalyzer flowAnalyzer = new PicoNiFiFlowAnalyzer();

Остается только в resources создать файл org.apache.nifi.reporting.ReportingTask со следующим содержимым:

pico.habr.nifi.atlas.reporting.PicoReportLineageToAtlas

Он содержит в себе только путь до класса репортера. Этот файл используется для регистрации пользовательских реализаций интерфейса ReportingTask в NiFi. Он следует спецификации Java Service Provider Interface (SPI), которая позволяет динамически находить и загружать реализации интерфейсов или абстрактных классов во время выполнения.

Остается только собрать проект командой mvn clean install.

И тогда в модуле pico-reporting-task-atlas-nar появится .nar архив pico-reporting-task-atlas-nar-1.20.0.nar.

Итого, имеем следующую структуру проекта:

ls -R
.:
pom.xml  reporting-task-atlas  reporting-task-atlas-nar

./reporting-task-atlas:
pom.xml  src

./reporting-task-atlas/src:
main

./reporting-task-atlas/src/main:
java  resources

./reporting-task-atlas/src/main/java:
pico

./reporting-task-atlas/src/main/java/pico:
habr

./reporting-task-atlas/src/main/java/pico/habr:
nifi

./reporting-task-atlas/src/main/java/pico/habr/nifi:
atlas

./reporting-task-atlas/src/main/java/pico/habr/nifi/atlas:
PicoNiFiFlowAnalyzer.java  reporting

./reporting-task-atlas/src/main/java/pico/habr/nifi/atlas/reporting:
PicoReportLineageToAtlas.java

./reporting-task-atlas/src/main/resources:
META-INF

./reporting-task-atlas/src/main/resources/META-INF:
services

./reporting-task-atlas/src/main/resources/META-INF/services:
org.apache.nifi.reporting.ReportingTask

./reporting-task-atlas-nar:
pom.xml

как проникунуть .nar в NIFI

Чтобы подключить .nar архив в NIFI, достаточно прокинуть его в папку lib, которая должна находиться в корневой папке NIFI. В случае с Docker можно воспользоваться командой docker cp. После этого достаточно перезагрузить NIFI или контейнер целиком.

Результат

заходим в панель управления NIFI и находим там кастомный репортер. Настраиваем его в соответствии с требованиями. (настраиваеться так же как и оригинальный)

Для примера создам следующий даг

И смотрим что отобразилось в Atlas

И как видим в Атласе процессора D не появилось, поскольку в нем нету необходимого суффикса.

P.S. код можно глянуть здесь.

Комментарии (0)