Переносимость процессоров и паттерны
Вот и обещанная третья часть саги о том, что в NiFi можно делать и как это делать правильно, без претензий на истину в последней инстанции, конечно. Сегодня расскажу о переносимости процессоров и дам несколько паттернов для самых популярных задач на платформе ZIIoT. Если вдруг вам интересно почитать про оптимизацию схем и производительности в NiFi — велком в первую часть. Если мечтаете узнать больше о мониторинге, то вторая часть — must read. Только потом сюда не забудьте вернуться.
Переносимость процессоров
Хорошая группа процессоров – переносимая группа процессоров. Бывают, конечно, проектные кастомы, никому кроме разработчика (да и ему после сдачи проекта) не интересные, но даже их бывает необходимо переносить между dev-test-prod средами, и хорошо бы с минимальными накладными расходами. Вот несколько очевидных и не очень лайфхаков, как эту переносимость обеспечить.
Первое, что нам поможет, – версионирование групп процессоров с помощью NiFi registry. Собственно, об этом инструменте уже достаточно написано. Используйте его – рекомендую.
Следующий важный момент – это окружение скрипта. Прежде всего, все конфигурируемые параметры имеет смысл выносить в переменные. Это нужно хотя бы для того, чтобы в случае необходимости изменения (например, при переносе между полигонами) вносить все правки в одном месте с автоматическим перезапуском задействованных элементов. Дополнительно стоит явным образом обозначить перечень конфигурируемых таким образом сущностей на Label, например, как на Рис. 1.
И вот тут есть два принципиально различных подхода к обеспечению переносимости.
Первый подход — все необходимые параметры задаются на уровне группы процессоров. Переносимость получается 100% – бери и неси, только не забудь поправить по месту! Но управляемость страдает. Представьте десяток подобным образом организованных групп процессоров, и вдруг у вас возникает необходимость изменить один параметр во всех. А для пущего счастья – каждый разработчик может обозвать переменную так, как ему в тот момент захотелось. Ужас, да?
Второй подход (за который я и выступаю) – максимально стандартизировать окружение NiFi на всех полигонах: определить стандартный набор переменных, задавать их с помощью ENV\Variable registry, поставлять в стандартном наборе стандартный же механизм получения токена для доступа к API и пр., а в группе процессоров задавать только тот минимум, который относится непосредственно к самой группе. В этом случае типовые переносы могут происходить ВООБЩЕ без необходимости какого-либо конфигурирования, ну а у кого «стандартное окружение» не вполне стандартно, тот сам виноват.
Тот же подход ограниченно применим и для работы с database\redis connection pools. Ограниченно — потому что, например, в случае с платформой ZIIoT непосредственный доступ к БД сервисов платформы — штука явно нештатная, которая бывает, когда речь идет о системах заказчика или лютом кастоме, необходимость переносов которого сомнительна. С другой стороны, таскать «нечто» между своими полигонами тоже надо… Дополнительное ограничение: DBCPConnectionPool – таки разделяемый ресурс. Если вы оставили “Max Total Connections”=8 по дефолту и завязали на этот пул 1050 групп процессоров – возможны варианты. Впрочем, при прочих равных дефолтов обычно вполне достаточно.
Context maps – (HTTP, HTTPS), reader\writer сервисы – вопрос другой. Тот же http context map по дефолту предоставляет 5000 одновременных соединений, чего с одной стороны как бы и много, а с другой — может и не хватить на весь инстанс. В общем, в угоду переносимости я предпочитаю создавать эти сущности на уровне группы процессоров. Единственная рекомендация – переименовывать их в человекочитаемый вид, чтобы не было 100500 одинаковых HTTPContextMaps. С reader\writer сервисами примерно то же самое – плюсы от переиспользования не слишком очевидны, трудности от реализации определенно будут.
Теперь о паттернах.
Паттерны
Как я сказал выше, тут речь пойдет о самых популярных задачах, с которыми разработчики сталкиваются, имея дело с ZIIoT-платформой. Возможно, тем, кто еще не имеет дело с платформой, это тоже будет полезным, так как задачи эти не новы.
Задача 1. Чтение только новых записей и перенос их в платформу работы с данными
Это типовая задача, которую приходится решать, забирая данные из какой-нибудь системы заказчика. Часто это делается путем сохранения последней прочитанной записи где-нибудь в кэше и\или в специально созданной таблице БД, а также путем чтения данных за определенный период (now() минус N). В большинстве случаев так делать не стоит, так как есть аж три штатных средства – GenerateTableFetch, QueryDatabaseTable и QueryDatabaseTableRecord. См. Рис. 2
GenerateTableFetch создает SQL-запросы для последующего исполнения (ExecuteSQL(Record). По определению, чтение с дозагрузкой реализуется только на одной ноде кластера, чтобы не было накладок с синхронизацией последних прочитанных данных. В этом случае на нагруженных инстансах TableFetch позволяет распараллелить получение данных из БД: «легкую» часть (с генерацией запросов) выполняет одна нода, а «тяжелая» (с извлечением-обработкой) распараллеливается на все ноды кластера. Данный процессор поддерживает входные данные и может использовать атрибуты входящих flow-файлов для определения, например, таблицы, из которой надо произвести выборку. QueryDatabaseTable позволяет собственно извлечь данные из таблицы, выдавая данные в AVRO-формате. QueryDatabaseTableRecord дает на выходе набор записей, определенный RecordSetWriter.
Для таблицы такого вида (запись с id 527 добавлена позже),
GenerateTableFetch (как более наглядный) генерирует два запроса:
SELECT * FROM test WHERE id <= 8 ORDER BY id LIMIT 10000
SELECT * FROM test WHERE id > 8 AND id <= 527 ORDER BY id LIMIT 10000
Эти процессоры позволяют задать размер единичной выборки, определить список столбцов, которые должны в нее попасть, и даже задать начальное значение, если речь идет о первичной выборке из уже существующей базы не всех, а только актуальных для нас данных.
Немаловажный вопрос – где хранится это «последнее прочитанное» значение и насколько это надежно? Ответ: в «STATE процессора». В случае кластера — в zookeeper’е кластера, в случае standalone-инсталляции – в хранилище состояний инстанса в формате WAL-лога или физически (по дефолту) – в папке /opt/nifi/nifi-current/state таким вот образом, как на Рис. 4.
Соответственно, если нам требуется сохранить это состояние в случае перезапуска всего NiFi, эту папку необходимо вынести в отдельный volume. Для очистки состояния можно воспользоваться контролом Clear state (View state на соответствующем процессоре) – см. Рис. 5.
Задача 2. Организация шаблона UPdateorinSERT – UPSERT
Еще одна типовая задача – организация шаблона UPdateorinSERT – UPSERT. Создание записи в БД, в случае её наличия – обновление данных. Шаблон очень распространенный, но на уровне спецификации SQL не слишком стандартизованный. В postgresql он реализован с помощью INSERT INTO <…> ON CONFLICT DO <…>. В NiFi этот шаблон реализуется только в PutDatabaseRecord - Рис. 6.
Основная проблема здесь в том, что, например, ConvertJsonToSQL производит вывод типов исходя из параметров БД и корректно обрабатывает варианты {“id”:10} и {“id”:”10”}, а PutDatabaseRecord использует определение типов из avro-схемы recordsetreader (по дефолту – infer-schema, т.е. попытка «угадать» и создать схему исходя из самой записи). В этом случае, если в БД поле id с типом bigint, входной параметр {“id”:”10”} работать-таки не будет без переопределения avro-схемы.ля входного JSON’а:
[
{
"key": "7",
"name": "jsontest1"
},
{
"key": "8",
"name": "jsontest2"
},
{
"key": "9",
"name": "jsontest3"
},
{
"key": "1",
"name": "jsontest4"
}
]
И таблицы вида:
CREATE TABLE IF NOT EXISTS test (
id INTEGER NOT NULL DEFAULT nextval('test_id_seq' :: REGCLASS),
key BIGINT,
name CHARACTER VARYING,
CONSTRAINT test_key_key UNIQUE (key)
) WITH (OIDS = FALSE);
Схема будет выглядеть так, как на Рис. 7.
{
"type": "record",
"name": "test",
"fields": [
{
"name": "key",
"type": "long"
},
{
"name": "name",
"type": "string"
}
]
}
Как правило, никакого rocket-science тут нет, но без подобного «восхода солнца вручную» хочется обойтись. В качестве альтернативы можно поиграться с convertjsontosql в INSERT, а в случае ошибки – повторный convertjsontosql уже в UPDATE, но тут схема выйдет совсем уж монструозной – Рис. 8.
Конвертируем JSON в SQL INSERT, отправляем пачку транзакций на исполнение, в случае сбоя – идем по ветке fail, разбираем пачку транзакций поштучно (у второго PutSQL batch size=1). Те записи, что не вставились, конвертируем в UPDATE и отправляем обратно, чтобы избежать зацикливания, ограничиваем время жизни очереди. Тот еще костыльно-грабельный привод, очень бы хотелось как-нибудь без него.
Задача 3. Фильтрация записей
Традиционно делается через RouteOn(Attribute|Text): нужное направляем дальше, не подпавшее под условия — дропаем по fail. Альтернативой является более продвинутый QueryRecord, ориентированный на обработку именно набора записей. Он позволяет выполнять SQL(-like)-запросы, в том числе и по самому flow-файлу – Рис. 9.
С помощью этого же инструмента можно организовать, например, сортировку входного потока записей (ORDER BY), задавать достаточно сложные условия и пр.
Задача 4. Направление потоков данных в один putsql
Иногда, при выносе всех выходов на верхний уровень, мы видим, что, например, раньше запись в БД проводилась в нескольких местах на схеме. У нас появляется желание завернуть все потоки данных с convertjsontosql в один putsql. Хорошее желание, правильное, накладных расходов меньше, править проще. Но тут выясняется, что мы пишем (ну, вдруг?) в разные БД, а ConnectionPool ведет в одну конкретную. Проблема? Проблема. Но решаемая. Для этого, помимо собственно требуемых сервисов ConnectionPool, необходимо создать виртуальный DBCPConnectionPoolLookup, в котором динамическими свойствами задать псевдонимы для ConnectionPools – Рис. 10.
После этого ссылаемся на этот Lookup-service (выделено желтым) в *SQL-процессоре – Рис. 11.
Так как таблицы для записи скорее всего будут разными, зададим их с помощью ExpressionLanguage. Все? Почти. Помимо имени таблицы необходимо в атрибуте database.name flow-файла указать еще и псевдоним ConnectionPool, с которым мы будем работать – Рис. 12.
Ну а готовые SQL-запросы отправляем на PutSQL с тем же DPCPConnectionPoolLookup и наслаждаемся результатом – Рис. 13.
Это позволит, например, определить единую точку записи в БД для всего инстанса NiFi, обвязать её нормально обработкой ошибок, логгингом, пробами, задать ей адреса-пароли-явки, завернуть не нее все потоки записи в БД и не беспокоиться, что кто-то кое-где у нас порой на-ко-ся-чит.
Тот же подход применим и к кэшу. Не важно, какой бэкенд – мы всегда стучимся в lookup service.
Задача 5. Вынос данных в кэш
Вынос данных в кэш – достаточно типовой паттерн использования относительно редко меняющихся и относительно часто использующихся в нескольких местах данных. Думаю, все знакомы с ним по получению токена, но использовать его можно много где. Например, в случае интенсивной работы со справочником имеет смысл не делать по запросу на каждый проходящий flow-файл, а по расписанию забирать справочник и дергать нужные значения из кэша – см. Рис. 14.
На входе получаем список справочников и имена интересующих полей, на выходе – записи ключ-значение (имя поля – значение) в кэше. Обратите внимание, что в целом этот паттерн ухудшает читаемость схемы — визуальный «разрыв» в потоке управления, особенно если создание кэша и его использование происходят на разных уровнях вложенности, затрудняет понимание происходящего. Но этот шаблон слишком полезен, чтобы от него отказываться.
Wait-notify создает те же проблемы в читаемости схемы (Визуальный разрыв, когда “Notify” логически никак не связан с “Wait”), но может решать больше проблем, чем создает. Помимо очевидного «отлова» триггерных событий, он достаточно широко используется для реализации паттерна split-merge. Последний (со стратегией объединения Defragment) хорошо работает в случае, если количество объектов не меняется в процессе выполнения. В противном случае (вложенный split, фильтрация результатов, обработка в цикле с условием) – «Хьюстон, у нас проблемы!». Не то, чтобы неразрешимые, нет – я уже видел решение с ручной правкой атрибута fragment count, но все же – проблемы. Wait-notify их решает так, как на Рис. 15.
В этом случае merge производится не по количеству данных, полученных в результате сплита, а по сигналу выхода из цикла. До получения сигнала все данные ожидают в очереди wait процессора Wait, и только после получения сигнала попадают в процессор EnforceOrder, а затем — в merge с типом bin-packing algorithm – см. Рис. 16.
Обратите внимание, что wait не гарантирует сохранение порядка обработки, скорее наоборот! В некоторых кейсах это важно.
Сам по себе wait-notify обменивается сигналами через общий распределенный кэш и позволяет достаточно гибко управлять потоком: учитывать количество поступивших сигналов, выпускать заданное количество flow-файлов по сигналу и прочее. Но нужно это не везде и не всегда.
На этом у меня все, пока. Спасибо, что дочитали аж до сюда. Надеюсь, мой писательский труд был немного (или даже много) полезен. Если в комментариях всплывут какие-то еще интересные темы, стоящие внимания, то еще увидимся.