В предыдущих частях мы рассмотрели вопросы мониторинга потоков данных и состояния системы средствами GUI NiFi и задач отчетности. В этом материале поближе познакомимся с задачами отчетности Site-to-Site. При отправке данных из одного экземпляра NiFi в другой можно использовать множество различных протоколов, однако, предпочтительным является NiFi Site-to-Site. Данный протокол предлагает безопасную и эффективную передачу данных из узлов в одном экземпляре NiFi, производящем данные, на узлы в другом экземпляре, являющимся приемником этих данных.

  1. SiteToSiteBulletinReportingTask – публикует события об ошибках и предупреждениях в бюллетенях с использованием протокола Site-to-Site;

  2. SiteToSiteProvenanceReportingTask – публикует события о происхождении данных, используя протокол Site-to-Site;

  3. SiteToSiteStatusReportingTask – публикует события состояния, используя протокол Site-to-Site;

  4. SiteToSiteMetricsReportingTask публикует те же метрики, что и задача Ambari Reporting, используя протокол Site-to-Site.

В NiFi есть возможность использовать Site-to-Site (S2S), а это означает, что задачи отчетности постоянно выполняются для сбора данных и их отправки в удаленный экземпляр NiFi. Аналогично можно использовать S2S для отправки данных в экземпляр, на котором выполняется отчетность.

Таким образом, используя входной порт на рабочей области, вы можете фактически получать данные, сгенерированные задачей создания отчетов, и применять их в рабочем процессе NiFi. Эта функциональность позволяет использовать все возможности NiFi для обработки метрик задач отчетности.

Настройка Site-to-Site

Прежде чем можно будет использовать S2S, необходимо установить следующие свойства в файле nifi.properties на всех узлах в вашем кластере NiFi:

1)     Свойства с именем nifi.remote.input.socket.* зависят от транспортного протокола RAW;

2)     Свойства nifi.remote.input.http.* являются специфическими свойствами транспортного протокола HTTP.

Свойство

Примечание

nifi.remote.input.host=<FQDN of Host>

Установить полное доменное имя для всех узлов

nifi.remote.input.secure=false

Установите значение true, если NiFi использует HTTPS. По умолчанию установлено значение false. Другие свойства безопасности (ниже) также должны быть настроены

nifi.remote.input.socket.port=<порт для S2S>)

Порт для связи Site-to-Site. По умолчанию он пуст, но должен иметь значение, чтобы использовать сокет RAW в качестве транспортного протокола для Site-to-Site

nifi.remote.input.http.enabled=true               

Указывает, следует ли включить HTTP Site-to-Site на этом узле. По умолчанию установлено значение true.

Использование клиентом Site-to-Site HTTP или HTTPS определяется nifi.remote.input.secure. Если установлено значение true, то запросы отправляются как HTTPS на nifi.web.https.port. Если установлено значение false, HTTP-запросы отправляются на nifi.web.http.port

nifi.remote.input.http.transaction.ttl=30 sec

Указывает, как долго транзакция может оставаться на сервере. По умолчанию установлено значение 30 секунд

Чтобы эти изменения вступили в силу, потребуется перезапуск ваших экземпляров NiFi.

SiteToSiteBulletinReportingTask

Публикует события бюллетеня, используя протокол Site-to-Site. Для каждого компонента сохраняется пять-десять сводок на уровне контроллера продолжительностью до пяти минут. Если отчетная задача запланирована недостаточно часто, некоторые бюллетени могут не отправляться.

Рис. 1. Свойства задачи отчетности SiteToSiteBulletinReportingTask
Рис. 1. Свойства задачи отчетности SiteToSiteBulletinReportingTask

Свойство

Значение по умолчанию

Допустимые значения

Описание

Destination URL

 

 

URL-адрес целевого экземпляра NiFi для отправки данных должен быть в формате http(s)://host:port/nifi

Input Port Name

 

 

Имя входного порта для доставки событий происхождения

SSL Context Service

 

Controller Service API:
SSLContextService

Implementation: StandardSSLContextService

Контекстная служба SSL для использования при обмене данными с целевым сервером.

Если не указано, то не используется SSL протокол

Instance URL

