Переносимость процессоров и паттерны

Вот и обещанная третья часть саги о том, что в NiFi можно делать и как это делать правильно, без претензий на истину в последней инстанции, конечно. Сегодня расскажу о переносимости процессоров и дам несколько паттернов для самых популярных задач на платформе ZIIoT. Если вдруг вам интересно почитать про оптимизацию схем и производительности в NiFi — велком в первую часть. Если мечтаете узнать больше о мониторинге, то вторая часть — must read. Только потом сюда не забудьте вернуться.

Переносимость процессоров

Хорошая группа процессоров – переносимая группа процессоров. Бывают, конечно, проектные кастомы, никому кроме разработчика (да и ему после сдачи проекта) не интересные, но даже их бывает необходимо переносить между dev-test-prod средами, и хорошо бы с минимальными накладными расходами. Вот несколько очевидных и не очень лайфхаков, как эту переносимость обеспечить.

  1. Первое, что нам поможет, – версионирование групп процессоров с помощью NiFi registry. Собственно, об этом инструменте уже достаточно написано. Используйте его – рекомендую.

  2. Следующий важный момент – это окружение скрипта. Прежде всего, все конфигурируемые параметры имеет смысл выносить в переменные. Это нужно хотя бы для того, чтобы в случае необходимости изменения (например, при переносе между полигонами) вносить все правки в одном месте с автоматическим перезапуском задействованных элементов. Дополнительно стоит явным образом обозначить перечень конфигурируемых таким образом сущностей на Label, например, как на Рис. 1.

Рисунок 17
Рисунок 1. Обозначение перечня конфигурируемых сущностей на Label

И вот тут есть два принципиально различных подхода к обеспечению переносимости.

Первый подход — все необходимые параметры задаются на уровне группы процессоров. Переносимость получается 100% – бери и неси, только не забудь поправить по месту! Но управляемость страдает. Представьте десяток подобным образом организованных групп процессоров, и вдруг у вас возникает необходимость изменить один параметр во всех. А для пущего счастья – каждый разработчик может обозвать переменную так, как ему в тот момент захотелось. Ужас, да?

Второй подход (за который я и выступаю) – максимально стандартизировать окружение NiFi на всех полигонах: определить стандартный набор переменных, задавать их с помощью ENV\Variable registry, поставлять в стандартном наборе стандартный же механизм получения токена для доступа к API и пр., а в группе процессоров задавать только тот минимум, который относится непосредственно к самой группе. В этом случае типовые переносы могут происходить ВООБЩЕ без необходимости какого-либо конфигурирования, ну а у кого «стандартное окружение» не вполне стандартно, тот сам виноват. 

  1. Тот же подход ограниченно применим и для работы с database\redis connection pools. Ограниченно — потому что, например, в случае с платформой ZIIoT непосредственный доступ к БД сервисов платформы — штука явно нештатная, которая бывает, когда речь идет о системах заказчика или лютом кастоме, необходимость переносов которого сомнительна. С другой стороны, таскать «нечто» между своими полигонами тоже надо… Дополнительное ограничение: DBCPConnectionPool – таки разделяемый ресурс. Если вы оставили “Max Total Connections”=8 по дефолту и завязали на этот пул 1050 групп процессоров – возможны варианты. Впрочем, при прочих равных дефолтов обычно вполне достаточно.

  2. Context maps – (HTTP, HTTPS), reader\writer сервисы – вопрос другой. Тот же http context map по дефолту предоставляет 5000 одновременных соединений, чего с одной стороны как бы и много, а с другой — может и не хватить на весь инстанс. В общем, в угоду переносимости я предпочитаю создавать эти сущности на уровне группы процессоров. Единственная рекомендация – переименовывать их в человекочитаемый вид, чтобы не было 100500 одинаковых HTTPContextMaps. С reader\writer сервисами примерно то же самое – плюсы от переиспользования не слишком очевидны, трудности от реализации определенно будут.

Теперь о паттернах.

Паттерны

Как я сказал выше, тут речь пойдет о самых популярных задачах, с которыми разработчики сталкиваются, имея дело с ZIIoT-платформой. Возможно, тем, кто еще не имеет дело с платформой, это тоже будет полезным, так как задачи эти не новы.

Задача 1. Чтение только новых записей и перенос их в платформу работы с данными

Это типовая задача, которую приходится решать, забирая данные из какой-нибудь системы заказчика. Часто это делается путем сохранения последней прочитанной записи где-нибудь в кэше и\или в специально созданной таблице БД, а также путем чтения данных за определенный период (now() минус N). В большинстве случаев так делать не стоит, так как есть аж три штатных средства – GenerateTableFetch, QueryDatabaseTable и QueryDatabaseTableRecord. См. Рис. 2

