В предыдущих частях мы рассмотрели вопросы мониторинга потоков данных и состояния системы средствами GUI NiFi и задач отчетности. В этом материале поближе познакомимся с задачами отчетности Site-to-Site. При отправке данных из одного экземпляра NiFi в другой можно использовать множество различных протоколов, однако, предпочтительным является NiFi Site-to-Site. Данный протокол предлагает безопасную и эффективную передачу данных из узлов в одном экземпляре NiFi, производящем данные, на узлы в другом экземпляре, являющимся приемником этих данных.
SiteToSiteBulletinReportingTask – публикует события об ошибках и предупреждениях в бюллетенях с использованием протокола Site-to-Site;
SiteToSiteProvenanceReportingTask – публикует события о происхождении данных, используя протокол Site-to-Site;
SiteToSiteStatusReportingTask – публикует события состояния, используя протокол Site-to-Site;
SiteToSiteMetricsReportingTask – публикует те же метрики, что и задача Ambari Reporting, используя протокол Site-to-Site.
В NiFi есть возможность использовать Site-to-Site (S2S), а это означает, что задачи отчетности постоянно выполняются для сбора данных и их отправки в удаленный экземпляр NiFi. Аналогично можно использовать S2S для отправки данных в экземпляр, на котором выполняется отчетность.
Таким образом, используя входной порт на рабочей области, вы можете фактически получать данные, сгенерированные задачей создания отчетов, и применять их в рабочем процессе NiFi. Эта функциональность позволяет использовать все возможности NiFi для обработки метрик задач отчетности.
Настройка Site-to-Site
Прежде чем можно будет использовать S2S, необходимо установить следующие свойства в файле nifi.properties на всех узлах в вашем кластере NiFi:
1) Свойства с именем nifi.remote.input.socket.* зависят от транспортного протокола RAW;
2) Свойства nifi.remote.input.http.* являются специфическими свойствами транспортного протокола HTTP.
Свойство |
Примечание |
nifi.remote.input.host=<FQDN of Host> |
Установить полное доменное имя для всех узлов |
nifi.remote.input.secure=false |
Установите значение true, если NiFi использует HTTPS. По умолчанию установлено значение false. Другие свойства безопасности (ниже) также должны быть настроены |
nifi.remote.input.socket.port=<порт для S2S>) |
Порт для связи Site-to-Site. По умолчанию он пуст, но должен иметь значение, чтобы использовать сокет RAW в качестве транспортного протокола для Site-to-Site |
nifi.remote.input.http.enabled=true |
Указывает, следует ли включить HTTP Site-to-Site на этом узле. По умолчанию установлено значение true. Использование клиентом Site-to-Site HTTP или HTTPS определяется nifi.remote.input.secure. Если установлено значение true, то запросы отправляются как HTTPS на nifi.web.https.port. Если установлено значение false, HTTP-запросы отправляются на nifi.web.http.port |
nifi.remote.input.http.transaction.ttl=30 sec |
Указывает, как долго транзакция может оставаться на сервере. По умолчанию установлено значение 30 секунд |
Чтобы эти изменения вступили в силу, потребуется перезапуск ваших экземпляров NiFi.
SiteToSiteBulletinReportingTask
Публикует события бюллетеня, используя протокол Site-to-Site. Для каждого компонента сохраняется пять-десять сводок на уровне контроллера продолжительностью до пяти минут. Если отчетная задача запланирована недостаточно часто, некоторые бюллетени могут не отправляться.
Свойство |
Значение по умолчанию |
Допустимые значения |
Описание |
Destination URL |
|
|
URL-адрес целевого экземпляра NiFi для отправки данных должен быть в формате http(s)://host:port/nifi |
Input Port Name |
|
|
Имя входного порта для доставки событий происхождения |
SSL Context Service |
|
Controller Service API: Implementation: StandardSSLContextService |
Контекстная служба SSL для использования при обмене данными с целевым сервером. Если не указано, то не используется SSL протокол |
Instance URL |
http://${hostname(true)}:8080/nifi |
|
URL этого экземпляра для использования в URI контента каждого события |
Compress Events |
true |
* true * false |
Указывает, следует ли сжимать данные событий при отправке |
Communications Timeout |
30 secs |
|
Указывает, как долго ждать ответа от адресата, прежде чем решить, что произошла ошибка, и отменить транзакцию |
Transport Protocol |
RAW |
* RAW * HTTP |
Указывает, какой транспортный протокол использовать для связи Site-to-Site |
HTTP Proxy hostname |
|
|
Укажите имя хоста прокси-сервера для использования. Если не указано, HTTP-трафик отправляется непосредственно на целевой экземпляр NiFi |
HTTP Proxy port |
|
|
Укажите номер порта прокси-сервера, необязательно. Если не указано, будет использоваться порт 80 по умолчанию |
HTTP Proxy username |
|
|
Укажите имя пользователя для подключения к прокси-серверу (опционально) |
HTTP Proxy password |
|
|
Укажите пароль пользователя для подключения к прокси-серверу (опционально) |
Record Writer |
|
Controller Service API: |
Указывает службу контроллера, используемую для записи записей |
Include Null Values |
false |
* true * false |
Укажите, следует ли включать null значения в записи. По умолчанию будет не включены |
Platform |
nifi |
|
Значение, используемое для поля платформы в каждом событии |
Пользователь может определить Record Writer и напрямую указать выходной формат и формат данных, используя следующую входную схему:
{
"type" : "record",
"name" : "bulletins",
"namespace" : "bulletins",
"fields" : [
{ "name" : "objectId", "type" : "string" },
{ "name" : "platform", "type" : "string" },
{ "name" : "bulletinId", "type" : "long" },
{ "name" : "bulletinCategory", "type" : ["string", "null"] },
{ "name" : "bulletinGroupId", "type" : ["string", "null"] },
{ "name" : "bulletinGroupName", "type" : ["string", "null"] },
{ "name" : "bulletinGroupPath", "type" : ["string", "null"] },
{ "name" : "bulletinLevel", "type" : ["string", "null"] },
{ "name" : "bulletinMessage", "type" : ["string", "null"] },
{ "name" : "bulletinNodeAddress", "type" : ["string", "null"] },
{ "name" : "bulletinNodeId", "type" : ["string", "null"] },
{ "name" : "bulletinSourceId", "type" : ["string", "null"] },
{ "name" : "bulletinSourceName", "type" : ["string", "null"] },
{ "name" : "bulletinSourceType", "type" : ["string", "null"] },
{ "name" : "bulletinTimestamp", "type" : ["string", "null"], "doc" : "Format: yyyy-MM-dd'T'HH:mm:ss.SSS'Z'" }
]
}
Для хранения статистики по ошибкам создадим таблицу в БД PostgreSQL:
CREATE TABLE nifi_bulletin_err_log (
id BIGSERIAL
, flowId VARCHAR(36)
, bulletinGroupId VARCHAR(36)
, bulletinGroupName VARCHAR(255)
, bulletinNodeId VARCHAR(36)
, bulletinSourceId VARCHAR(36)
, bulletinSourceName VARCHAR(255)
, bulletinSourceType VARCHAR(50)
, bulletinMessage TEXT
, bulletinTimestamp VARCHAR(30)
, logTimestamp TIMESTAMP DEFAULT LOCALTIMESTAMP(0)
, CONSTRAINT nifi_bulletin_err_log_pkey PRIMARY KEY (id));
И настроим блок обработки и записи метрик, получаемых на удаленный порт NiFi:
В Grafana создадим источник для работы с базой PostgreSQL:
И на основе созданной таблицы построим дашборд для мониторинга ошибок NiFi в Grafana:
SiteToSiteProvenanceReportingTask
Задача создания отчетов о происхождении данных позволяет пользователю публиковать все события происхождения из экземпляра NiFi обратно в тот же экземпляр NiFi или другой его экземпляр. Это обеспечивает большую гибкость, поскольку можно использовать все возможные процессоры, доступные в NiFi, для обработки или распространения этих данных.
Рекомендуется отправлять данные о происхождении в экземпляр NiFi, отличный от того, на котором выполняется задача создания отчетов SiteToSiteProvenanceReportingTask, потому что, когда данные получены через Site-to-Site и обработаны, это само по себе будет генерировать Provenance события. В результате создается цикл. Необходимо учесть, что данные отправляются пакетами (по умолчанию – 1000). Это означает, что для каждой партии событий Provenance, которые отправляются обратно в NiFi, получатель NiFi должен будет генерировать только одно событие для каждого компонента.
По умолчанию при публикации в экземпляре NiFi данные Provenance отправляются в виде массива JSON. Однако, пользователь может определить Record Writer и напрямую указать выходной формат и данные, предполагая, что входная схема определена следующим образом:
{
"type" : "record",
"name" : "provenance",
"namespace" : "provenance",
"fields": [
{ "name": "eventId", "type": "string" },
{ "name": "eventOrdinal", "type": "long" },
{ "name": "eventType", "type": "string" },
{ "name": "timestampMillis", "type": "long" },
{ "name": "durationMillis", "type": "long" },
{ "name": "lineageStart", "type": { "type": "long", "logicalType": "timestamp-millis" } },
{ "name": "details", "type": ["null", "string"] },
{ "name": "componentId", "type": ["null", "string"] },
{ "name": "componentType", "type": ["null", "string"] },
{ "name": "componentName", "type": ["null", "string"] },
{ "name": "processGroupId", "type": ["null", "string"] },
{ "name": "processGroupName", "type": ["null", "string"] },
{ "name": "entityId", "type": ["null", "string"] },
{ "name": "entityType", "type": ["null", "string"] },
{ "name": "entitySize", "type": ["null", "long"] },
{ "name": "previousEntitySize", "type": ["null", "long"] },
{ "name": "updatedAttributes", "type": { "type": "map", "values": "string" } },
{ "name": "previousAttributes", "type": { "type": "map", "values": "string" } },
{ "name": "actorHostname", "type": ["null", "string"] },
{ "name": "contentURI", "type": ["null", "string"] },
{ "name": "previousContentURI", "type": ["null", "string"] },
{ "name": "parentIds", "type": { "type": "array", "items": "string" } },
{ "name": "childIds", "type": { "type": "array", "items": "string" } },
{ "name": "platform", "type": "string" },
{ "name": "application", "type": "string" },
{ "name": "remoteIdentifier", "type": ["null", "string"] },
{ "name": "alternateIdentifier", "type": ["null", "string"] },
{ "name": "transitUri", "type": ["null", "string"] }
]
}
В задаче отчетности можно настроить следующие свойства:
Свойство |
Значение по умолчанию |
Допустимые значения |
Описание |
Destination URL |
|
|
URL-адрес целевого экземпляра NiFi для отправки данных должен быть в формате http(s)://host:port/nifi Этот URL-адрес будет использоваться только для инициирования соединения Site-to-Site. Данные, отправляемые этой задачей создания отчетов, будут сбалансированы по нагрузке на всех узлах назначения (если они кластеризованы) |
Input Port Name |
|
|
Имя входного порта для доставки событий |
SSL Context Service |
|
Controller Service API: Implementation: |
Контекстная служба SSL для использования при обмене данными с пунктом назначения. Если не указано, связь не будет защищена |
Instance URL |
http://${hostname(true)}:8080/nifi |
|
URL этого экземпляра для использования в URI контента каждого события |
Compress Events |
true |
* true * false |
Указывает, следует ли сжимать данные событий при отправке |
Communications Timeout |
30 secs |
|
Указывает, как долго ждать ответа от адресата, прежде чем решить, что произошла ошибка, и отменить транзакцию |
Batch Size |
1000 |
|
Указывает максимальное количество записей для отправки в одном пакете |
Transport Protocol |
RAW |
* RAW * HTTP |
Указывает, какой транспортный протокол использовать для связи Site-to-Site |
HTTP Proxy hostname |
|
|
Укажите имя хоста прокси-сервера для использования. Если не указано, HTTP-трафик отправляется непосредственно на целевой экземпляр NiFi |
HTTP Proxy port |
|
|
Укажите номер порта прокси-сервера, необязательно. Если не указано, будет использоваться порт 80 по умолчанию |
HTTP Proxy username |
|
|
Укажите имя пользователя для подключения к прокси-серверу (опционально) |
HTTP Proxy password |
|
|
Укажите пароль пользователя для подключения к прокси-серверу (опционально) |
Record Writer |
|
Controller Service API: |
Указывает службу контроллера, используемую для записи записей |
Include Null Values |
false |
* true * false |
Укажите, следует ли включать null значения в записи. По умолчанию будет не включены |
Platform |
nifi |
|
Значение, используемое для поля платформы в каждом событии |
Event Type to Include |
|
|
Разделенный запятыми список типов событий, которые будут использоваться для фильтрации событий происхождения, отправляемых задачей создания отчетов. Доступные типы событий: [CREATE, RECEIVE, FETCH, SEND, REMOTE_INVOCATION, DOWNLOAD, DROP, EXPIRE, FORK, JOIN, CLONE, CONTENT_MODIFIED, ATTRIBUTES_MODIFIED, ROUTE, ADDINFO, REPLAY, UNKNOWN]. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, фильтры суммируются |
Event Type to Exclude |
|
|
Разделенный запятыми список типов событий, которые будут использоваться для исключения событий происхождения, отправленных задачей создания отчетов. Доступные типы событий: [CREATE, RECEIVE, FETCH, SEND, REMOTE_INVOCATION, DOWNLOAD, DROP, EXPIRE, FORK, JOIN, CLONE, CONTENT_MODIFIED, ATTRIBUTES_MODIFIED, ROUTE, ADDINFO, REPLAY, UNKNOWN]. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, они суммируются. Если тип события включен в тип события для включения и исключен здесь, то исключение имеет приоритет и событие не будет отправлено |
Component Type to Include |
|
|
Регулярное выражение для фильтрации событий происхождения на основе типа компонента. Будут отправлены только события, соответствующие регулярному выражению. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, фильтры суммируются |
Component Type to Exclude |
|
|
Регулярное выражение для исключения событий происхождения на основе типа компонента. События, соответствующие регулярному выражению, не будут отправлены. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, фильтры суммируются. Если тип компонента включен в тип компонента для включения и исключен здесь, то исключение имеет приоритет и событие не будет отправлено |
Component ID to Include |
|
|
Разделенный запятыми список UUID компонента, который будет использоваться для фильтрации событий происхождения, отправляемых задачей создания отчетов. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, фильтры суммируются |
Component ID to Exclude |
|
|
Разделенный запятыми список UUID компонента, который будет использоваться для исключения событий происхождения, отправленных задачей создания отчетов. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, фильтры суммируются. Если UUID компонента включен в идентификатор компонента для включения и исключен здесь, то исключение имеет приоритет и событие не будет отправлено |
Component Name to Include |
|
|
Регулярное выражение для фильтрации событий происхождения на основе имени компонента. Будут отправлены только события, соответствующие регулярному выражению. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, фильтры суммируются |
Component Name to Exclude |
|
|
Регулярное выражение для исключения событий происхождения на основе имени компонента. События, соответствующие регулярному выражению, не будут отправлены. Если фильтр не установлен, отправляются все события. Если установлено несколько фильтров, фильтры суммируются. Если имя компонента включено в имя компонента для включения и исключено здесь, то исключение имеет приоритет и событие не будет отправлено |
Start Position |
beginning-of-stream |
* Beginning of Stream * End of Stream |
Если задача создания отчетов никогда не запускалась или ее состояние было сброшено пользователем, указывает, где в потоке событий происхождения должна начаться задача создания отчетов |
SiteToSiteStatusReportingTask
Публикует события состояния, используя протокол Site-to-Site.
При задании регулярных выражений для типа компонента и фильтра имени, будут представлены только компоненты, соответствующие обоим регулярным выражениям. Однако, все группы процессов рекурсивно ищут совпадающие компоненты независимо от того, соответствует ли группа процессов фильтрам компонентов.
В задаче отчетности можно настроить следующие свойства:
Свойство |
Значение по умолчанию |
Допустимые значения |
Описание |
Destination URL |
|
|
URL-адрес целевого экземпляра NiFi для отправки данных должен быть в формате http(s)://host:port/nifi Этот URL-адрес будет использоваться только для инициирования соединения Site-to-Site. Данные, отправляемые этой задачей создания отчетов, будут сбалансированы по нагрузке на всех узлах назначения (если они кластеризованы) |
Input Port Name |
|
|
Имя входного порта для доставки событий |
SSL Context Service |
|
Controller Service API: Implementation: |
Контекстная служба SSL для использования при обмене данными с пунктом назначения. Если не указано, связь не будет защищена |
Instance URL |
http://${hostname(true)}:8080/nifi |
|
URL этого экземпляра для использования в URI контента каждого события |
Compress Events |
true |
* true * false |
Указывает, следует ли сжимать события при отправке |
Communications Timeout |
30 secs |
|
Указывает, как долго ждать ответа от адресата, прежде чем решить, что произошла ошибка, и отменить транзакцию |
Batch Size |
1000 |
|
Указывает максимальное количество записей для отправки в одном пакете |
Transport Protocol |
RAW |
* RAW * HTTP |
Указывает, какой транспортный протокол использовать для связи Site-to-Site |
HTTP Proxy hostname |
|
|
Укажите имя хоста прокси-сервера для использования. Если не указано, HTTP-трафик отправляется непосредственно на целевой экземпляр NiFi |
HTTP Proxy port |
|
|
Укажите номер порта прокси-сервера, необязательно. Если не указано, будет использоваться порт 80 по умолчанию |
HTTP Proxy username |
|
|
Укажите имя пользователя для подключения к прокси-серверу (опционально) |
HTTP Proxy password |
|
|
Укажите пароль пользователя для подключения к прокси-серверу (опционально) |
Record Writer |
|
Controller Service API: |
Указывает службу контроллера, используемую для записи записей |
Include Null Values |
false |
* true * false |
Укажите, следует ли включать null значения в записи. По умолчанию будет не включены |
Platform |
nifi |
|
Значение, используемое для поля платформы в каждом событии |
Component Type Filter Regex |
(Processor|ProcessGroup|RemoteProcessGroup|RootProcessGroup|Connection|InputPort|OutputPort) |
|
Регулярное выражение, определяющее типы компонентов для отчета. Будет включен любой тип компонента, соответствующий этому регулярному выражению. Типы компонентов: Processor, RootProcessGroup, ProcessGroup, RemoteProcessGroup, Connection, InputPort, OutputPort |
Component Name Filter Regex |
.* |
|
Регулярное выражение, указывающее имена компонентов для отчета. Любое имя компонента, соответствующее этому регулярному выражению, будет включено |
Пользователь может определить Record Writer, напрямую указать формат вывода и данные, предполагая, что схема ввода следующая:
{
"type" : "record",
"name" : "status",
"namespace" : "status",
"fields" : [
// common fields for all components
{ "name" : "statusId", "type" : "string"},
{ "name" : "timestampMillis", "type": { "type": "long", "logicalType": "timestamp-millis" } },
{ "name" : "timestamp", "type" : "string"},
{ "name" : "actorHostname", "type" : "string"},
{ "name" : "componentType", "type" : "string"},
{ "name" : "componentName", "type" : "string"},
{ "name" : "parentId", "type" : ["string", "null"]},
{ "name" : "parentName", "type" : ["string", "null"]},
{ "name" : "parentPath", "type" : ["string", "null"]},
{ "name" : "platform", "type" : "string"},
{ "name" : "application", "type" : "string"},
{ "name" : "componentId", "type" : "string"},
// PG + RPG + Ports + Processors
{ "name" : "activeThreadCount", "type" : ["long", "null"]},
// PG + Ports + Processors
{ "name" : "flowFilesReceived", "type" : ["long", "null"]},
{ "name" : "flowFilesSent", "type" : ["long", "null"]},
// PG + Ports + Processors
{ "name" : "bytesReceived", "type" : ["long", "null"]},
{ "name" : "bytesSent", "type" : ["long", "null"]},
// PG + Connections
{ "name" : "queuedCount", "type" : ["long", "null"]},
// PG + Processors
{ "name" : "bytesRead", "type" : ["long", "null"]},
{ "name" : "bytesWritten", "type" : ["long", "null"]},
{ "name" : "terminatedThreadCount", "type" : ["long", "null"]},
// Processors + Ports
{ "name" : "runStatus", "type" : ["string", "null"]},
// fields for process group status
{ "name" : "bytesTransferred", "type" : ["long", "null"]},
{ "name" : "flowFilesTransferred", "type" : ["long", "null"]},
{ "name" : "inputContentSize", "type" : ["long", "null"]},
{ "name" : "outputContentSize", "type" : ["long", "null"]},
{ "name" : "queuedContentSize", "type" : ["long", "null"]},
{ "name" : "versionedFlowState", "type" : ["string", "null"]},
// fields for remote process groups
{ "name" : "activeRemotePortCount", "type" : ["long", "null"]},
{ "name" : "inactiveRemotePortCount", "type" : ["long", "null"]},
{ "name" : "receivedContentSize", "type" : ["long", "null"]},
{ "name" : "receivedCount", "type" : ["long", "null"]},
{ "name" : "sentContentSize", "type" : ["long", "null"]},
{ "name" : "sentCount", "type" : ["long", "null"]},
{ "name" : "averageLineageDuration", "type" : ["long", "null"]},
{ "name" : "transmissionStatus", "type" : ["string", "null"]},
{ "name" : "targetURI", "type" : ["string", "null"]},
// fields for input/output ports + connections + PG
{ "name" : "inputBytes", "type" : ["long", "null"]},
{ "name" : "inputCount", "type" : ["long", "null"]},
{ "name" : "outputBytes", "type" : ["long", "null"]},
{ "name" : "outputCount", "type" : ["long", "null"]},
{ "name" : "transmitting", "type" : ["boolean", "null"]},
// fields for connections
{ "name" : "sourceId", "type" : ["string", "null"]},
{ "name" : "sourceName", "type" : ["string", "null"]},
{ "name" : "destinationId", "type" : ["string", "null"]},
{ "name" : "destinationName", "type" : ["string", "null"]},
{ "name" : "maxQueuedBytes", "type" : ["long", "null"]},
{ "name" : "maxQueuedCount", "type" : ["long", "null"]},
{ "name" : "queuedBytes", "type" : ["long", "null"]},
{ "name" : "backPressureBytesThreshold", "type" : ["long", "null"]},
{ "name" : "backPressureObjectThreshold", "type" : ["long", "null"]},
{ "name" : "backPressureDataSizeThreshold", "type" : ["string", "null"]},
{ "name" : "isBackPressureEnabled", "type" : ["string", "null"]},
// fields for processors
{ "name" : "processorType", "type" : ["string", "null"]},
{ "name" : "averageLineageDurationMS", "type" : ["long", "null"]},
{ "name" : "flowFilesRemoved", "type" : ["long", "null"]},
{ "name" : "invocations", "type" : ["long", "null"]},
{ "name" : "processingNanos", "type" : ["long", "null"]},
{ "name" : "executionNode", "type" : ["string", "null"]},
{ "name" : "counters", "type": ["null", { "type": "map", "values": "string" }]}
]
}
SiteToSiteMetricsReportingTask
Публикует те же метрики, что и задача Ambari Reporting, используя протокол Site-To-Site:
Свойство |
Значение по умолчанию |
Допустимые значения |
Описание |
Destination URL |
|
|
URL-адрес целевого экземпляра NiFi или, если он кластеризован, список адресов, разделенных запятыми, в формате http(s)://host:port/nifi. Этот URL-адрес назначения будет использоваться только для инициирования соединения Site-to-Site. Данные, отправляемые этой задачей создания отчетов, будут сбалансированы по нагрузке на всех узлах назначения (если они кластеризованы) |
Input Port Name |
|
|
Имя входного порта для доставки событий |
SSL Context Service |
|
Controller Service API: Implementation: |
Контекстная служба SSL для использования при обмене данными с пунктом назначения. Если не указано, связь не будет защищена |
Instance URL |
http://${hostname(true)}:8080/nifi |
|
URL этого экземпляра для использования в URI контента каждого события |
Compress Events |
true |
* true * false |
Указывает, следует ли сжимать события при отправке |
Communications Timeout |
30 secs |
|
Указывает, как долго ждать ответа от адресата, прежде чем решить, что произошла ошибка, и отменить транзакцию |
Batch Size |
1000 |
|
Указывает максимальное количество записей для отправки в одном пакете |
Transport Protocol |
RAW |
* RAW * HTTP |
Указывает, какой транспортный протокол использовать для связи Site-to-Site |
HTTP Proxy hostname |
|
|
Укажите имя хоста прокси-сервера для использования. Если не указано, HTTP-трафик отправляется непосредственно на целевой экземпляр NiFi |
HTTP Proxy port |
|
|
Укажите номер порта прокси-сервера, необязательно. Если не указано, будет использоваться порт 80 по умолчанию |
HTTP Proxy username |
|
|
Укажите имя пользователя для подключения к прокси-серверу (опционально) |
HTTP Proxy password |
|
|
Укажите пароль пользователя для подключения к прокси-серверу (опционально) |
Record Writer |
|
Controller Service API: |
Указывает службу контроллера, используемую для записи записей |
Include Null Values |
false |
* true * false |
Укажите, следует ли включать null значения в записи. По умолчанию будет не включены |
Hostname |
${hostname(true)} |
|
Имя хоста этого экземпляра NiFi, которое будет включено в метрики |
Application ID |
nifi |
|
Идентификатор приложения, который будет включен в метрики |
Output Format |
ambari-format |
* Ambari Format * Record Format |
Выходной формат, который будет использоваться для метрик. Если выбран Record Format, необходимо выбрать Record Writer. Если выбран формат Ambari, свойство Record Writer должно быть пустым |
Есть два доступных формата вывода. Первый — это формат Ambari, определенный в API-интерфейсе сборщика метрик Ambari, который представляет собой JSON с динамическими ключами. При использовании этого формата вас может заинтересовать приведенная ниже спецификация Jolt для преобразования данных.
Формат Ambari
[
{
"operation": "shift",
"spec": {
"metrics": {
"*": {
"metrics": {
"*": {
"$": "metrics.[#4].metrics.time",
"@": "metrics.[#4].metrics.value"
}
},
"*": "metrics.[&1].&"
}
}
}
}
]
Схема преобразует приведенный ниже образец:
{
"metrics": [{
"metricname": "jvm.gc.time.G1OldGeneration",
"appid": "nifi",
"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
"hostname": "localhost",
"timestamp": "1520456854361",
"starttime": "1520456854361",
"metrics": {
"1520456854361": "0"
}
}, {
"metricname": "jvm.thread_states.terminated",
"appid": "nifi",
"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
"hostname": "localhost",
"timestamp": "1520456854361",
"starttime": "1520456854361",
"metrics": {
"1520456854361": "0"
}
}]
}
к следующему виду:
{
"metrics": [{
"metricname": "jvm.gc.time.G1OldGeneration",
"appid": "nifi",
"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
"hostname": "localhost",
"timestamp": "1520456854361",
"starttime": "1520456854361",
"metrics": {
"time": "1520456854361",
"value": "0"
}
}, {
"metricname": "jvm.thread_states.terminated",
"appid": "nifi",
"instanceid": "8927f4c0-0160-1000-597a-ea764ccd81a7",
"hostname": "localhost",
"timestamp": "1520456854361",
"starttime": "1520456854361",
"metrics": {
"time": "1520456854361",
"value": "0"
}
}]
}
Record format
Второй формат использует структуру записи NiFi, чтобы пользователь мог определить Record Writer и напрямую указать формат вывода и данные, предполагая, что схема ввода следующая:
{
"type" : "record",
"name" : "metrics",
"namespace" : "metrics",
"fields" : [
{ "name" : "appid", "type" : "string" },
{ "name" : "instanceid", "type" : "string" },
{ "name" : "hostname", "type" : "string" },
{ "name" : "timestamp", "type" : "long" },
{ "name" : "loadAverage1min", "type" : "double" },
{ "name" : "availableCores", "type" : "int" },
{ "name" : "FlowFilesReceivedLast5Minutes", "type" : "int" },
{ "name" : "BytesReceivedLast5Minutes", "type" : "long" },
{ "name" : "FlowFilesSentLast5Minutes", "type" : "int" },
{ "name" : "BytesSentLast5Minutes", "type" : "long" },
{ "name" : "FlowFilesQueued", "type" : "int" },
{ "name" : "BytesQueued", "type" : "long" },
{ "name" : "BytesReadLast5Minutes", "type" : "long" },
{ "name" : "BytesWrittenLast5Minutes", "type" : "long" },
{ "name" : "ActiveThreads", "type" : "int" },
{ "name" : "TotalTaskDurationSeconds", "type" : "long" },
{ "name" : "TotalTaskDurationNanoSeconds", "type" : "long" },
{ "name" : "jvmuptime", "type" : "long" },
{ "name" : "jvmheap_used", "type" : "double" },
{ "name" : "jvmheap_usage", "type" : "double" },
{ "name" : "jvmnon_heap_usage", "type" : "double" },
{ "name" : "jvmthread_statesrunnable", "type" : ["int", "null"] },
{ "name" : "jvmthread_statesblocked", "type" : ["int", "null"] },
{ "name" : "jvmthread_statestimed_waiting", "type" : ["int", "null"] },
{ "name" : "jvmthread_statesterminated", "type" : ["int", "null"] },
{ "name" : "jvmthread_count", "type" : "int" },
{ "name" : "jvmdaemon_thread_count", "type" : "int" },
{ "name" : "jvmfile_descriptor_usage", "type" : "double" },
{ "name" : "jvmgcruns", "type" : ["long", "null"] },
{ "name" : "jvmgctime", "type" : ["long", "null"] }
]
}
Заключение
В этой статье мы познакомили вас с задачами отчетности Site-to-Site. Это удобный и гибкий механизм, позволяющий организовывать постоянный сбор данных и их отправку как в удаленный экземпляр NiFi, так и в экземпляр, выполняющий данную задачу отчетности. Таким образом, используя входной порт в рабочей области NiFi, вы можете фактически получать данные, сгенерированные задачей создания отчетов и использовать их в рабочем процессе NiFi. Это позволяет вам применять все возможности NiFi для обработки собранных метрик и дальнейшего использования результатов для визуализации и анализа.