http://${hostname(true)}:8080/nifi

 

URL этого экземпляра для использования в URI контента каждого события

Compress Events

true

* true

* false

Указывает, следует ли сжимать данные событий при отправке

Communications Timeout

30 secs

 

Указывает, как долго ждать ответа от адресата, прежде чем решить, что произошла ошибка, и отменить транзакцию

Transport Protocol

RAW

* RAW

* HTTP

Указывает, какой транспортный протокол использовать для связи Site-to-Site

HTTP Proxy hostname

 

 

Укажите имя хоста прокси-сервера для использования. Если не указано, HTTP-трафик отправляется непосредственно на целевой экземпляр NiFi

HTTP Proxy port

 

 

Укажите номер порта прокси-сервера, необязательно. Если не указано, будет использоваться порт 80 по умолчанию

HTTP Proxy username

 

 

Укажите имя пользователя для подключения к прокси-серверу (опционально)

HTTP Proxy password

 

 

Укажите пароль пользователя для подключения к прокси-серверу (опционально)

Record Writer

 

Controller Service API:
RecordSetWriterFactory
Implementations: XMLRecordSetWriter
ParquetRecordSetWriter
RecordSetWriterLookup
FreeFormTextRecordSetWriter
AvroRecordSetWriter
CSVRecordSetWriter
JsonRecordSetWriter
ScriptedRecordSetWriter

Указывает службу контроллера, используемую для записи записей

Include Null Values

false

* true

* false

Укажите, следует ли включать null значения в записи. По умолчанию будет не включены

Platform

nifi

 

Значение, используемое для поля платформы в каждом событии

Пользователь может определить Record Writer и напрямую указать выходной формат и формат данных, используя следующую входную схему:

{
  "type" : "record",
  "name" : "bulletins",
  "namespace" : "bulletins",
  "fields" : [ 
	{ "name" : "objectId", "type" : "string" },
	{ "name" : "platform", "type" : "string" },
	{ "name" : "bulletinId", "type" : "long" },
	{ "name" : "bulletinCategory", "type" : ["string", "null"] },
	{ "name" : "bulletinGroupId", "type" : ["string", "null"] },
	{ "name" : "bulletinGroupName", "type" : ["string", "null"] },
	{ "name" : "bulletinGroupPath", "type" : ["string", "null"] },
	{ "name" : "bulletinLevel", "type" : ["string", "null"] },
	{ "name" : "bulletinMessage", "type" : ["string", "null"] },
	{ "name" : "bulletinNodeAddress", "type" : ["string", "null"] },
	{ "name" : "bulletinNodeId", "type" : ["string", "null"] },
	{ "name" : "bulletinSourceId", "type" : ["string", "null"] },
	{ "name" : "bulletinSourceName", "type" : ["string", "null"] },
	{ "name" : "bulletinSourceType", "type" : ["string", "null"] },
	{ "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" }
  ]
}

Для хранения статистики по ошибкам создадим таблицу в БД PostgreSQL:

CREATE TABLE nifi_bulletin_err_log (
  id                 BIGSERIAL
, flowId             VARCHAR(36)  
, bulletinGroupId    VARCHAR(36)
, bulletinGroupName  VARCHAR(255)
, bulletinNodeId     VARCHAR(36)
, bulletinSourceId   VARCHAR(36)
, bulletinSourceName VARCHAR(255)  
, bulletinSourceType VARCHAR(50)
, bulletinMessage    TEXT
, bulletinTimestamp  VARCHAR(30)
, logTimestamp       TIMESTAMP DEFAULT LOCALTIMESTAMP(0)
, CONSTRAINT nifi_bulletin_err_log_pkey PRIMARY KEY (id));

И настроим блок обработки и записи метрик, получаемых на удаленный порт NiFi:

Рис. 2. Обработка событий задачи отчетности SiteToSiteBulletinReportingTask
Рис. 2. Обработка событий задачи отчетности SiteToSiteBulletinReportingTask

В Grafana создадим источник для работы с базой PostgreSQL:

Рис. 3. Настройка источника PostgreSQL в Grafana
Рис. 3. Настройка источника PostgreSQL в Grafana

