Это вторая часть из цикла статей про прототипирование IIoT-решения на Raspberry PI и Yandex IoT.

В первой части мы реализовали основные функции на Raspberry PI:

  • сбор телеметрии с промышленных датчиков по протоколу Modbus; 

  • их передачу в облако;

  • локальный мониторинг процесса в реальном времени.

Однако пока наш проект выглядит достаточно странно - данные попадают в облако, но никак там не “оседают”, а проходят насквозь, или вообще исчезают бесследно, если никто не успел их прочитать.

Настало время это исправить -  разберемся с тем, как можно накапливать и обрабатывать переданную телеметрию в Яндекс Облаке.


Требуемый функционал

В рамках нашего решения облако должно решать следующие основные задачи:

  • принимать телеметрию устройства;

  • накапливать телеметрию в хранилище;

  • предоставлять доступ к накопленным данным с целью их последующего использования при разработки моделей, формирования отчетов и пр.

Посмотрим, какие инструменты для решения этих задач есть в Яндекс Облаке.

Доступные инструменты

Сбор и обработка телеметрии

Yandex IoT Core

В первой части мы уже познакомились с Yandex IoT Core и достаточно подробно разобрали его функционал. Если кратко, это MQTT-брокер от Яндекса, с помощью которого осуществляется обмен сообщениями между реестрами и устройствами.

Его мы и используем, чтобы отправлять телеметрию устройства в облако (как именно? велком в первую часть)

Yandex Cloud Functions

По умолчанию, с полученными по MQTT сообщениями в облаке ничего не происходит и информацию о них можно увидеть только в логах реестра или устройства. Для того, чтобы обработать присланную телеметрию нужно использовать сервис Yandex Cloud Functions

Этот сервис позволяет разворачивать serverless-функции (на данный момент на Python, Node.js, Bash, Go и PHP), а также обладает механизмом “триггеров” - запуска развернутых функций по какому-либо условию (таймеру, событию). 

Например, можно создать функцию-обработчик сообщения телеметрии и триггер, привязанный к топику устройства или реестра IoT Core, который будет отправлять пришедшие в топик сообщения на обработку этой функции. Таким образом, способы обработки сообщений ограничивается только возможностями выбранного языка программирования, квотами и лимитами сервиса.

Yandex Message Queue

Yandex Message Queue - сервис для обмена сообщениями между приложениями. В рамках этого сервиса можно реализовать механизм очередей, в которые одни приложения отправляют сообщения, а другие - забирают.

Этот механизм будет полезен, если в качестве хранилища использовать ClickHouse или схожие аналитические колоночные СУБД, плохо переносящие единичные вставки - в очереди можно накопить пачку сообщений и отправить их на вставку одним большим пакетом.

Хранение телеметрии

Для хранения данных Яндекс Облако предоставляет большое количество Managed Services для разных СУБД.

Managed Service любой СУБД, позволяет быстро развернуть готовый к работе кластер с необходимым хранилищем. Также в рамках облака можно быстро масштабировать ресурсы кластера в случае увеличения нагрузки.

Доступные на текущий момент СУБД:

  • PostgreSQL

  • ClickHouse

  • MongoDB

  • MySQL

  • Redis

  • SQL Server

По умолчанию, доступ к кластеру возможен только сервисами, развернутыми в рамках той же облачной сети (Yandex Virtual Private Cloud). Но при создании кластера можно включить публичный доступ, и тогда ресурс будет доступен из любой точки интернета.

Использование накопленной телеметрии

Yandex DataLens

Это сервис визуализации данных и анализа данных. Позволяет достаточно быстро создавать дашборды на основе данных из разнообразных источников.

Можно подключаться как к хранилищам, развернутым в облаке, так и сторонним источникам.

Yandex DataSphere

Сервис для ML-разработки, предоставляющий все необходимые инструменты и динамически масштабируемые ресурсы для полного цикла разработки машинного обучения. 

Работа ведется в ноутбуках JupyterLab, есть возможность выбора ресурсов, на которых будет выполняться каждая отдельная ячейка (количество ядер, GPU).

Дополнительные инструменты

Yandex Monitoring

Сервис позволяет собирать, хранить и отображать метрики облачных ресурсов, а также настраивать алерты и присылать по ним уведомления. В отличии от DataLens, умеет обновлять графики онлайн и подходит для мониторинга в реальном времени!

Настройка сохранения телеметрии

Несмотря на разнообразие инструментов, сейчас мы ограничимся тем, что будем просто сохранять телеметрию, пришедшую в IoT Core, в базу данных.

