Привет! Меня зовут Иван Банников, я ведущий разработчик VK Tech. В одном из проектов я создавал и развивал IoT-платформу и хочу теперь поделиться своим опытом. Архитектура IoT-платформ в какой-то мере похожа на архитектуру любой другой распределенной системы. Однако технология интернета вещей подразумевает взаимодействие не только в виртуальном, но и в физическом мире. Это влечет за собой ряд нестандартных для классических программных продуктов сложностей и нюансов. 

В этой статье мы поговорим о том, что представляют из себя современные IoT-платформы, что такое цифровые двойники и зачем они применяются, а также рассмотрим главные сложности и способы решения проблем при разработке IoT-решений. 

Основы построения IoT-систем


Итак, из чего же состоят базовые и продвинутые IoT-системы? Начнем с простого примера.

Возьмем обычный бытовой прибор — чайник, розетку, лампочку или мультиварку. Приделаем к нему модуль для управления по Bluetooth. Таким устройством можно будет управлять с телефона, планшета или компьютера. Можно, не вставая с дивана, менять освещение по всему дому, включать и выключать чайник, задавать режим работы мультиварки. Более того, при совместимости протоколов можно связывать один прибор с другим. Удобно, когда при срабатывании датчика двери в прихожей автоматически включается свет. Получается такая небольшая локальная IoT-система.



Но возможности такой системы ограничены. Мы не можем получить информацию о том, закрыта ли дверь, выключены ли розетки, находясь где-то далеко от дома. Управление устройствами возможно только с близкого расстояния. Также появляются проблемы с голосовым управлением: голосовым помощникам надо как-то подключаться к приборам через интернет, а приборы умеют взаимодействовать только по Bluetooth.



Необходим шлюз, который умеет подключаться к WiFi-сети домашнего роутера, связываться с сервером, обнаруживать приборы в Bluetooth-эфире, подключаться к ним и передавать команды, полученные от сервера. К серверу также может подключаться мобильное приложение или дополнительные сервисы, если пользователь хочет управлять техникой на дальней дистанции. В роли такого сервера выступает MQTT-брокер. Почему именно MQTT? Это легковесный протокол, созданный специально для IoT. С ним просто работать, и он достаточно популярен.

Таким образом, с появлением шлюза границы локальной IoT-системы выходят за пределы дома и умная техника начинает общаться уже с внешним миром. Помимо удаленного управления, через интернет появляется возможность сопрягать внешние сервисы с системой умного дома: можно подключить голосового помощника, сервис сценариев вроде IFTTT и многое другое.



Бизнес также может собирать и анализировать данные о пользовательских приборах и на основе этой информации принимать решения, например, о расширении линейки техники через интеграцию с другими IoT-платформами, допустим, с Tuya.

И наконец, чтобы IoT-платформа смогла взаимодействовать с пользователями и помогала бизнесу извлекать полезные знания, ее необходимо дополнить типичными для любой распределенной системы сервисами. Например, авторизация, аутентификация, мониторинг, аналитика, хранение и обработка пользовательских данных, система управления контентом, в которую, кстати, входит компонент, типичный для IoT — сервер хранения и распространения прошивок. Далее мы сосредоточимся только на том, что непосредственно относится к IoT.



Потенциальные трудности


Мы привыкли к тому, что в обычной жизни управление приборами — это просто. Нажал на кнопку — включил лампочку. Открыл или закрыл входную дверь — получил уведомление на телефон. Непосредственное взаимодействие с прибором нам привычно и понятно. Но когда в дело вступают сеть, удаленное управление и необходимость работать с большим количеством приборов и пользователей одновременно, все резко усложняется.

Чтобы начать управлять лампочкой по Bluetooth, нужно сперва к ней подключиться, затем получить актуальное состояние и только потом уже попытаться взаимодействовать, посылать команды. При этом Bluetooth-эфир может быть сильно зашумлён. 



Для управления через WiFi надо обнаружить нужный прибор или в ближней зоне (в сети роутера), или же на сервере, пройдя все этапы авторизации и аутентификации, получить его состояние и только потом передавать команды. Знать состояние прибора необходимо, чтобы с ним корректно взаимодействовать. Также в дело вмешивается сеть со своими задержками, обрывами, порчей пакетов, из-за чего порой нельзя сказать, что именно произошло: или лампочка перегорела, или роутер не работает. А если попытаемся управлять прибором по BLE через шлюз, то все эти проблемы накладываются друг на друга.



Интеграция со сторонними IoT-платформами приносит дополнительную головную боль: разница в протоколах, разные функциональные возможности приборов, различия в форматах, разные требования к работе прошивок. И все те же самые проблемы с сетью, не только с прибором, но еще и между облаками в межоблачной интеграции.  

Для мобильного приложения интеграция с несколькими IoT-платформами также является серьезной головной болью: нужно поддерживать новые артикулы, сторонние API, при необходимости подключать библиотеки, которые необходимы, чтобы связывать (paring) прибор с телефоном. Поэтому чем больше проблем будет решаться на стороне сервера, тем лучше. Еще один момент, важный для бизнеса: межоблачная интеграция платная, за каждый вызов стороннего API нужно платить, и немало.



Решение проблем с помощью цифровых двойников


Методом проб и ошибок, а также через общение backend-команды и команды мобильной разработки мы сформулировали основные принципы и подходы к решению подобных задач. Мы пришли к концепции цифровых двойников, которая позволила решить проблемы управления разнородными приборами и их интеграции между собой.

Серверные компоненты, работающие с цифровыми двойниками, реализованы на основе Tarantool. Со временем эти решения выросли в процессинговые сервисы, которые могут справляться с большим потоком телеметрии и запросов. 

Но что такое цифровой двойник? На первый взгляд кажется, что это всего лишь виртуальная модель физического прибора. Если это лампочка, то ее двойник — некоторая сущность с двумя состояниями: светится или нет. Если речь про чайник, то он может нагревать виртуальную воду или потихоньку остывать. Казалось бы, все просто. Но нет!

Вот мы наливаем в реальный чайник воду, ставим его на базу, включаем и ждем, когда он вскипит. Чтобы цифровой двойник этого чайника имел какой-то смысл, с ним должны происходить соответствующие изменения. Двойник должен «понимать», сколько в нем воды, какая у нее была изначальная температура при включении чайника и так далее. 

Задаем эти параметры и видим два кипящих чайника — реальный и виртуальный. И даже видим, что расчетное значение температуры близко к показаниям термодатчика настоящего прибора. Но внезапно чайник из виртуального мира сообщил, что он вскипятил всю воду и выключился. А чайник из нашего материального мира еще пыхтит. Или наоборот. Что делать? Как быть?