И на основе созданной таблицы построим дашборд для мониторинга ошибок NiFi в Grafana:

Рис. 4. Пример дашборда для обработки отчетов Bulletin Board в Grafana
Рис. 4. Пример дашборда для обработки отчетов Bulletin Board в Grafana

SiteToSiteProvenanceReportingTask

Задача создания отчетов о происхождении данных позволяет пользователю публиковать все события происхождения из экземпляра NiFi обратно в тот же экземпляр NiFi или другой его экземпляр. Это обеспечивает большую гибкость, поскольку можно использовать все возможные процессоры, доступные в NiFi, для обработки или распространения этих данных.

Рекомендуется отправлять данные о происхождении в экземпляр NiFi, отличный от того, на котором выполняется задача создания отчетов SiteToSiteProvenanceReportingTask, потому что, когда данные получены через Site-to-Site и обработаны, это само по себе будет генерировать Provenance события. В результате создается цикл. Необходимо учесть, что данные отправляются пакетами (по умолчанию – 1000). Это означает, что для каждой партии событий Provenance, которые отправляются обратно в NiFi, получатель NiFi должен будет генерировать только одно событие для каждого компонента.

По умолчанию при публикации в экземпляре NiFi данные Provenance отправляются в виде массива JSON. Однако, пользователь может определить Record Writer и напрямую указать выходной формат и данные, предполагая, что входная схема определена следующим образом:

{
  "type" : "record",
  "name" : "provenance",
  "namespace" : "provenance",
  "fields": [
    { "name": "eventId", "type": "string" },
    { "name": "eventOrdinal", "type": "long" },
    { "name": "eventType", "type": "string" },
    { "name": "timestampMillis", "type": "long" },
    { "name": "durationMillis", "type": "long" },
    { "name": "lineageStart", "type": { "type": "long", "logicalType": "timestamp-millis" } },
    { "name": "details", "type": ["null", "string"] },
    { "name": "componentId", "type": ["null", "string"] },
    { "name": "componentType", "type": ["null", "string"] },
    { "name": "componentName", "type": ["null", "string"] },
    { "name": "processGroupId", "type": ["null", "string"] },
    { "name": "processGroupName", "type": ["null", "string"] },
    { "name": "entityId", "type": ["null", "string"] },
    { "name": "entityType", "type": ["null", "string"] },
    { "name": "entitySize", "type": ["null", "long"] },
    { "name": "previousEntitySize", "type": ["null", "long"] },
    { "name": "updatedAttributes", "type": { "type": "map", "values": "string" } },
    { "name": "previousAttributes", "type": { "type": "map", "values": "string" } },
    { "name": "actorHostname", "type": ["null", "string"] },
    { "name": "contentURI", "type": ["null", "string"] },
    { "name": "previousContentURI", "type": ["null", "string"] },
    { "name": "parentIds", "type": { "type": "array", "items": "string" } },
    { "name": "childIds", "type": { "type": "array", "items": "string" } },
    { "name": "platform", "type": "string" },
    { "name": "application", "type": "string" },
    { "name": "remoteIdentifier", "type": ["null", "string"] },
    { "name": "alternateIdentifier", "type": ["null", "string"] },
    { "name": "transitUri", "type": ["null", "string"] }
  ]
}

В задаче отчетности можно настроить следующие свойства:

Свойство

Значение по умолчанию

Допустимые значения

Описание

Destination URL

 

 

URL-адрес целевого экземпляра NiFi для отправки данных должен быть в формате http(s)://host:port/nifi

Этот URL-адрес будет использоваться только для инициирования соединения Site-to-Site. Данные, отправляемые этой задачей создания отчетов, будут сбалансированы по нагрузке на всех узлах назначения (если они кластеризованы)

Input Port Name

 

 

Имя входного порта для доставки событий

SSL Context Service

 

Controller Service API:
SSLContextService

Implementation:
StandardSSLContextService

Контекстная служба SSL для использования при обмене данными с пунктом назначения. Если не указано, связь не будет защищена

Instance URL

http://${hostname(true)}:8080/nifi

 

URL этого экземпляра для использования в URI контента каждого события

Compress Events

true

* true

* false

Указывает, следует ли сжимать данные событий при отправке