Для этого нам нужно развернуть и настроить 4 элемента:

  • IoT Core: создать реестр, устройство, наладить передачу сообщений по MQTT (сделано в первой части);

  • Managed Service for PostgreSQL - развернуть кластер, создать таблицу для хранения телеметрии;

  • Cloud Functions - написать функцию-обработчик сообщения с телеметрией: функция должна записывать payload сообщения в БД;

  • Cloud Functions - настроить триггер IoT Core, который будет запускать функцию-обработчик при появлении нового сообщения в топике реестра.

Частично здесь мы будем опираться на пример подобного решения из документации Яндекс Облака.

Настройка PostgreSQL

Для начала подключаем и настраиваем кластер Postgres:

Сервис Managed Service for PostgreSQL в консоли.
Сервис Managed Service for PostgreSQL в консоли.

Нам подойдет самая минимальная конфигурация - b2.nano (если впоследствии проект перерастет во что-то большее, ее легко можно будет расширить):

Конфигурация кластера.
Конфигурация кластера.

Заводим пользователя и базу данных:

Настройки БД.
Настройки БД.

В разделе хосты нужно будет разрешить публичный доступ к ресурсу:

Настройка публичного доступа к БД.
Настройка публичного доступа к БД.

Это нужно для того, чтобы база была доступна для обращения из Cloud Functions.

Создадим кластер. Теперь придется подождать некоторое время пока кластер развернется и его статус поменяется с Creating на Alive.

Создание кластера.
Создание кластера.

После того, как кластер развернулся, заходим в него и создаем таблицу:

Вход в SQL клиент облака
Вход в SQL клиент облака

Пока будем просто складывать весь payload в одну колонку. В последствии его можно будет распарсить на отдельные колонки по каждому значению, или классический timeseries:

CREATE TABLE telemetry_hist
( telemetry_timestamp timestamp,
  device_nm varchar(200),
  payload varchar(2000)
);

Такой подход удобен ещё и тем, что если в процессе доработки проекта поменяется структура payload-а, сохранение в БД не сломается и телеметрия не будет теряться.

Создание функции-обработчика

Функция будет получать сообщения из MQTT-брокера и записывать данные в таблицу, созданную ранее.

В каталоге в консоли выбираем Cloud Functions:

Сервис Cloud Functions в консоли.
Сервис Cloud Functions в консоли.

Создаем функцию с названием iot-core-handler и каким-нибудь говорящим описанием.

В открывшемся редакторе выбираем среду выполнения. Мы будем использовать Python 3.7 (preview).

Выбор среды выполнения при создании YCF.
Выбор среды выполнения при создании YCF.

Теперь нам доступен редактор кода. Здесь мы можем написать требуемую функцию, создать ее версию, после чего она сразу развернется и станет доступной для выполнения.

Редактор кода YCF.
Редактор кода YCF.

Помимо редактора кода, можно разворачивать код из архивов в Object Storage или загруженных напрямую.

Мы будем использовать код, предоставленный в документации Яндекс Облака (гитхаб), немного поправив его под наши нужды.

Правки касаются формата таблицы и payload сообщений:

  • Функция makeInsertStatement:

    Поменяем формат инсерта согласно формату нашей таблицы, уберем разбор пэйлоада на отдельные поля, а также изменим входную переменную event_id на event_ts.

def makeInsertStatement(event_ts, payload_json, table_name):

    msg = json.loads(payload_json)
    msg_str = json.dumps(msg)
    logger.info(msg_str)
    insert=  f"""INSERT INTO {table_name} (telemetry_timestamp , device_nm , payload) VALUES ('{event_ts}','iot_test_device', '{msg_str}')"""

    return insert
  • Функция makeCreateTableStatement:

    Изменим выражение в соответсвтии с форматом таблицы.

def makeCreateTableStatement(table_name):

    statement = f"""CREATE TABLE {table_name} (
    ( telemetry_timestamp timestamp,
      device_nm varchar(200),
      payload varchar(2000)
    );
    """
    return statement
  • Функция msgHandler:

    • Изменим переменную event_id на event_ts (96 строка) и будем формировать ее следующим образом:

      event_ts = json_msg["event_metadata"]["created_at"]
    • Изменим значение переменной table_name на название нашей таблицы (98 строка):

      table_name = ‘telemetry_hist’
    • В функцию makeInsertStatement в качестве первого аргумента отправляем не event_id, а event_ts (99 строка):

      sql = makeInsertStatement(event_ts, event_payload, table_name)

Код с уже внесенными правками можно найти в этом гисте. Вставим его в файл index.py.

Обратите внимание, если вы использовали другое название таблицы для сохранения телеметрии, необходимо указать его в качестве значения переменной table_name на 79 строке.