Налицо расхождение процессов. Причин тому великое множество: начиная от того, что мы учитываем не все аспекты физического процесса (атмосферное давление, температура воздуха на кухне, состав воды, изношенность чайника и так далее), и заканчивая тем, насколько точно мы воспроизвели виртуальную модель в коде и какие методы моделирования применили. Такой «доппельгангер», если он хорошо сделан, помогает сравнить реальные показатели с расчетными и сделать какие-то выводы. 

Но это только половина дела, потому что без знания состояния физического объекта эта модель в целом бесполезна. Пока что ее можно только запустить на компьютере, посмотреть и сказать: «Да, вот этот чайник на кухне устарел, потому что стал дольше греться, а значит, он уже изношен и скоро перегорит согласно сравнению с виртуальной моделью. Пора покупать новый!» То есть наше определение цифрового двойника не совсем корректное. 

Попробуем рассмотреть определение с другой стороны. Мы не будем ничего моделировать, а будем просто собирать и сохранять показания телеметрии. Оснастим чайник модулем, который передает показания датчиков (температуры, объема воды, состояния кнопок на панели) по какому нибудь каналу связи. Для лампочки достаточно передавать состояние выключателя, но также можно передавать и потребляемый ток. Таким образом, мы можем видеть, что нам передает прибор, и наблюдать за его состоянием. Эти числа можно хранить, можно просто отображать на экране ноутбука или телефона. 

Но и это не будет цифровым двойником. От пассивного наблюдения за показаниями датчиков в режиме реального времени толку мало, поскольку нет пока разницы, как мы получаем показания приборов, по Bluetooth или глазами. Решение, что делать, пока что принимаем мы.

А что, если объединить виртуальную модель с телеметрией? И поверх этого задать некоторые правила. Например, чайник передает, сколько в нем воды и начальную температуру, и запускает событие включения, а также может передавать дополнительную информацию — например, в каком режиме он работает. Цифровой чайник подхватывает эти параметры и сравнивает получаемые показания датчиков с расчетными и в случае большой разницы сообщает, что что-то пошло не так.



И наконец, научимся управлять физическим чайником по тому же Bluetooth-каналу, а цифровую модель обучим выдавать команды для управления чайником в зависимости от того, что происходит. Например, чайник все кипит и кипит, хотя он должен был выключиться уже минуту назад. Что делать? Попробуем выдать команду на выключение. Очень хорошо, если она выполнилась и температура начала снижаться. В противном случае выдадим тревожное сообщение пользователю, чтобы он прибежал на кухню и руками выключил чайник.

Таким образом, мы получили не просто виртуальную модель, а почти «вещь в себе». Не просто пассивный объект, хранящий телеметрию, но активно действующую и «думающую» сущность, которая общается с внешним миром и выполняет свои задачи.

В сухом остатке цифровой двойник — это объединение нескольких понятий:

  • моделируемое поведение;
  • наблюдения из реального мира;
  • правила, которые на основе телеметрии и моделируемого поведения позволяют  принимать решения;
  • интерфейс, через который можно взаимодействовать как с самим двойником, так и с управляемым им прибором.



То есть мы видим, что цифровой двойник скорее обладает собственным поведением и в строгом смысле не является двойником объекта. В его поведении есть и дополнительные элементы для работы при утере информации, на принятие решений в случае аномалий и так далее. Но этот термин уже устоялся, и поэтому мы будем применять его.

Какие особенности цифровых двойников нужно учитывать при разработке IoT-систем?


В первую очередь цифровые двойники должны работать быстро. Модель, которая с запозданием, «вяло» реагирует на поступающую телеметрию, особой пользы нам не принесет. Показания датчиков могут быть неактуальные, команды несвоевременные. А это значит, что код, который обрабатывает состояние модели и входящую телеметрию, должен работать быстро. Для этого необходимо еще и быстрое хранилище состояния. Таким хранилищем может выступать только оперативная память.

Далее необходимо уметь обрабатывать данные от многочисленных цифровых двойников. В больших системах количество устройств может достигать нескольких миллионов, а значит, нам необходимо уметь достаточно эффективно хранить о них информацию и быстро находить нужный нам экземпляр.

Состояние устройства может быть сложным, большим. При этом его необходимо обновлять согласованно, чтобы не было расхождений между поведением реального прибора и его двойника.

Кроме того, цифровой двойник в общем случае взаимодействует с физическим объектом через сеть. Сети ненадежны, а в случае с IoT иногда приходится дополнительно отказываться от каких-то гарантий доставки сообщений, что необходимо учитывать. Поскольку мы не можем точно сказать, что произошло на другом конце соединения, сеть часто становится слепой зоной для цифрового двойника. Словом, задача весьма сложная. Это междисцлинарная область, которая включает в себя и сетевое программирование, и физику, и знание особенностей техники, и так далее.

Итак, мы построили различные модели, научились принимать и обрабатывать телеметрию от приборов, научили цифрового двойника управлять приборами, а сами приборы — взаимодействовать через цифровых двойников. Что нам дает наличие такой системы?

Самая главная выгода — это способность системы корректно управлять приборами. Из этого вытекает много возможностей. Теперь, когда мы умеем работать с приборами через их представителей, связать воедино различные приборы и системы становится гораздо проще.  Двойники для разных приборов от разных производителей могут работать по разным протоколам (MQTT, ModBus, CoAP и так далее), применять различные стили кодирования состояния. Но если мы выстроили единый формат хранения и обработки цифровых двойников, то можем выстраивать унифицированные взаимодействия между разными приборами и их клиентскими сервисами (приложениями).

Кроме того, можно создать цифровой двойник не только для физического предмета, но и для сервиса. К примеру, сервис погоды можно легко представить в виде цифрового двойника, совместимого с другим цифровым двойником, описывающим метеостанцию.

Из каких элементов состоит процессинговая система на Tarantool




Разработка процессинговой системы с нуля — непростая задача. Нужно понимать, как работать с большими потоками данных, как система поведет себя в периоды пиковых нагрузок, как обеспечивать отказоустойчивость и масштабируемость. Также важно знать, какие данные ценные, а какие не так жалко потерять. 

В Tarantool и его экосистеме есть решения, такие как Cartridge, которые позволяют делать высоконагруженные надежные кластеры с помощью репликации, журнала упреждающей записи (WAL), удаленного вызова Lua-процедур на другом узле и многих других инструментов.

Для решения основной задачи — обработки сообщений и работы с цифровыми двойниками — сосредоточимся на структуре процессинговой системы. Обозначим основные элементы, совокупность которых позволяет достичь цели, сначала на примере Bluetooth-устройств, а затем на примере добавления Tuya. Это объемная тема, поэтому мы не будем отвлекаться на упомянутые выше нюансы и вопросы безопасности.