Communications Timeout

30 secs

 

Указывает, как долго ждать ответа от адресата, прежде чем решить, что произошла ошибка, и отменить транзакцию

Batch Size

1000

 

Указывает максимальное количество записей для отправки в одном пакете

Transport Protocol

RAW

* RAW

* HTTP

Указывает, какой транспортный протокол использовать для связи Site-to-Site

HTTP Proxy hostname

 

 

Укажите имя хоста прокси-сервера для использования. Если не указано, HTTP-трафик отправляется непосредственно на целевой экземпляр NiFi

HTTP Proxy port

 

 

Укажите номер порта прокси-сервера, необязательно. Если не указано, будет использоваться порт 80 по умолчанию

HTTP Proxy username

 

 

Укажите имя пользователя для подключения к прокси-серверу (опционально)

HTTP Proxy password

 

 

Укажите пароль пользователя для подключения к прокси-серверу (опционально)

Record Writer

 

Controller Service API:
RecordSetWriterFactory
Implementations: XMLRecordSetWriter
ParquetRecordSetWriter
RecordSetWriterLookup
FreeFormTextRecordSetWriter
AvroRecordSetWriter
CSVRecordSetWriter
JsonRecordSetWriter
ScriptedRecordSetWriter

Указывает службу контроллера, используемую для записи записей

Include Null Values

false

* true

* false

Укажите, следует ли включать null значения в записи. По умолчанию будет не включены

Platform

nifi

 

Значение, используемое для поля платформы в каждом событии

Event Type to Include

 

 

Разделенный запятыми список типов событий, которые будут использоваться для фильтрации событий происхождения, отправляемых задачей создания отчетов. Доступные типы событий: [CREATE, RECEIVE, FETCH, SEND, REMOTE_INVOCATION, DOWNLOAD, DROP, EXPIRE, FORK, JOIN, CLONE, CONTENT_MODIFIED, ATTRIBUTES_MODIFIED, ROUTE, ADDINFO, REPLAY, UNKNOWN]. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, фильтры суммируются

Event Type to Exclude

 

 

Разделенный запятыми список типов событий, которые будут использоваться для исключения событий происхождения, отправленных задачей создания отчетов. Доступные типы событий: [CREATE, RECEIVE, FETCH, SEND, REMOTE_INVOCATION, DOWNLOAD, DROP, EXPIRE, FORK, JOIN, CLONE, CONTENT_MODIFIED, ATTRIBUTES_MODIFIED, ROUTE, ADDINFO, REPLAY, UNKNOWN]. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, они суммируются. Если тип события включен в тип события для включения и исключен здесь, то исключение имеет приоритет и событие не будет отправлено

Component Type to Include

 

 

Регулярное выражение для фильтрации событий происхождения на основе типа компонента. Будут отправлены только события, соответствующие регулярному выражению. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, фильтры суммируются

Component Type to Exclude

 

 

Регулярное выражение для исключения событий происхождения на основе типа компонента. События, соответствующие регулярному выражению, не будут отправлены. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, фильтры суммируются. Если тип компонента включен в тип компонента для включения и исключен здесь, то исключение имеет приоритет и событие не будет отправлено

Component ID to Include

 

 

Разделенный запятыми список UUID компонента, который будет использоваться для фильтрации событий происхождения, отправляемых задачей создания отчетов. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, фильтры суммируются

Component ID to Exclude

 

 

Разделенный запятыми список UUID компонента, который будет использоваться для исключения событий происхождения, отправленных задачей создания отчетов. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, фильтры суммируются. Если UUID компонента включен в идентификатор компонента для включения и исключен здесь, то исключение имеет приоритет и событие не будет отправлено

Component Name to Include

 

 

Регулярное выражение для фильтрации событий происхождения на основе имени компонента. Будут отправлены только события, соответствующие регулярному выражению. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, фильтры суммируются

Component Name to Exclude

 

 

Регулярное выражение для исключения событий происхождения на основе имени компонента. События, соответствующие регулярному выражению, не будут отправлены. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, фильтры суммируются. Если имя компонента включено в имя компонента для включения и исключено здесь, то исключение имеет приоритет и событие не будет отправлено

Start Position