GenerateTableFetch создает SQL-запросы для последующего исполнения (ExecuteSQL(Record). По определению, чтение с дозагрузкой реализуется только на одной ноде кластера, чтобы не было накладок с синхронизацией последних прочитанных данных. В этом случае на нагруженных инстансах TableFetch позволяет распараллелить получение данных из БД: «легкую» часть (с генерацией запросов) выполняет одна нода, а «тяжелая» (с извлечением-обработкой) распараллеливается на все ноды кластера. Данный процессор поддерживает входные данные и может использовать атрибуты входящих flow-файлов для определения, например, таблицы, из которой надо произвести выборку. QueryDatabaseTable позволяет собственно извлечь данные из таблицы, выдавая данные в AVRO-формате. QueryDatabaseTableRecord дает на выходе набор записей, определенный RecordSetWriter.

Для таблицы такого вида (запись с id 527 добавлена позже), 

GenerateTableFetch (как более наглядный) генерирует два запроса:

SELECT * FROM test WHERE id <= 8 ORDER BY id LIMIT 10000

SELECT * FROM test WHERE id > 8 AND id <= 527 ORDER BY id LIMIT 10000

Рисунок 23
Рисунок 2. Чтение с дозагрузкой

Эти процессоры позволяют задать размер единичной выборки, определить список столбцов, которые должны в нее попасть, и даже задать начальное значение, если речь идет о первичной выборке из уже существующей базы не всех, а только актуальных для нас данных. 

Рисунок 3. Настройка выборки
Рисунок 3. Настройка выборки

Немаловажный вопрос – где хранится это «последнее прочитанное» значение и насколько это надежно? Ответ: в «STATE процессора». В случае кластера — в zookeeper’е кластера, в случае standalone-инсталляции – в хранилище состояний инстанса в формате WAL-лога или физически (по дефолту) – в папке /opt/nifi/nifi-current/state таким вот образом, как на Рис. 4.

Рисунок 4. Последние прочитанные значения в opt/nifi/nifi-current/state
Рисунок 4. Последние прочитанные значения в opt/nifi/nifi-current/state

Соответственно, если нам требуется сохранить это состояние в случае перезапуска всего NiFi, эту папку необходимо вынести в отдельный volume. Для очистки состояния можно воспользоваться контролом Clear state (View state на соответствующем процессоре) – см. Рис. 5.

Рисунок 26
Рисунок 5. Clear state  для очистки состояния

Задача 2. Организация шаблона UPdateorinSERT – UPSERT

Еще одна типовая задача – организация шаблона UPdateorinSERT – UPSERT. Создание записи в БД, в случае её наличия – обновление данных. Шаблон очень распространенный, но на уровне спецификации SQL не слишком стандартизованный. В postgresql он реализован с помощью INSERT INTO <…> ON CONFLICT DO <…>. В NiFi этот шаблон реализуется только в PutDatabaseRecord - Рис. 6.

Рисунок 33
Рисунок 6. PutDatabaseRecord - тестовая запись в БД + настройки самого процессора

Основная проблема здесь в том, что, например, ConvertJsonToSQL производит вывод типов исходя из параметров БД и корректно обрабатывает варианты {“id”:10} и {“id”:”10”}, а PutDatabaseRecord использует определение типов из avro-схемы recordsetreader (по дефолту – infer-schema, т.е. попытка «угадать» и создать схему исходя из самой записи). В этом случае, если в БД поле id с типом bigint, входной параметр {“id”:”10”} работать-таки не будет без переопределения avro-схемы.ля входного JSON’а:

[
  {
    "key": "7",
    "name": "jsontest1"
  },
  {
    "key": "8",
    "name": "jsontest2"
  },
  {
    "key": "9",
    "name": "jsontest3"
  },
  {
    "key": "1",
    "name": "jsontest4"
  }
]

И таблицы вида:

CREATE TABLE IF NOT EXISTS test (

id INTEGER NOT NULL DEFAULT nextval('test_id_seq' :: REGCLASS),

key BIGINT,

name CHARACTER VARYING,

CONSTRAINT test_key_key UNIQUE (key)

) WITH (OIDS = FALSE);

Схема будет выглядеть так, как на Рис. 7.

Рисунок 7. Настройка json record reader’а
Рисунок 7. Настройка json record reader’а
{
  "type": "record",
  "name": "test",
  "fields": [
    {
      "name": "key",
      "type": "long"
    },
    {
      "name": "name",
      "type": "string"
    }
  ]
}

Как правило, никакого rocket-science тут нет, но без подобного «восхода солнца вручную» хочется обойтись. В качестве альтернативы можно поиграться с convertjsontosql в INSERT, а в случае ошибки – повторный convertjsontosql уже в UPDATE, но тут схема выйдет совсем уж монструозной – Рис. 8.

Рисунок 35
Рисунок 8. Ручная правка данных под вставку\обновление

Конвертируем JSON в SQL INSERT, отправляем пачку транзакций на исполнение, в случае сбоя – идем по ветке fail, разбираем пачку транзакций поштучно (у второго PutSQL batch size=1). Те записи, что не вставились, конвертируем в UPDATE и отправляем обратно, чтобы избежать зацикливания, ограничиваем время жизни очереди. Тот еще костыльно-грабельный привод, очень бы хотелось как-нибудь без него.

Задача 3. Фильтрация записей

Традиционно делается через RouteOn(Attribute|Text): нужное направляем дальше, не подпавшее под условия — дропаем по fail. Альтернативой является более продвинутый QueryRecord, ориентированный на обработку именно набора записей. Он позволяет выполнять SQL(-like)-запросы, в том числе и по самому flow-файлу – Рис. 9.

Рисунок 27
Рисунок 9. Фильтрация с помощью QueryRecord

С помощью этого же инструмента можно организовать, например, сортировку входного потока записей (ORDER BY), задавать достаточно сложные условия и пр. 

Задача 4. Направление потоков данных в один putsql

Иногда, при выносе всех выходов на верхний уровень, мы видим, что, например, раньше запись в БД проводилась в нескольких местах на схеме. У нас появляется желание завернуть все потоки данных с convertjsontosql в один putsql. Хорошее желание, правильное, накладных расходов меньше, править проще. Но тут выясняется, что мы пишем (ну, вдруг?) в разные БД, а ConnectionPool ведет в одну конкретную. Проблема? Проблема. Но решаемая. Для этого, помимо собственно требуемых сервисов ConnectionPool, необходимо создать виртуальный DBCPConnectionPoolLookup, в котором динамическими свойствами задать псевдонимы для ConnectionPools – Рис. 10.

Рисунок 40
Рисунок 10. Настройка DBCPConnectionPoolLookup

После  этого ссылаемся на этот Lookup-service (выделено желтым) в *SQL-процессоре – Рис. 11.

Рисунок 41
Рисунок 11. Настройка ExecuteSQL

Так как таблицы для записи скорее всего будут разными, зададим их с помощью ExpressionLanguage. Все? Почти. Помимо имени таблицы необходимо в атрибуте database.name flow-файла указать еще и псевдоним ConnectionPool, с которым мы будем работать – Рис. 12.

Рисунок 44
Рисунок 12
Рисунок 12

Ну а готовые SQL-запросы отправляем на PutSQL с тем же DPCPConnectionPoolLookup и наслаждаемся результатом – Рис. 13.

Рисунок 45
Рисунок 13. Запись в две БД в одном потоке

Это позволит, например, определить единую точку записи в БД для всего инстанса NiFi, обвязать её нормально обработкой ошибок, логгингом, пробами, задать ей адреса-пароли-явки, завернуть не нее все потоки записи в БД и не беспокоиться, что кто-то кое-где у нас порой на-ко-ся-чит.

Тот же подход применим и к кэшу. Не важно, какой бэкенд – мы всегда стучимся в lookup service. 

Задача 5. Вынос данных в кэш

Вынос данных в кэш – достаточно типовой паттерн использования относительно редко меняющихся и относительно часто использующихся в нескольких местах данных. Думаю, все знакомы с ним по получению токена, но использовать его можно много где. Например, в случае интенсивной работы со справочником имеет смысл не делать по запросу на каждый проходящий flow-файл, а по расписанию забирать справочник и дергать нужные значения из кэша – см. Рис. 14.

Рисунок 14. Кэширование справочников
Рисунок 14. Кэширование справочников

На входе получаем список справочников и имена интересующих полей, на выходе – записи ключ-значение (имя поля – значение) в кэше. Обратите внимание, что в целом этот паттерн ухудшает читаемость схемы — визуальный «разрыв» в потоке управления, особенно если создание кэша и его использование происходят на разных уровнях вложенности, затрудняет понимание происходящего. Но этот шаблон слишком полезен, чтобы от него отказываться. 

Wait-notify создает те же проблемы в читаемости схемы (Визуальный разрыв, когда “Notify” логически никак не связан с “Wait”), но может решать больше проблем, чем создает. Помимо очевидного «отлова» триггерных событий, он достаточно широко используется для реализации паттерна split-merge. Последний (со стратегией объединения Defragment) хорошо работает в случае, если количество объектов не меняется в процессе выполнения. В противном случае (вложенный split, фильтрация результатов, обработка в цикле с условием) –  «Хьюстон, у нас проблемы!». Не то, чтобы неразрешимые, нет – я уже видел решение с ручной правкой атрибута fragment count, но все же – проблемы. Wait-notify их решает так, как на  Рис. 15.

Рисунок 15. Split-merge с ожиданием
Рисунок 15. Split-merge с ожиданием

В этом случае merge производится не по количеству данных, полученных в результате сплита, а по сигналу выхода из цикла. До получения сигнала все данные ожидают в очереди wait процессора Wait, и только после получения сигнала попадают в процессор EnforceOrder, а затем — в merge с типом bin-packing algorithm – см. Рис. 16.

Рисунок 39
Рисунок 16. Настройки MergeContent

Обратите внимание, что wait не гарантирует сохранение порядка обработки, скорее наоборот! В некоторых кейсах это важно.

Сам по себе wait-notify обменивается сигналами через общий распределенный кэш и позволяет достаточно гибко управлять потоком: учитывать количество поступивших сигналов, выпускать заданное количество flow-файлов по сигналу и прочее. Но нужно это не везде и не всегда.

На этом у меня все, пока. Спасибо, что дочитали аж до сюда. Надеюсь, мой писательский труд был немного (или даже много) полезен. Если в комментариях всплывут какие-то еще интересные темы, стоящие внимания, то еще увидимся.

Комментарии (0)