В общем виде система состоит из следующих элементов:

  1. Ядро — цифровые двойники. Набор обработчиков сообщений и процессов, которые реализуют различные виды таймеров.
  2. Источники. Это могут быть подключения к MQTT-брокеру (например, с использованием luamqtt), к Kafka, любые другие подключения, по которым поступают данные, или же просто Lua-процедуры, которые вызываются по IProto (бинарный протокол Tarantool) внешними клиентами.
  3. Набор входных буферов. В них из источников складываются данные.
  4. Набор выходных буферов. Туда ядром складываются сообщения, которые нужно отправить во внешний мир.
  5. Исполнители. Процессы (в терминологии Tarantool они называются файберами), которые забирают из выходных буферов данные и посылают их во внешний мир. Это также могут быть соединения, открытые в самом Tarantool, или же какие-то внешние процессы, которые забирают данные по IProto.
  6. Слой клиентского API для взаимодействия с цифровыми двойниками. Через этот API внешние клиенты (сервисы) управляют приборами, узнают об их состоянии, получают результаты каких-то операций.

Работа выглядит следующим образом:

  1. Из источника данные попадают во входной буфер.
  2. Далее их пачкой забирают нужные файберы и обрабатывают с помощью цифровых двойников. При обработке могут отправляться новые сообщения.
  3. Из выходного буфера сообщения выбираются исполнителем и отсылаются во внешний мир (команды, ответы).
  4. В ядре работают фоновые задачи, которые обрабатывают тайм-ауты, смену состояний цифровых двойников по времени.
  5. Клиентские сервисы могут обращаться через API и взаимодействовать с цифровыми двойниками, что тоже может приводить к генерации сообщений, которые будут отсылаться во внешний мир.

Рассмотрим подробнее два наиболее интересных компонента: буфер и ядро. Буфер интересен тем, что это важный элемент для взаимодействия между процессами ядра.

Входные и выходные буферы


Буфер позволяет процессинговой системе быстро принимать и обрабатывать большой поток сообщений за разумное время, переживать пиковые нагрузки, не останавливая при этом другие компоненты системы.

Какие могут быть реализации? Можно воспользоваться готовыми, например fiber.channel. С помощью него можно создать канал, в который один процесс (файбер) будет записывать сообщения, а другой — забирать. Какие есть недостатки? Канал ограничен по количеству сообщений и в момент пиковой нагрузки может быть заполнен. А когда канал заполнен, файбер, пытающийся записать в него сообщение, будет заблокирован до тех пор, пока не появится свободный слот в канале, что уже нежелательно. Это можно обойти: просто не принимать новые сообщения, пока канал не освободится.

Но есть две проблемы. Во-первых, если данные важные и не хочется их терять при перезагрузке или падении узла системы, то канал для этого не подходит. Он все держит в памяти, и данные могут быть потеряны. Во-вторых, канал не работает транзакционно. Если нужно откатить транзакцию, в рамках которой были обработаны сообщения, а потом это повторить, то придется думать, где сохранять уже прочитанные сообщения. Можно рассмотреть tarantool queue, и это надежный вариант. Он позволяет задать опции при создании очереди, и тогда данные будут сохраняться в WAL и при перезагрузке системы не пропадут. Также можно воспользоваться moonlibs/xqueue. Однако для работы в пакетном режиме, когда требуется читать сообщения из очереди пачками, их API не очень удобен. Придется сначала много раз делать take, а затем ack на каждое обработанное сообщение. Также архитектура этих очередей приспособлена к работе с одиночными сообщениями в ненадежных сетевых средах, когда исполнитель может пропасть, и тогда надо делать что-то с задачей, которая зависла в очереди.

Рассмотрим еще один вариант упрощенной реализации буфера. В его основе лежит обычный спейс (space, так в Tarantool называются таблицы), куда кладутся сообщения. Для каждого сообщения последовательно увеличиваем его id, который является ключевым полем. При вставке берем максимальный id с помощью индекса. При взятии сообщений начинаем с минимального id, который находится в спейсе в данный момент, и последовательно удаляем N сообщений, добавляя их в список, пока они есть. Для блокировки файбера, ждущего новые данные, когда буфер пустой, используется fiber.cond. При вставке новых сообщений в буфер делается cond:broadcast(). API получается следующим:

  • buffer.init(name, schema, opts) — создает спейс под буфер с именем name; schema — поля сообщения; opts — опции, которые нужно передать при создании спейса;
  • buffer_obj:put(messages) — вставляет сообщения в буфер;
  • buffer_obj:take(limit) — забирает из буфера сообщения, которых может быть от одного до limit;
  • buffer_obj:wait(timeout) — ожидание появления данных. Метод take является неблокирующим, чтобы его можно было вызвать внутри транзакции, в то время как метод wait можно вызывать и по его результатам решать, открывать ли транзакцию.

Поскольку полная реализация буфера небольшая, приведем её здесь:

local fiber = require('fiber')

local M = {}
local buffer_mt = {}

function buffer_mt:put(messages)
    local last = self.space.index.pk:max()
    local start = 0
    if last ~= nil then
        start = last.id
    end

    for i = 1, #messages do
        local message = messages[i]
        message.id = start + i
        local tuple, err = self.space:frommap(message)
        if err then
            return err
        end
        self.space:insert(tuple)
    end

    self.waiter:signal()
end

function buffer_mt:take(limit)
    local count = 0
    local messages = table.new(limit, 0)
    for _, tuple in self.space:pairs(0, { iterator = box.index.GE }) do
        count = count + 1
        messages[count] = self.space:delete({ tuple.id })
        if count >= limit then
            break
        end
    end

    return messages
end

function buffer_mt:wait(timeout)
    if self.space:len() ~= 0 then
        return true
    end

    return self.waiter:wait(timeout)
end