beginning-of-stream

* Beginning of Stream

* End of Stream

Если задача создания отчетов никогда не запускалась или ее состояние было сброшено пользователем, указывает, где в потоке событий происхождения должна начаться задача создания отчетов

SiteToSiteStatusReportingTask

Публикует события состояния, используя протокол Site-to-Site.

При задании регулярных выражений для типа компонента и фильтра имени, будут представлены только компоненты, соответствующие обоим регулярным выражениям. Однако, все группы процессов рекурсивно ищут совпадающие компоненты независимо от того, соответствует ли группа процессов фильтрам компонентов.

В задаче отчетности можно настроить следующие свойства:

Свойство

Значение по умолчанию

Допустимые значения

Описание

Destination URL

 

 

URL-адрес целевого экземпляра NiFi для отправки данных должен быть в формате http(s)://host:port/nifi

Этот URL-адрес будет использоваться только для инициирования соединения Site-to-Site. Данные, отправляемые этой задачей создания отчетов, будут сбалансированы по нагрузке на всех узлах назначения (если они кластеризованы)

Input Port Name

 

 

Имя входного порта для доставки событий

SSL Context Service

 

Controller Service API:
SSLContextService

Implementation:
StandardSSLContextService

Контекстная служба SSL для использования при обмене данными с пунктом назначения. Если не указано, связь не будет защищена

Instance URL

http://${hostname(true)}:8080/nifi

 

URL этого экземпляра для использования в URI контента каждого события

Compress Events

true

* true

* false

Указывает, следует ли сжимать события при отправке

Communications Timeout

30 secs

 

Указывает, как долго ждать ответа от адресата, прежде чем решить, что произошла ошибка, и отменить транзакцию

Batch Size

1000

 

Указывает максимальное количество записей для отправки в одном пакете

Transport Protocol

RAW

* RAW

* HTTP

Указывает, какой транспортный протокол использовать для связи Site-to-Site

HTTP Proxy hostname

 

 

Укажите имя хоста прокси-сервера для использования. Если не указано, HTTP-трафик отправляется непосредственно на целевой экземпляр NiFi

HTTP Proxy port

 

 

Укажите номер порта прокси-сервера, необязательно. Если не указано, будет использоваться порт 80 по умолчанию

HTTP Proxy username

 

 

Укажите имя пользователя для подключения к прокси-серверу (опционально)

HTTP Proxy password

 

 

Укажите пароль пользователя для подключения к прокси-серверу (опционально)

Record Writer

 

Controller Service API:
RecordSetWriterFactory
Implementations: XMLRecordSetWriter
ParquetRecordSetWriter
RecordSetWriterLookup
FreeFormTextRecordSetWriter
AvroRecordSetWriter
CSVRecordSetWriter
JsonRecordSetWriter
ScriptedRecordSetWriter

Указывает службу контроллера, используемую для записи записей

Include Null Values

false

* true

* false

Укажите, следует ли включать null значения в записи. По умолчанию будет не включены

Platform

nifi

 

Значение, используемое для поля платформы в каждом событии

Component Type Filter Regex

(Processor|ProcessGroup|RemoteProcessGroup|RootProcessGroup|Connection|InputPort|OutputPort)

 

Регулярное выражение, определяющее типы компонентов для отчета. Будет включен любой тип компонента, соответствующий этому регулярному выражению. Типы компонентов: Processor, RootProcessGroup, ProcessGroup, RemoteProcessGroup, Connection, InputPort, OutputPort

Component Name Filter Regex

.*

 

Регулярное выражение, указывающее имена компонентов для отчета. Любое имя компонента, соответствующее этому регулярному выражению, будет включено

Пользователь может определить Record Writer, напрямую указать формат вывода и данные, предполагая, что схема ввода следующая:

{
  "type" : "record",
  "name" : "status",
  "namespace" : "status",
  "fields" : [
    // common fields for all components
	{ "name" : "statusId", "type" : "string"},
	{ "name" : "timestampMillis", "type": { "type": "long", "logicalType": "timestamp-millis" } },
	{ "name" : "timestamp", "type" : "string"},
	{ "name" : "actorHostname", "type" : "string"},
	{ "name" : "componentType", "type" : "string"},
	{ "name" : "componentName", "type" : "string"},
	{ "name" : "parentId", "type" : ["string", "null"]},
	{ "name" : "parentName", "type" : ["string", "null"]},
	{ "name" : "parentPath", "type" : ["string", "null"]},
	{ "name" : "platform", "type" : "string"},
	{ "name" : "application", "type" : "string"},
	{ "name" : "componentId", "type" : "string"},
	
	// PG + RPG + Ports + Processors
	{ "name" : "activeThreadCount", "type" : ["long", "null"]},
	
	// PG + Ports + Processors
	{ "name" : "flowFilesReceived", "type" : ["long", "null"]},
	{ "name" : "flowFilesSent", "type" : ["long", "null"]},
	
	// PG + Ports + Processors
	{ "name" : "bytesReceived", "type" : ["long", "null"]},
	{ "name" : "bytesSent", "type" : ["long", "null"]},
	
	// PG + Connections
	{ "name" : "queuedCount", "type" : ["long", "null"]},
	
	// PG + Processors
	{ "name" : "bytesRead", "type" : ["long", "null"]},
	{ "name" : "bytesWritten", "type" : ["long", "null"]},
	{ "name" : "terminatedThreadCount", "type" : ["long", "null"]},

	// Processors + Ports
	{ "name" : "runStatus", "type" : ["string", "null"]},

	// fields for process group status
	{ "name" : "bytesTransferred", "type" : ["long", "null"]},
	{ "name" : "flowFilesTransferred", "type" : ["long", "null"]},
	{ "name" : "inputContentSize", "type" : ["long", "null"]},
	{ "name" : "outputContentSize", "type" : ["long", "null"]},
	{ "name" : "queuedContentSize", "type" : ["long", "null"]},
	{ "name" : "versionedFlowState", "type" : ["string", "null"]},
	
	// fields for remote process groups
	{ "name" : "activeRemotePortCount", "type" : ["long", "null"]},
	{ "name" : "inactiveRemotePortCount", "type" : ["long", "null"]},
	{ "name" : "receivedContentSize", "type" : ["long", "null"]},
	{ "name" : "receivedCount", "type" : ["long", "null"]},
	{ "name" : "sentContentSize", "type" : ["long", "null"]},
	{ "name" : "sentCount", "type" : ["long", "null"]},
	{ "name" : "averageLineageDuration", "type" : ["long", "null"]},
	{ "name" : "transmissionStatus", "type" : ["string", "null"]},
	{ "name" : "targetURI", "type" : ["string", "null"]},
	
	// fields for input/output ports + connections + PG
	{ "name" : "inputBytes", "type" : ["long", "null"]},
	{ "name" : "inputCount", "type" : ["long", "null"]},
	{ "name" : "outputBytes", "type" : ["long", "null"]},
	{ "name" : "outputCount", "type" : ["long", "null"]},
	{ "name" : "transmitting", "type" : ["boolean", "null"]},
	
	// fields for connections
	{ "name" : "sourceId", "type" : ["string", "null"]},
	{ "name" : "sourceName", "type" : ["string", "null"]},
	{ "name" : "destinationId", "type" : ["string", "null"]},
	{ "name" : "destinationName", "type" : ["string", "null"]},
	{ "name" : "maxQueuedBytes", "type" : ["long", "null"]},
	{ "name" : "maxQueuedCount", "type" : ["long", "null"]},
	{ "name" : "queuedBytes", "type" : ["long", "null"]},
	{ "name" : "backPressureBytesThreshold", "type" : ["long", "null"]},
	{ "name" : "backPressureObjectThreshold", "type" : ["long", "null"]},
	{ "name" : "backPressureDataSizeThreshold", "type" : ["string", "null"]},
	{ "name" : "isBackPressureEnabled", "type" : ["string", "null"]},
	
    // fields for processors
	{ "name" : "processorType", "type" : ["string", "null"]},
	{ "name" : "averageLineageDurationMS", "type" : ["long", "null"]},
	{ "name" : "flowFilesRemoved", "type" : ["long", "null"]},
	{ "name" : "invocations", "type" : ["long", "null"]},
	{ "name" : "processingNanos", "type" : ["long", "null"]},
	{ "name" : "executionNode", "type" : ["string", "null"]},
	{ "name" : "counters", "type": ["null", { "type": "map", "values": "string" }]}
  ]
}