Теперь, перед созданием версии функции, в форме под редактором необходимо заполнить параметры развертывания:

  • Точка входа: укажем index.msgHandler - это имя функции в файле index.py, которая будет вызываться в качестве обработчика. Указывается в формате <имя файла с функцией>.<имя обработчика>

  • Таймаут, с: 10 секунд -максимальное время выполнения функции. Облачная функция может выполняться не более 10 минут.

  • Память: 128 МБ - объем необходимой для функции памяти 

  • Сервисный аккаунт - укажем (или создадим, если его еще нет) сервисный аккаунт с ролями serverless.functions.invoker и editor

  • Переменные окружения - обратите внимание, разработанная функция использует 6 переменных окружения, которые необходимо заполнить в соответствующих полях. Нужно указать следующее:

    • VERBOSE_LOG — параметр, отвечающий за вывод подробной информации о выполнении функции. Введем значение True.

    • DB_HOSTNAME — имя хоста БД PostgreSQL для подключения.

    • DB_PORT — порт для подключения.

    • DB_NAME — имя базы данных для подключения.

    • DB_USER — имя пользователя для подключения.

    • DB_PASSWORD — пароль, который был введен при создании кластера

Все данные для подключения (кроме пароля) можно найти в обзоре развернутого Managed Service for PostgreSQL. 

Информация для подключения к БД
Информация для подключения к БД

В итоге мы получаем такое заполнение полей:

Настройки YCF.
Настройки YCF.

Создадим версию функции (кнопка Создать версию).

Настройка триггера IoT Core

Вернемся в раздел Cloud Functions и выберем подраздел “Триггеры”.

Подраздел "Триггеры" YCF.
Подраздел "Триггеры" YCF.

Создадим триггер (кнопка “Создать триггер”).

  • В блоке Базовые параметры:

    • В поле Имя введем имя триггера.

    • В поле Тип выберем Yandex IoT Core. Так мы укажем сервису, что будем работать с обработкой событий IoT Core. Кроме этого источника, для решения других задач можно обрабатывать сообщения Message Queue, события Object Storage, Container Registry, логов Cloud Logs, а также запускать функцию по таймеру.

  • В блоке Настройки сообщений Yandex IoT Core:

    • В поле Реестр введем iot_test-reg - реестр к которому привязано наше устройствою

    • В поле Устройство выберем Любое устройство (т.к. мы отправляем сообщения в топик реестра)

    • В поле Топик укажем топик, в который устройство отправляет данные:

      • $registries/<ID реестра>/events

Теперь триггер будет срабатывать при появлении новых данных в указанном топике.

  • В блоке Настройки функции:

    • Выберем функцию для обработки данных, созданную ранее (iot-core-handler).

    • В поле Тег версии укажем $latest - тогда триггер будет запускать последнюю развернутую .

    • В поле Сервисный аккаунт укажем созданный ранее сервисный аккаунт.

    • Остальные поля оставим пустыми.

Настройки триггера YCF.
Настройки триггера YCF.

Создадим триггер с заданными настройками.

Проверка работоспособности

Всё готово!

Теперь (если всё сделано правильно) каждое сообщение, отправляемое устройством, будет через созданный триггер инициировать вызов функции. Которая, в свою очередь, положит отправленный payload в таблицу на Postgres.

Проверим логи выполнения функции (Cloud Functions -> Функции -> iot-core-handler -> Логи).

Логи выполнения функции iot-core-handler.
Логи выполнения функции iot-core-handler.

Здесь отображаются сообщения, выводимые функцией в процессе работы, в том числе, сообщения об ошибках. Если сообщений об ошибках нет (а есть информационные сообщения, начинающиеся с [INFO]) - функция работает корректно.

Посмотрим теперь заполнение таблицы telemetry_hist в нашей БД (Managed Services for PostgresSQL -> telemetry_store -> SQL). Вводим имя пользователя и пароль и попадаем в редактор SQL.

Вводим простой запрос для получения выгрузки из таблицы

SELECT * FROM telemetry_hist

И видим всю историю отправленных пэйлоадов:

Заполнение таблицы telemetry_hist.
Заполнение таблицы telemetry_hist.

То есть всё работает: теперь телеметрия попадает в облако, накапливается в развернутой БД и доступна для дальнейшего анализа в любое удобное время из любой точки планеты!

PS. Если по каким-то причинам таблица остается пустой, можно проверить следующие моменты:

Отправило ли устройство хотя бы одно сообщение с момента деплоя функции и триггера?

  • Если нет - просто ждем пока это произойдет)

  • Если отправляло - смотрим логи выполнения функции.

    • Если логи пустые - стоит проверить триггер (в частности правильно ли указан топик и функция).

    • Если в логах ошибки - разбираемся с кодом.

    • Если функция отрабатывает и ошибок нет - проверяем настройки подключения к БД и имя таблицы.