function M.create(name, schema, opts)
    opts = opts or {}
    local fields = { { name = 'id', type = 'unsigned' } }
    table.move(schema, 1, #schema, 2, fields)
    local space = box.schema.space.create(name, {
        engine = 'memtx',
        if_not_exists = true,
        temporary = opts.temporary or true,
        is_local = opts.is_local or false,
        format = fields
    })

    space:create_index('pk', {
        parts = { 'id' },
        type = 'tree',
        if_not_exists = true
    })

    return setmetatable({
        name = name,
        space = space,
        waiter = fiber.cond()
    }, { __index = buffer_mt })
end

return M

Почему буферов должно быть несколько? Почему бы не сделать один большой входной буфер и один большой выходной? Дело в том, что для сообщений разного вида из разных источников могут быть заданы разные требования по надежности доставки и обработки, а также разные приоритеты. Поэтому для каждого вида сообщений можно и нужно создавать отдельные буферы. Например, сообщения с результатами сканирования Bluetooth- или WiFi-эфира могут повторяться, приборы присылают их регулярно, а значит, их потеря не является критичной. При этом таких сообщений может быть очень много. Для них можно выставить параметры спейсов temporary=true и is_local=true, что существенно снизит нагрузку на диск (не надо писать WAL). Для иного рода сообщений будут другие требования.

Создаем цифровой двойник


А вот теперь самое интересное — цифровые двойники. Как уже ранее говорилось, их создание — дело непростое, и надо хорошо понимать конечную цель, что мы хотим от них получить. Как они выглядят в коде? Обычно это конечные автоматы, которые обрабатывают входные сообщения, посылают свои, отслеживают временные.

Точками унификации являются источники и исполнители (функции и фоновые задачи), которые обмениваются сообщениями с внешним миром, а также сами цифровые двойники, состояние которых и API работы с которыми может быть унифицировано. Управляющих сервисов можно создавать множество, на разные протоколы и IoT-платформы, с которыми необходимо интегрироваться.

Для того чтобы раскрыть основную идею, не отвлекаясь на различные сложности и тонкости реального мира, реализуем эмулятор простейшего шлюза Bluetooth-WiFi с виртуальной лампочкой и управляющую систему для них.

Пример с эмулятором нам покажет, что с точки зрения серверной системы все приборы, которыми она управляет, являются всего лишь сетевыми агентами, которые обмениваются сообщениями, и система в любом случае будет наблюдать только сетевое поведение. Прибор в реальности ведет себя гораздо сложнее: например, он может мигать лампочками, но в сеть отправлять только отчеты о своей температуре. Следовательно, можно (и нужно!) разрабатывать и тестировать процессинговые системы с использованием эмуляторов, что позволяет потом

проводить интеграционное и нагрузочное тестирование. Также при разработке реальных устройств это помогает оценить, какой будет трафик и нагрузка на платформу, а значит, грамотно спроектировать протоколы.

Эмулятор шлюза с виртуальной лампочкой


Шлюз работает следующим образом: он посылает IoT-платформе сообщение о своем состоянии («включен/выключен») и переходит в режиме ожидания команд. Список команд управления шлюзом:

  • scan — сканировать Bluetooth-эфир и на каждый обнаруженный прибор прислать сообщение. В начале и конце сканирования шлюзом отправляется сообщение scan, которое является маркером начала и окончания сканирования. При обнаружении прибора в эфире отправляется сообщение scan <device addrеss>;
  • connect <dеvice BLE address> — подключиться к BLE-прибору. При успешном подключении приходит маркер “OK”, при ошибке — маркер ошибки с описанием;
  • disconnect <dеvice BLE address> — отключиться от BLE-прибора;
  • request <dеvice BLE address>, <commаnd> — отправить команду подключенному  прибору и получить ответ.

Сообщения, которые шлюз может посылать:

  • status <dеvice | gatеway>, <statе> — сообщает о состоянии какого-либо прибора или самого шлюза;
  • scan <dеvice BLE>, <dеscription> — сообщение об обнаруженном приборе.

Будем считать, что проблем с Bluetooth нет, соединение надежное, как по проводу, из виртуальных приборов только одна лампочка, которую можно включать и выключать.

Выполним эмулятор как приложение на Tarantool. Весь код разместим в файле demu.lua (сокращение от DEvice EMUlator). Запускаться это будет на отдельном экземпляре, на другом порте, а также нужно складывать WAL и снимки состояния в отдельную директорию.

box.cfg {
    listen = '3302',
    wal_dir = 'demu',
    memtx_dir = 'demu'
}

Обмен сообщениями


Эмулятор должен обмениваться сообщениями c IoT-платформой, а значит, воспользуемся той же реализацией буфера/очереди для входных и выходных сообщений. Каждое сообщение будет иметь два поля: message — собственно название сообщения или команды, это обязательный атрибут, и args — массив параметров сообщения, который является необязательным. Таким образом, буферы для обмена сообщениями будут выглядеть вот так:

local buffer = require('buf')

local output = buffer.create('to_net', {
    { name = 'message', type = 'string', is_nullable = false },
    { name = 'args', type = 'array', is_nullable = true }
})

local input = buffer.create('from_net', {
    { name = 'message', type = 'string', is_nullable = false },
    { name = 'args', type = 'array', is_nullable = true }
})

Теперь нужно это все немного оживить. Добавим глобальную функцию push_messages, чтобы её потом вызывала управляющая система, если она хочет передать эмулятору управляющее сообщение:

function push_messages(messages)
    box.atomic(input.put, input, messages)
end

И для начала реализуем простую логику: то, что записано во входной буфер, немедленно оказывается в выходном. Для этого нужен файбер. Файберы можно создавать и задавать им имя, что позволяет потом смотреть в логах, какой файбер что сделал. Воспользуемся простой оберткой:

local fiber = require('fiber')
local log = require('log')

local function task(name, fn)
    local proc = fiber.create(function()
        while true do
            fn()
        end
    end)
    proc:name(name)
    log.info('created proc %s with fid = %s', name, proc:id())

    return proc
end

С помощью этой обертки создадим две фоновые задачи: одна для реализации echo-логики, другая для распечатки того, что попало в выходной буфер:

local function process_input()
    local batch = input:take(100)
    for i = 1, #batch do
        local message = batch[i]
        log.info('message from IoT platform: %s', message)
        output:put({{
            message = message.message,
            args = message.args
        }})
    end
end

task('input processor', function()
    if input:wait(10) then
        box.atomic(process_input)
    end
end)

local json = require('json')

task('output processor', function()
    if output:wait(10) then
        local batch = box.atomic(output.take, output, 100)
        log.info('messages to IoT platform: %s', json.encode(batch))
    end
end)

Теперь это нужно протестировать. Создадим директорию для WAL и снимков состояния:

mkdir demu

И запустим наше приложение с эмулятором:

tarantool -i demu.lua

Параметр -i позволяет нам запустить Tarantool и перевести его в интерактивный режим. При успешном запуске мы должны в консоли увидеть примерно следующее:

2022-12-19 09:50:59.325 [8630] main/103/demu2.lua I> created proc input processor with fid = 118
2022-12-19 09:50:59.325 [8630] main/103/demu2.lua I> created proc output processor with fid = 119
2022-12-19 09:50:59.325 [8630] main/103/demu2.lua C> Tarantool 2.8.4-0-g47e6bd362
type 'help' for interactive help

Теперь попробуем это протестировать, вызовем в консоли функцию push_messages:

tarantool> push_messages({{message = 'echo'}})

И в консоли должны распечататься логи:

tarantool> 2022-12-19 09:52:47.390 [8630] main/118/input processor I> message from IoT platform: [1, 'echo']
2022-12-19 09:52:47.390 [8630] main/119/output processor I> messages to IoT platform: [[1,"echo"]]

Мы научили эмулятор общаться с внешним миром и реализовали простую логику. В дальнейшем заменим логику функции process_input и логику output processor.

Хранение и обработка состояний


Далее потребуется несколько спейсов, чтобы хранить состояния моделируемых приборов:

  • gateways — хранит состояния шлюзов;
  • devices — хранит состояния приборов;
  • ether — виртуальная модель Bluetooth-эфира, содержит пары идентификаторов {шлюз, прибор}, а также дополнительный признак, установлено ли между шлюзом и прибором соединение.

Начнём со шлюзов. Определим сразу все состояния шлюза и обозначим их с помощью констант:

local GW_WAIT = 0       — ожидание команд
local GW_SCANNING = 1   — сканирование BT-эфира
local GW_CONNECTING = 2 — подключение к BLE-прибору

Создадим спейс gateways и индексы к нему:

local gateways = box.schema.space.create('gateways', {
    if_not_exists = true,
    engine = 'memtx',
    format = {
        { name = 'address', type = 'string', is_nullable = false },
        { name = 'online', type = 'boolean', is_nullable = false },
        { name = 'status', type = 'number', is_nullable = true },
        { name = 'state', type = '*', is_nullable = true },
        { name = 'updated', type = 'number', is_nullable = true }
    }
})
gateways:create_index('pk', {
    type = 'tree',
    if_not_exists = true,
    parts = { 'address' }
})
gateways:create_index('pulse', {
    type = 'tree',
    if_not_exists = true,
    unique = false,
    parts = { 'online', 'status', 'updated' }
})

Зачем нужны именно такие поля и индексы?

  • поле address — идентификатор шлюза. В IoT-мире это может быть что угодно, начиная от идентификатора чипа от производителя и заканчивая заданным идентификатором от самой компании. Главное требование — уникальность;
  • поле online — описывает, находится ли шлюз в сети; 
  • поле status — описывает, в каком сейчас состоянии находится шлюз (например, прием команд, подключение к прибору). Чтобы не тратить много места, это поле можно сделать числовым и каждой числовой константе назначить свою семантику состояния. Это поле можно сделать nullable, так как в offline-состоянии (когда шлюз не в сети) нам его состояние неизвестно;
  • поле state типа any содержит некоторую информацию о состоянии. Эти данные можно хранить и в нескольких полях, у каждого из них в кортеже будет свое назначение, но здесь просто возьмём одно поле для упрощения кода;
  • поле updated — время последнего изменения состояния. По этому полю можно отслеживать временные переходы. Для эмулятора это способ изобразить задержки в системе. Как это делается, опишем ниже. Примерная точность — до миллисекунд;
  • индекс pk — это обязательный primary-индекс, он должен быть первым в списке индексов. Поле address является ключом;
  • индекс pulse является составным, и с его помощью можно реализовывать выборки по сложному критерию, чтобы обрабатывать временные события. Например, можно получить выборку всех шлюзов, которые отправили последнее сообщение сканирования эфира некоторое время назад, и что-то с этой выборкой сделать — допустим, завершить процесс сканирования или же отправить еще одно сообщение сканирования.

Создадим также спейсы и индексы для приборов и эфира:

local devices = box.schema.space.create('devices', {
    if_not_exists = true,
    engine = 'memtx',
    format = {
        { name = 'address', type = 'string', is_nullable = false },
        { name = 'switch', type = 'boolean', is_nullable = false }
    }
})
devices:create_index('pk', {
    type = 'tree',
    if_not_exists = true,
    parts = { 'address' }
})

local ether = box.schema.space.create('ether', {
    if_not_exists = true,
    engine = 'memtx',
    format = {
        { name = 'gateway', type = 'string', is_nullable = false },
        { name = 'device', type = 'string', is_nullable = false },
        { name = 'connected', type = 'boolean', is_nullable = false }
    }
})
ether:create_index('pk', {
    type = 'tree',
    if_not_exists = true,
    parts = { 'gateway', 'device' }
})

Спейс devices просто хранит два поля — идентификатор address и двоичное состояние виртуальной лампочки: включена или выключена. В спейсе ether моделируется видимость Bluetooth-эфира. Если есть запись для шлюзов GATE-X и BULB-Y, то считаем, что этот шлюз видит в эфире эту лампочку. Если же атрибут connected установлен в true, то считаем, что шлюз установил соединение с этой лампочкой по Bluetooth.

Состояния, которые хранятся в спейсах, сейчас безжизненны — это просто данные. Необходимо добавить жизни. Если «сотворить» виртуальный шлюз можно просто с помощью вызова функции box.space.gateways.insert({ <addrеss>, false }), то «включить» его или «выключить» можно с помощью следующей функции:

function switch_gateway(address, flag)
    local gateway = box.space.gateways:get(address)
    if gateway.online == flag then
        return false
    end

    local next_status, next_state
    if flag then
        next_status = GW_WAIT
        next_state = box.NULL
    else
        next_status = box.NULL
        next_state = box.NULL
    end

    return box.atomic(function()
        box.space.gateways:update(address, {
            { '=', 'online', flag },
            { '=', 'status', next_status },
            { '=', 'state', next_state }
        })
        output:put({{
            message = 'status',
            args = { 'gateway', address, { online = flag } }
        }})
        for _, pair in box.space.ether:pairs({ address }) do
            box.space.ether:update(
                { pair.gateway, pair.device },
                {{ '=', 'connected', false }}
            )
        end
        return true
    end)
end

К этому коду есть следующие пояснения:

  1. Глобальная функция switch_gateway будет вызываться по IProto кем-нибудь, кто управляет эмулятором. Сразу оговоримся, что это не управляющая система! Не забудьте, что это именно эмулятор физического прибора, а значит, им управляет что-то другое, имитируя события реального мира.
  2. При «включении/выключении» меняем состояние шлюза, а затем посылаем в мир сообщение со статусом (появился или исчез из сети).

Итак, шлюз включается и выключается и сообщает об этом внешнему миру. Протестируем это. Запустим приложение как обычно, а затем вызовем в консоли следующие функции:

tarantool> box.space.gateways:insert({ 'GATE-X', false })
tarantool> switch_gateway('GATE-X', true)

После вызова функции switch_gateway увидим, что эмулятор успешно отчитался о включении шлюза:

2022-12-19 10:12:07.589 [10576] main/119/output processor I> messages to IoT platform: [[1,"status",["gateway",{"online":true}]]]

Проверим, что состояние шлюза действительно поменялось:

tarantool> box.space.gateways:get('GATE-X')
---
- ['GATE-X', true, 0, null]
...

Работает. Теперь нужно научить шлюз принимать и обрабатывать команды из внешнего мира. У нас уже имеется задача для обработки входящих сообщений. Нужно расширить её логику, чтобы для входящих сообщений находился нужный обработчик и вызывался с данными из сообщения. Для этого объявим таблицу handlers и будем в ней размещать необходимые функции. А находить их станем по значению ключа message из сообщения.

Реализуем для начала команду scan:

local clock = require('clock')

local function now()
    return math.floor(clock.realtime() * 1000)
end

local handlers = {}

function handlers.scan(address)
    local gate = box.space.gateways:get(address)
    if not gate or not gate.online then
        return
    end
    if gate.status ~= GW_WAIT then
        return output:put({{
            message = 'scan',
            args = { address, box.NULL, 'BUSY' }
        }})
    end

    box.space.gateways:update(address, {
        { '=', 'status', GW_SCANNING },
        { '=', 'state', {} },
        { '=', 'updated', now() }
    })
end

Этот обработчик проверяет шлюз (его наличие в спейсе) и состояние. Если шлюз не найден, то обработка завершается (нет ответа); а если шлюз находится в неподходящем для обработки команды состоянии, то посылается сообщение с ошибкой. В случае, когда все хорошо, шлюз переводится в состояние GW_SCANNING. Важно отметить, что также обновляется поле updated, оно сыграет потом большую роль, когда нужно будет реализовать события во времени.

И новая реализация process_input:

local function process_input()
    local batch = input:take(100)
    for i = 1, #batch do
        local message = batch[i]
        log.info('message from IoT platform: %s', message)
        local handler = handlers[message.message]
        if handler ~= nil then
            handler(unpack(message.args))
        end
    end
end

Протестируем, что команда scan, посланная шлюзу, корректно обрабатывается и шлюз переходит в состояние сканирования. Сохраним изменения в коде, перезапустим приложение. Поскольку состояние сохранилось либо в WAL, либо в снимке, то ничего уже повторять со шлюзом не надо:

tarantool> push_messages({{ message = 'scan', args = { 'GATE-X' }}})

И после этого в логах должны появиться сообщения:

tarantool> 2022-12-19 10:52:44.453 [13600] main/118/input processor I> message from IoT platform: [1, 'scan', ['GATE-X']]

Проверим состояние шлюза:

tarantool> box.space.gateways:get('GATE-X')
---
- ['GATE-X', true, 1, [], 1671483739134]
...

Единичка в поле status говорит нам о том, что шлюз в состоянии сканирования, и также видим, когда его перевели в это состояние.

Теперь вопрос: как реализовать эмуляцию сканирования? Можно, конечно, сразу в ответ на команду scan послать весь список приборов из спейса ether, но это не будет моделью, приближенной к реальному процессу сканирования эфира. Гораздо интереснее, когда эфир сканируется постепенно. 

Вариант реализации в лоб — это запускать по файберу на каждый шлюз, где можно выполнять fiber.sleep(delay), проходить по спейсу ether и записывать в выходной буфер сообщения. Но такой подход неудобен тем, что много файберов создать не получится. Другой подход заключается в сканировании индекса pulse спейса gateways. Идея очень простая: с помощью такого индекса можно выбрать все шлюзы в сети, которые находятся в определенном состоянии с некоторого момента времени. По этой выборке можно пройти и выполнить требуемые действия, в том числе и сменить состояния. Назовем такой процесс обработкой фаз. Вот реализация сканирования по фазам:

local function process_gateway_phase(phase, delay, fn)
    local ts = now() - delay * 1000
    local processed = 0
    repeat
        processed = box.atomic(function()
            return box.space.gateways.index.pulse
                :pairs({ true, phase, ts }, { iterator = 'LE' })
                :take(function(gateway)
                     return gateway.online and gateway.status == phase
                 end)
                :take(100)
                :map(fn)
                :length()
        end)
        if processed > 0 then
            log.info('phase %s, processed: %s', phase, processed)
        end
    until processed == 0
end

Функция process_gateway_phase выбирает из спейса до 100 записей, вызывает на каждую из них функцию с бизнес-логикой, оборачивая это все в транзакцию. Ограничивать количество записей за раз нужно, так как Tarantool — система с кооперативной многозадачностью и каждая задача должна уметь самостоятельно разбивать свою работу на кусочки, чтобы не мешать остальным.

Добавим еще вспомогательную функцию (перевод шлюза в состояние ожидания):

local function gateway_to_waiting(gateway)
    box.space.gateways:update(gateway.address, {
        { '=', 'state', box.NULL },
        { '=', 'status', GW_WAIT },
        { '=', 'updated', now() }
    })
end

Теперь, когда есть алгоритм, реализующий сканирование индекса и выдающий выборку всех записей, находящихся в определенной фазе и у которых истекло время ожидания, можем реализовать логику самого сканирования, точнее его эмуляции.

Для одной записи шлюза нам необходимо прочитать из спейса ether связанные записи, начиная с последней зафиксированной позиции. Это возможно, так как индексы упорядочены и всегда можно прочитать ключи, которые следуют за указанным ключом в определенном порядке. Если же больше записей нет, то шлюз переводится в состояние ожидания команд и больше не появляется в выборке по этой фазе. Если же есть следующая запись, то шлюз остается в текущей фазе, обновляется лишь время ожидания, а последний прочитанный прибор из спейса ether записывается в поле state кортежа.

local function gateway_scan(gateway)
    local last_device = gateway.state.last_scan
    local key = { gateway.address, last_device or '' }
    local scanned = box.space.ether:pairs(key, { iterator = 'GT' }):nth(1)
    if scanned == nil then
        gateway_to_waiting(gateway)
        return output:put({{
            message = 'scan',
            args = { gateway.address, box.NULL, 'OK' }
        }})
    end
    local device = box.space.devices:get(scanned.device)
    if device ~= nil then
        output:put({{
            message = 'scan',
            args = { gateway.address, device.address, 'OK' }
        }})
    end
    box.space.gateways:update(gateway.address, {
        { '=', 'updated', now() },
        { '=', 'state', { last_scan = scanned.device } }
    })
end

local SCAN_DELAY = 2
task('scanner', function()
    process_gateway_phase(GW_SCANNING, SCAN_DELAY, gateway_scan)
    fiber.sleep(0.001)
end)

Убедимся, что процесс сканирования исправно имитируется:

tarantool> box.space.devices:insert({'BULB-Y', false})
tarantool> box.space.devices:insert({'BULB-Z', false})
tarantool> box.space.ether:insert({'GATE-X', 'BULB-Z', false})
tarantool> box.space.ether:insert({'GATE-X', 'BULB-Y', false})
tarantool> push_messages({{message = 'scan', args = {'GATE-X'}}})

В случае успеха увидим логи:

tarantool> 2022-12-20 00:29:24.726 [8386] main/118/input processor I> message from IoT platform: [1, 'scan', ['GATE-X']]
2022-12-20 00:29:26.726 [8386] main/119/output processor I> messages to IoT platform: [[1,"scan",["GATE-X","BULB-Y","OK"]]]
2022-12-20 00:29:26.726 [8386] main/120/scanner I> phase 1, processed: 1
2022-12-20 00:29:28.727 [8386] main/119/output processor I> messages to IoT platform: [[1,"scan",["GATE-X","BULB-Z","OK"]]]
2022-12-20 00:29:28.727 [8386] main/120/scanner I> phase 1, processed: 1
2022-12-20 00:29:30.727 [8386] main/119/output processor I> messages to IoT platform: [[1,"scan",["GATE-X",null,"OK"]]]
2022-12-20 00:29:30.727 [8386] main/120/scanner I> phase 1, processed: 1

Убедимся, что по завершении процесса сканирования шлюз переходит в корректное состояние:

tarantool> box.space.gateways:get('GATE-X')
---
- ['GATE-X', true, 0, null, 1671485370725]
...

Другие обработчики команд и фаз


Аналогичным образом делаются другие обработчики команд и фоновые задачи для обработки состояний. Обработчик для команды connect просто переводит шлюз в фазу GW_CONNECTING, и далее его подхватывает файбер, который обрабатывает записи только этой фазы.

function handlers.connect(gateway, device)
    local gate = box.space.gateways:get(gateway)
    if gate == nil then
        return output:put({
            message = 'connect',
            args = { gateway, device, 'NO_GATEWAY' }
        })
    end

    if gate.status ~= GW_WAIT then
        return output:put({
            message = 'connect',
            args = { gateway, device, 'BUSY' }
        })
    end

    box.space.gateways:update(gateway, {
        { '=', 'status', GW_CONNECTING },
        { '=', 'state', { connecting = device }},
        { '=', 'updated', now() }
    })
end

Фоновая задача для обработки фазы GW_CONNECTING:

local function gateway_connect(gateway)
    local device = gateway.state.connecting
    box.space.ether:update(
        { gateway.address, device },
        {{ '=', 'connected', true }}
    )
    gateway_to_waiting(gateway)
    output:put({{
        message = 'connect',
        args = { gateway.address, device, 'OK' }
    }})
end

local CONNECT_DELAY = 2
task('connector', function()
    process_gateway_phase(GW_CONNECTING, CONNECT_DELAY, gateway_connect)
    fiber.sleep(0.001)
end)

Обработчик для команды disconnect:

function handlers.disconnect(gateway, device)
    local gate = box.space.gateways:get(gateway)
    if gate == nil then
        output:put({
            message = 'disconnect',
            args = { gateway, 'NO_GATEWAY' }
        })
        return
    end

    if gate.status ~= GW_WAIT then
        output:put({
            message = 'disconnect',
            args = { gateway, 'BUSY' }
        })
        return
    end

    box.space.ether:update(
        { gateway, device },
        {{ '=', 'connected', false }}
    )
    output:put({{
        message = 'disconnect',
        args = { gateway, device, 'OK' }
    }})
end

Обработчик для команды request:

function handlers.request(gateway, device, command)
    local pair = box.space.ether:get({ gateway, device })
    if not pair.connected then
        return output:put({{
            message = 'command',
            args = { gateway, device, 'DISCONNECTED' }
        }})
    end

    local reply
    if command == 'switch_on' then
        box.space.devices:update(device, {{ '=', 'switch', true }})
        reply = { switch = true }
    elseif command == 'switch_off' then
        box.space.devices:update(device, {{ '=', 'switch', false }})
        reply = { switch = false }
    elseif command == 'get_state' then
        local unit = box.space.devices:get(device)
        reply = { switch = unit.switch }
    end
    output:put({{ message = 'command', args = { gateway, device, reply } }})
end

Таким образом, с помощью отдельных файберов, которые захватывают записи с определенным статусом и с меткой времени, равной или меньшей указанному, а также с помощью обработчиков реализован простейший эмулятор. Последнее действие: заменить распечатку получаемых сообщений в output processor на вызовы netbox.call, которые будут посылать сообщения управляющей системе.

Как протестировать остальные части такого эмулятора — оставим в качестве упражнения для читателя :)