SiteToSiteMetricsReportingTask

Публикует те же метрики, что и задача Ambari Reporting, используя протокол Site-To-Site:

Свойство

Значение по умолчанию

Допустимые значения

Описание

Destination URL

 

 

URL-адрес целевого экземпляра NiFi или, если он кластеризован, список адресов, разделенных запятыми, в формате http(s)://host:port/nifi. Этот URL-адрес назначения будет использоваться только для инициирования соединения Site-to-Site. Данные, отправляемые этой задачей создания отчетов, будут сбалансированы по нагрузке на всех узлах назначения (если они кластеризованы)

Input Port Name

 

 

Имя входного порта для доставки событий

SSL Context Service

 

Controller Service API:
SSLContextService

Implementation:
StandardSSLContextService

Контекстная служба SSL для использования при обмене данными с пунктом назначения. Если не указано, связь не будет защищена

Instance URL

http://${hostname(true)}:8080/nifi

 

URL этого экземпляра для использования в URI контента каждого события

Compress Events

true

* true

* false

Указывает, следует ли сжимать события при отправке

Communications Timeout

30 secs

 

Указывает, как долго ждать ответа от адресата, прежде чем решить, что произошла ошибка, и отменить транзакцию

Batch Size

1000

 

Указывает максимальное количество записей для отправки в одном пакете

Transport Protocol

RAW

* RAW

* HTTP

Указывает, какой транспортный протокол использовать для связи Site-to-Site

HTTP Proxy hostname

 

 

Укажите имя хоста прокси-сервера для использования. Если не указано, HTTP-трафик отправляется непосредственно на целевой экземпляр NiFi

HTTP Proxy port

 

 

Укажите номер порта прокси-сервера, необязательно. Если не указано, будет использоваться порт 80 по умолчанию

HTTP Proxy username

 

 

Укажите имя пользователя для подключения к прокси-серверу (опционально)

HTTP Proxy password

 

 

Укажите пароль пользователя для подключения к прокси-серверу (опционально)

Record Writer

 

Controller Service API:
RecordSetWriterFactory
Implementations: XMLRecordSetWriter
ParquetRecordSetWriter
RecordSetWriterLookup
FreeFormTextRecordSetWriter
AvroRecordSetWriter
CSVRecordSetWriter
JsonRecordSetWriter
ScriptedRecordSetWriter

Указывает службу контроллера, используемую для записи записей

Include Null Values

false

* true

* false

Укажите, следует ли включать null значения в записи. По умолчанию будет не включены

Hostname

${hostname(true)}

 

Имя хоста этого экземпляра NiFi, которое будет включено в метрики

Application ID

nifi

 

Идентификатор приложения, который будет включен в метрики

Output Format

ambari-format

* Ambari Format

* Record Format

Выходной формат, который будет использоваться для метрик. Если выбран Record Format, необходимо выбрать Record Writer. Если выбран формат Ambari, свойство Record Writer должно быть пустым

Рис. 5.  Пример дашборда в Grafana SiteToSiteMetricsReportingTask
Рис. 5. Пример дашборда в Grafana SiteToSiteMetricsReportingTask

Есть два доступных формата вывода. Первый — это формат Ambari, определенный в API-интерфейсе сборщика метрик Ambari, который представляет собой JSON с динамическими ключами. При использовании этого формата вас может заинтересовать приведенная ниже спецификация Jolt для преобразования данных.

Формат Ambari

			[
			  {
			    "operation": "shift",
			    "spec": {
			      "metrics": {
			        "*": {
			          "metrics": {
			            "*": {
			              "$": "metrics.[#4].metrics.time",
			              "@": "metrics.[#4].metrics.value"
			            }
			          },
			          "*": "metrics.[&1].&"
			        }
			      }
			    }
			  }
			]

Схема преобразует приведенный ниже образец:

{
				"metrics": [{
					"metricname": "jvm.gc.time.G1OldGeneration",
					"appid": "nifi",
					"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
					"hostname": "localhost",
					"timestamp": "1520456854361",
					"starttime": "1520456854361",
					"metrics": {
						"1520456854361": "0"
					}
				}, {
					"metricname": "jvm.thread_states.terminated",
					"appid": "nifi",
					"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
					"hostname": "localhost",
					"timestamp": "1520456854361",
					"starttime": "1520456854361",
					"metrics": {
						"1520456854361": "0"
					}
				}]
			}

к следующему виду:

{
				"metrics": [{
					"metricname": "jvm.gc.time.G1OldGeneration",
					"appid": "nifi",
					"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
					"hostname": "localhost",
					"timestamp": "1520456854361",
					"starttime": "1520456854361",
					"metrics": {
						"time": "1520456854361",
						"value": "0"
					}
				}, {
					"metricname": "jvm.thread_states.terminated",
					"appid": "nifi",
					"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
					"hostname": "localhost",
					"timestamp": "1520456854361",
					"starttime": "1520456854361",
					"metrics": {
						"time": "1520456854361",
						"value": "0"
					}
				}]
			}

Record format

Второй формат использует структуру записи NiFi, чтобы пользователь мог определить Record Writer и напрямую указать формат вывода и данные, предполагая, что схема ввода следующая:

{
			  "type" : "record",
			  "name" : "metrics",
			  "namespace" : "metrics",
			  "fields" : [ 
				{ "name" : "appid", "type" : "string" },
				{ "name" : "instanceid", "type" : "string" },
				{ "name" : "hostname", "type" : "string" },
				{ "name" : "timestamp", "type" : "long" },
				{ "name" : "loadAverage1min", "type" : "double" },
				{ "name" : "availableCores", "type" : "int" },
				{ "name" : "FlowFilesReceivedLast5Minutes", "type" : "int" },
				{ "name" : "BytesReceivedLast5Minutes", "type" : "long" },
				{ "name" : "FlowFilesSentLast5Minutes", "type" : "int" },
				{ "name" : "BytesSentLast5Minutes", "type" : "long" },
				{ "name" : "FlowFilesQueued", "type" : "int" },
				{ "name" : "BytesQueued", "type" : "long" },
				{ "name" : "BytesReadLast5Minutes", "type" : "long" },
				{ "name" : "BytesWrittenLast5Minutes", "type" : "long" },
				{ "name" : "ActiveThreads", "type" : "int" },
				{ "name" : "TotalTaskDurationSeconds", "type" : "long" },
				{ "name" : "TotalTaskDurationNanoSeconds", "type" : "long" },
				{ "name" : "jvmuptime", "type" : "long" },
				{ "name" : "jvmheap_used", "type" : "double" },
				{ "name" : "jvmheap_usage", "type" : "double" },
				{ "name" : "jvmnon_heap_usage", "type" : "double" },
				{ "name" : "jvmthread_statesrunnable", "type" : ["int", "null"] },
				{ "name" : "jvmthread_statesblocked", "type" : ["int", "null"] },
				{ "name" : "jvmthread_statestimed_waiting", "type" : ["int", "null"] },
				{ "name" : "jvmthread_statesterminated", "type" : ["int", "null"] },
				{ "name" : "jvmthread_count", "type" : "int" },
				{ "name" : "jvmdaemon_thread_count", "type" : "int" },
				{ "name" : "jvmfile_descriptor_usage", "type" : "double" },
				{ "name" : "jvmgcruns", "type" : ["long", "null"] },
				{ "name" : "jvmgctime", "type" : ["long", "null"] }
			  ]
			}

Заключение

В этой статье мы познакомили вас с задачами отчетности Site-to-Site. Это удобный и гибкий механизм, позволяющий организовывать постоянный сбор данных и их отправку как в удаленный экземпляр NiFi, так и в экземпляр, выполняющий данную задачу отчетности. Таким образом, используя входной порт в рабочей области NiFi, вы можете фактически получать данные, сгенерированные задачей создания отчетов и использовать их в рабочем процессе NiFi. Это позволяет вам применять все возможности NiFi для обработки собранных метрик и дальнейшего использования результатов для визуализации и анализа.

Комментарии (0)