Это вторая часть из цикла статей про прототипирование IIoT-решения на Raspberry PI и Yandex IoT.
В первой части мы реализовали основные функции на Raspberry PI:
сбор телеметрии с промышленных датчиков по протоколу Modbus;
их передачу в облако;
локальный мониторинг процесса в реальном времени.
Однако пока наш проект выглядит достаточно странно - данные попадают в облако, но никак там не “оседают”, а проходят насквозь, или вообще исчезают бесследно, если никто не успел их прочитать.
Настало время это исправить - разберемся с тем, как можно накапливать и обрабатывать переданную телеметрию в Яндекс Облаке.
Требуемый функционал
В рамках нашего решения облако должно решать следующие основные задачи:
принимать телеметрию устройства;
накапливать телеметрию в хранилище;
предоставлять доступ к накопленным данным с целью их последующего использования при разработки моделей, формирования отчетов и пр.
Посмотрим, какие инструменты для решения этих задач есть в Яндекс Облаке.
Доступные инструменты
Сбор и обработка телеметрии
Yandex IoT Core
В первой части мы уже познакомились с Yandex IoT Core и достаточно подробно разобрали его функционал. Если кратко, это MQTT-брокер от Яндекса, с помощью которого осуществляется обмен сообщениями между реестрами и устройствами.
Его мы и используем, чтобы отправлять телеметрию устройства в облако (как именно? велком в первую часть)
Yandex Cloud Functions
По умолчанию, с полученными по MQTT сообщениями в облаке ничего не происходит и информацию о них можно увидеть только в логах реестра или устройства. Для того, чтобы обработать присланную телеметрию нужно использовать сервис Yandex Cloud Functions.
Этот сервис позволяет разворачивать serverless-функции (на данный момент на Python, Node.js, Bash, Go и PHP), а также обладает механизмом “триггеров” - запуска развернутых функций по какому-либо условию (таймеру, событию).
Например, можно создать функцию-обработчик сообщения телеметрии и триггер, привязанный к топику устройства или реестра IoT Core, который будет отправлять пришедшие в топик сообщения на обработку этой функции. Таким образом, способы обработки сообщений ограничивается только возможностями выбранного языка программирования, квотами и лимитами сервиса.
Yandex Message Queue
Yandex Message Queue - сервис для обмена сообщениями между приложениями. В рамках этого сервиса можно реализовать механизм очередей, в которые одни приложения отправляют сообщения, а другие - забирают.
Этот механизм будет полезен, если в качестве хранилища использовать ClickHouse или схожие аналитические колоночные СУБД, плохо переносящие единичные вставки - в очереди можно накопить пачку сообщений и отправить их на вставку одним большим пакетом.
Хранение телеметрии
Для хранения данных Яндекс Облако предоставляет большое количество Managed Services для разных СУБД.
Managed Service любой СУБД, позволяет быстро развернуть готовый к работе кластер с необходимым хранилищем. Также в рамках облака можно быстро масштабировать ресурсы кластера в случае увеличения нагрузки.
Доступные на текущий момент СУБД:
PostgreSQL
ClickHouse
MongoDB
MySQL
Redis
SQL Server
По умолчанию, доступ к кластеру возможен только сервисами, развернутыми в рамках той же облачной сети (Yandex Virtual Private Cloud). Но при создании кластера можно включить публичный доступ, и тогда ресурс будет доступен из любой точки интернета.
Использование накопленной телеметрии
Yandex DataLens
Это сервис визуализации данных и анализа данных. Позволяет достаточно быстро создавать дашборды на основе данных из разнообразных источников.
Можно подключаться как к хранилищам, развернутым в облаке, так и сторонним источникам.
Yandex DataSphere
Сервис для ML-разработки, предоставляющий все необходимые инструменты и динамически масштабируемые ресурсы для полного цикла разработки машинного обучения.
Работа ведется в ноутбуках JupyterLab, есть возможность выбора ресурсов, на которых будет выполняться каждая отдельная ячейка (количество ядер, GPU).
Дополнительные инструменты
Yandex Monitoring
Сервис позволяет собирать, хранить и отображать метрики облачных ресурсов, а также настраивать алерты и присылать по ним уведомления. В отличии от DataLens, умеет обновлять графики онлайн и подходит для мониторинга в реальном времени!
Настройка сохранения телеметрии
Несмотря на разнообразие инструментов, сейчас мы ограничимся тем, что будем просто сохранять телеметрию, пришедшую в IoT Core, в базу данных.
Для этого нам нужно развернуть и настроить 4 элемента:
IoT Core: создать реестр, устройство, наладить передачу сообщений по MQTT (сделано в первой части);
Managed Service for PostgreSQL - развернуть кластер, создать таблицу для хранения телеметрии;
Cloud Functions - написать функцию-обработчик сообщения с телеметрией: функция должна записывать payload сообщения в БД;
Cloud Functions - настроить триггер IoT Core, который будет запускать функцию-обработчик при появлении нового сообщения в топике реестра.
Частично здесь мы будем опираться на пример подобного решения из документации Яндекс Облака.
Настройка PostgreSQL
Для начала подключаем и настраиваем кластер Postgres:
Нам подойдет самая минимальная конфигурация - b2.nano (если впоследствии проект перерастет во что-то большее, ее легко можно будет расширить):
Заводим пользователя и базу данных:
В разделе хосты нужно будет разрешить публичный доступ к ресурсу:
Это нужно для того, чтобы база была доступна для обращения из Cloud Functions.
Создадим кластер. Теперь придется подождать некоторое время пока кластер развернется и его статус поменяется с Creating на Alive.
После того, как кластер развернулся, заходим в него и создаем таблицу:
Пока будем просто складывать весь payload в одну колонку. В последствии его можно будет распарсить на отдельные колонки по каждому значению, или классический timeseries:
CREATE TABLE telemetry_hist
( telemetry_timestamp timestamp,
device_nm varchar(200),
payload varchar(2000)
);
Такой подход удобен ещё и тем, что если в процессе доработки проекта поменяется структура payload-а, сохранение в БД не сломается и телеметрия не будет теряться.
Создание функции-обработчика
Функция будет получать сообщения из MQTT-брокера и записывать данные в таблицу, созданную ранее.
В каталоге в консоли выбираем Cloud Functions:
Создаем функцию с названием iot-core-handler и каким-нибудь говорящим описанием.
В открывшемся редакторе выбираем среду выполнения. Мы будем использовать Python 3.7 (preview).
Теперь нам доступен редактор кода. Здесь мы можем написать требуемую функцию, создать ее версию, после чего она сразу развернется и станет доступной для выполнения.
Помимо редактора кода, можно разворачивать код из архивов в Object Storage или загруженных напрямую.
Мы будем использовать код, предоставленный в документации Яндекс Облака (гитхаб), немного поправив его под наши нужды.
Правки касаются формата таблицы и payload сообщений:
Функция makeInsertStatement:
Поменяем формат инсерта согласно формату нашей таблицы, уберем разбор пэйлоада на отдельные поля, а также изменим входную переменную event_id на event_ts.
def makeInsertStatement(event_ts, payload_json, table_name):
msg = json.loads(payload_json)
msg_str = json.dumps(msg)
logger.info(msg_str)
insert= f"""INSERT INTO {table_name} (telemetry_timestamp , device_nm , payload) VALUES ('{event_ts}','iot_test_device', '{msg_str}')"""
return insert
Функция makeCreateTableStatement:
Изменим выражение в соответсвтии с форматом таблицы.
def makeCreateTableStatement(table_name):
statement = f"""CREATE TABLE {table_name} (
( telemetry_timestamp timestamp,
device_nm varchar(200),
payload varchar(2000)
);
"""
return statement
Функция msgHandler:
Изменим переменную event_id на event_ts (96 строка) и будем формировать ее следующим образом:
event_ts = json_msg["event_metadata"]["created_at"]
Изменим значение переменной table_name на название нашей таблицы (98 строка):
table_name = ‘telemetry_hist’
В функцию makeInsertStatement в качестве первого аргумента отправляем не event_id, а event_ts (99 строка):
sql = makeInsertStatement(event_ts, event_payload, table_name)
Код с уже внесенными правками можно найти в этом гисте. Вставим его в файл index.py.
Обратите внимание, если вы использовали другое название таблицы для сохранения телеметрии, необходимо указать его в качестве значения переменной table_name на 79 строке.
Теперь, перед созданием версии функции, в форме под редактором необходимо заполнить параметры развертывания:
Точка входа: укажем index.msgHandler - это имя функции в файле index.py, которая будет вызываться в качестве обработчика. Указывается в формате <имя файла с функцией>.<имя обработчика>
Таймаут, с: 10 секунд -максимальное время выполнения функции. Облачная функция может выполняться не более 10 минут.
Память: 128 МБ - объем необходимой для функции памяти
Сервисный аккаунт - укажем (или создадим, если его еще нет) сервисный аккаунт с ролями serverless.functions.invoker и editor
Переменные окружения - обратите внимание, разработанная функция использует 6 переменных окружения, которые необходимо заполнить в соответствующих полях. Нужно указать следующее:
VERBOSE_LOG — параметр, отвечающий за вывод подробной информации о выполнении функции. Введем значение True.
DB_HOSTNAME — имя хоста БД PostgreSQL для подключения.
DB_PORT — порт для подключения.
DB_NAME — имя базы данных для подключения.
DB_USER — имя пользователя для подключения.
DB_PASSWORD — пароль, который был введен при создании кластера
Все данные для подключения (кроме пароля) можно найти в обзоре развернутого Managed Service for PostgreSQL.
В итоге мы получаем такое заполнение полей:
Создадим версию функции (кнопка Создать версию).
Настройка триггера IoT Core
Вернемся в раздел Cloud Functions и выберем подраздел “Триггеры”.
Создадим триггер (кнопка “Создать триггер”).
В блоке Базовые параметры:
В поле Имя введем имя триггера.
В поле Тип выберем Yandex IoT Core. Так мы укажем сервису, что будем работать с обработкой событий IoT Core. Кроме этого источника, для решения других задач можно обрабатывать сообщения Message Queue, события Object Storage, Container Registry, логов Cloud Logs, а также запускать функцию по таймеру.
В блоке Настройки сообщений Yandex IoT Core:
В поле Реестр введем iot_test-reg - реестр к которому привязано наше устройствою
В поле Устройство выберем Любое устройство (т.к. мы отправляем сообщения в топик реестра)
В поле Топик укажем топик, в который устройство отправляет данные:
$registries/<ID реестра>/events
Теперь триггер будет срабатывать при появлении новых данных в указанном топике.
В блоке Настройки функции:
Выберем функцию для обработки данных, созданную ранее (iot-core-handler).
В поле Тег версии укажем $latest - тогда триггер будет запускать последнюю развернутую .
В поле Сервисный аккаунт укажем созданный ранее сервисный аккаунт.
Остальные поля оставим пустыми.
Создадим триггер с заданными настройками.
Проверка работоспособности
Всё готово!
Теперь (если всё сделано правильно) каждое сообщение, отправляемое устройством, будет через созданный триггер инициировать вызов функции. Которая, в свою очередь, положит отправленный payload в таблицу на Postgres.
Проверим логи выполнения функции (Cloud Functions -> Функции -> iot-core-handler -> Логи).
Здесь отображаются сообщения, выводимые функцией в процессе работы, в том числе, сообщения об ошибках. Если сообщений об ошибках нет (а есть информационные сообщения, начинающиеся с [INFO]) - функция работает корректно.
Посмотрим теперь заполнение таблицы telemetry_hist в нашей БД (Managed Services for PostgresSQL -> telemetry_store -> SQL). Вводим имя пользователя и пароль и попадаем в редактор SQL.
Вводим простой запрос для получения выгрузки из таблицы
SELECT * FROM telemetry_hist
И видим всю историю отправленных пэйлоадов:
То есть всё работает: теперь телеметрия попадает в облако, накапливается в развернутой БД и доступна для дальнейшего анализа в любое удобное время из любой точки планеты!
PS. Если по каким-то причинам таблица остается пустой, можно проверить следующие моменты:
Отправило ли устройство хотя бы одно сообщение с момента деплоя функции и триггера?
Если нет - просто ждем пока это произойдет)
Если отправляло - смотрим логи выполнения функции.
Если логи пустые - стоит проверить триггер (в частности правильно ли указан топик и функция).
Если в логах ошибки - разбираемся с кодом.
Если функция отрабатывает и ошибок нет - проверяем настройки подключения к БД и имя таблицы.
oleg_gavrilov
Я правильно понял, что вы учите людей вызывать cloud function на каждое событие в MQTT, и вставлять в pg записи по одной? Может, хоть какое-то примечание сделаете, что это «применимо только в образовательных целях»? Извиняюсь, если пропустил что-то.
vldplcd Автор
Да, все верно, статья скорее образовательная — обзор того, как быстро набросать простое решение в облаке, чтобы телеметрия не пропадала, а куда-нибудь сохранялась, т.е. какие инструменты для этого можно использовать (все-таки не раз подчеркивали, что это цикл статей о «быстром прототипе»)).
Естественно, в промышленных решениях необходимо решать задачи обработки более плотного потока данных (по сравнению с тем, что был в разобранном примере). И если сейчас единичные вставки прокатывают (все-таки при слабой интенсивности pg их адекватно переваривает), то с увеличением нагрузки и использованием, скажем, Clickhouse, нужно что-то придумывать. Самое простое — с помощью того же сервиса очередей формировать большие батчи, которые затем направлять на вставку СУБД. Кажется, это хорошая тема для отдельной статьи)