Цифровой двойник для эмулятора


Каким образом будет выглядеть система с цифровым двойником для нашего эмулятора? Как ни странно, цифровой двойник создается точно так же, как и сам эмулятор, код очень похожий.

Начинаем с настройки экземпляра Tarantool, весь код сохраняем в файле ditwin.lua (сокращение от Digital Twin):

box.cfg {
    listen = '3301',
    wal_dir = 'ditwin',
    memtx_dir = 'ditwin'
}

Остальной код практически полностью идентичен: аналогичным образом создаем буферы для обмена сообщениями (input и output), спейсы для хранения состояний по устройствам и вспомогательные функции. Различие будет только в логике, наборе обработчиков и семантике спейсов:

  • gateways — хранит в себе наблюдаемое состояние шлюза и состояние управления (время последнего запроса к шлюзу и так далее);
  • devices — хранит в себе наблюдаемое состояние прибора и состояние управления;
  • ether — хранит в себе список обнаруженных приборов и состояние подключений.

По составу атрибутов спейсы сделаем полностью аналогичными спейсам в эмуляторе.

Также аналогичен input processor, и точно так же будет выглядеть output processor.

Теперь сосредоточимся на обработчиках сообщений и фаз.

Обработка сообщений статуса


Когда в IoT-платформу попадает сообщение о том, что шлюз включился, платформа должна зарегистрировать его и сразу послать команду на сканирование эфира. Если пользователь системы не добавил шлюз и связанные с ним устройства в спейс, то все сообщения игнорируются.

function handlers.status(device_type, address, state)
    if
        device_type ~= 'gateway' or
        not box.space.gateways:get(address) or
        description.online == nil
    then
        return
    end

    local status, state, updated
    if description.online then
        status = GW_SCANNING
        state = {}
        updated = now()
    else
        status = box.NULL
        state = box.NULL
        updated = box.NULL
    end

    box.space.gateways:update(address, {
        { '=', 'online', description.online },
        { '=', 'status', status },
        { '=', 'state', {} },
        { '=', 'updated', updated }
    })

    if description.online then
        output:put({{
            message = 'scan',
            args = { address }
        }})
    end
end

Обработка сообщений сканирования


Пока шлюз не прислал пустое сообщение scan, которое означает успешное завершение сканирования, все обнаруженные приборы помещаются в спейс ether. После успешного завершения сканирования шлюз переходит в состояние GW_CONNECTING и посылает запрос на подключение к первому обнаруженному прибору из спейса ether.  В случае сообщения с ошибкой сканирования все накопленные результаты удаляются.

local function connect_next_device(gateway)
    local prev_device = gateway.state.last_connect or ''
    local next_device = box.space.ether
        :pairs({ gateway, prev_device }, { iterator = 'GT' })
        :nth(1)

    if next_device == nil or next_device.gateway ~= gateway then
        gateway_to_waiting(gateway)
        return
    end

    output:put({{
        message = 'connect',
        args = { gateway, next_device.device }
    }})
    box.space.gateways:update(gateway, {
        { '=', 'updated', now() },
        { '=', 'state', { last_connect = next_device.device } }
    })
end

function handlers.scan(gateway, device, result)
    local gate = box.space.gateways:get(gateway)
    if gate == nil or gate.state ~= GW_SCANNING then
        return
    end

    if device == nil then
        local next_status
        if result ~= 'OK' then
            box.space.ether:pairs({gateway}):each(function(item)
                box.space.ether:delete({item.gateway, item.device})
            end)
            next_status = GW_WAIT
        else
            next_status = GW_CONNECTING
        end
        box.space.gateways:update(gateway, {
            { '=', 'status', next_status },
            { '=', 'updated', now() }
        })
        if next_status == GW_CONNECTING then
            connect_next_device(gateway)
        end
        return
    end
    if not box.space.devices:get(device) then
        return
    end

    box.space.ether:insert({ gateway, device, false })
    box.space.gateways:update(gateway, {
        { '=', 'status', GW_SCANNING },
        { '=', 'updated', now() }
    })
end

Обработка подключений


Если пришло сообщение об успешном соединении, выбирается следующий прибор для подключения из списка результатов сканирования.

function handlers.connect(gateway, device, result)
    local gate = box.space.gateways:get(gateway)
    local dev = box.space.gateways:get(device)
    if gate == nil or device == nil or gate.status ~= GW_CONNECTING then
        return
    end

    if result ~= 'OK' then
        gateway_to_waiting(gate)
        return
    end

    box.space.ether:update({gateway, device}, {{'=', 'connected', true}})
    connect_next_device(gate)
end

Обработка задержек, отправка и ожидание команд


Будем считать, что если шлюз не получил сообщение какого-либо типа вовремя, то это прерывает любой процесс (ожидание результатов сканирования, подключение к приборам) и принудительно переводит шлюз в режим ожидания.

Реализация таймеров делается с помощью той же функции process_gateway_phase. Отправка команд и их ожидание выполняется по тем же принципам, точную реализацию функций возложим на читателя :)

Связываем вместе эмулятор и управляющий сервис


В Tarantool есть пользователь guest, который там находится по умолчанию; воспользуемся им и назначим ему права суперпользователя. В начале файлов demu.lua и ditwin.lua, но после секции с box.cfg добавим следующий код:

pcall(function()
    box.schema.user.grant('guest', 'super')
end)

И после этого остается только добавить такой код в оба модуля (нужно только не забыть правильно указать номера портов для эмулятора и управляющего модуля):

local connection

local function get_connection()
    if connection == nil then
        connection = net.connect('127.0.0.1:3301', { user = 'guest' })
    end

    return connection
end

Заменим реализацию output processor:

task('output processor', function()
    if output:wait(10) then
        local batch = box.atomic(output.take, output, 100)
        log.info('messages to IoT platform: %s', json.encode(batch))
        local connection = get_connection()
        local formatted_batch = {}
        for _, message in ipairs(batch) do
            table.insert(formatted_batch, message:tomap({names_only = true}))
        end
        connection:call('push_messages', { formatted_batch })
    end
end)

И с этим макетом системы можно поиграть: добавить шлюзы и устройства по обе стороны, включать и выключать, наблюдать за подключением и сканированием эфира, передачей команд и прочими веселыми штуками.

Интеграция с Tuya-платформой


Изучив базовые принципы создания сервисов, которые реализуют управление с помощью цифровых двойников, посмотрим на то, как это выглядит с точки зрения подключения сторонней платформы IoT.  Как уже было сказано, управляющий сервис может выступать адаптером для разных протоколов и платформы, но в случае с Tuya есть некоторые тонкости.

Основное управление приборами может происходить и через API самой Tuya, поэтому точку унификации команд и состояний цифровых двойников приходится выносить еще на уровень API-сервиса, через который пользователь взаимодействует с основной платформой и интегрируемой, но состояние цифровых двойников также может быть унифицировано на уровне Tarantool.

Еще один момент: в Tuya любой вызов API и получение сообщения из брокера в облаке Tuya стоят денег. Соответственно, к управляющей функции добавляется функция кэширования, то есть Tarantool в данном случае выступает и как кэш, чтобы по возможности не делать лишние вызовы, если можно получить данные из кэша.

Также к управляющей и кэширующей функции добавляется функция мониторинга. Несмотря на то, что Tuya унифицировала свои протоколы, железки все же могут вносить свои изменения в функциональности прошивок, что зачастую приводит к тому, что прибор, произведенный каким-то заводом, спамит сообщениями, за которые нужно платить. Поэтому на основе Tarantool и цифровых двойников осуществляется мониторинг и аналитика с целью отлова и отключения недобросовестных приборов.

Конечная схема интеграции Tuya на основе Tarantool выглядела следующим образом:

  1. Сервис с цифровыми двойниками, обрабатывающий поступающие от облака Tuya состояния приборов и трансформирующий их в единый формат. Также этот сервис дает API для формирования команд, которые потом отправляются в API Tuya.
  2. Сервис, подключающийся к Pulsar-брокеру на стороне Tuya и перекладывающий в сервис с цифровыми двойниками.
  3. Сервис API, который часть запросов преобразовывает и прокидывает в API Tuya, а часть запросов направляет в систему с цифровыми двойниками.

Таким образом, применение архитектуры с цифровыми двойниками помогает интегрировать гораздо меньшими усилиями.

Выводы, и о чем мы не успели поговорить в рамках статьи


Вывод первый: создавать виртуальные двойники на основе Tarantool достаточно быстро и удобно, так как Tarantool сочетает в себе свойства сервера приложений и базы данных.

Вывод второй: Tarantool хорош как интеграционное решение, связующее звено между различными сервисами, так как может хранить и быстро обрабатывать большие объемы информации.

Мы сознательно не рассматривали вопросы кластеризации и шардирования, но держали это в уме. Проблема шардирования в разрезе пользователей и приборов заключается в том, что шардировать только по пользователям нельзя, так как приборы могут управляться несколькими пользователями, и, соответственно, группа приборов и пользователей должна попадать на один и тот же шард.

Мы не рассматривали вопросы обмена командами и ответами, оставив это читателю, поскольку это увеличило бы статью, но основной принцип был уже рассмотрен на примере конечных автоматов с обработкой фаз с помощью фоновых файберов. Тем не менее работа с командами и ответами от приборов — это тоже достаточно обширная тема.

И наконец, фоновые задачи, которые обрабатывают записи в определенных фазах, используют fiber.sleep(T), что снижает точность таймеров. Это тоже намеренное упрощение. Повышение точности подобных таймеров выходит за рамки статьи.

Спасибо, что дочитали до конца :)

Скачать Tarantool можно на официальном сайте, а получить помощь — в Telegram-чате.

Комментарии (0)