Умные устройства окружают нас повседневно и не только в быту: датчики, бытовые приборы, лампочки, розетки и другая техника. Каждый день мы сталкиваемся с более новыми и умными устройствами, управляемые через интернет или Wi-Fi.

IoT (Internet of Things) в переводе означает интернет умных вещей. Это концепция, объединяющая физические устройства в одну сеть для передачи данных и управления ими. И оказывается, что интернет вещей — никакое не ограничение! Управлять устройствами в сети можно с помощью легковесного протокола MQTT.

Привет, Хабр! Меня зовут Александр Чередников и я — CTO в компании QTIM, которая занимается заказной разработкой. В этой статье по мотивам моего доклада на PHP Russia расскажу, как общаться с умными устройствами силами PHP.

MQTT и некоторые его особенности

MQTT — это легковесный протокол общения, созданный специально для устройств с ограниченными возможностями и низкой пропускной способностью. Разберём некоторые его особенности. 

Брокер MQTT

Брокер MQTT — это центральный узел, через который клиенты могут обмениваться сообщениями.

Брокеры MQTT:

  • Eclipse Mosquitto;

  • EMQ;

  • HiveMQ;

  • NanoMQ;

  • VerneMQ;

  • RabbitMQ с плагином MQTT.

На практике мне пришлось поработать с Eclipse Mosquitto. Выбрали его по нескольким причинам: он легковесный, прост в настройке, очень быстрый и позволяет передавать сообщения мгновенно. Про плюсы и минусы остальных можно почитать в документации и подобрать под свои нужды.

Сообщения

Сообщения в MQTT передаются в бинарном виде, что сокращает память и уменьшает время на передачу сообщения.

Есть разные виды сообщений от умных устройств:

  • JSON;

  • Protobuf;

  • XML;

  • CBOR;

  • Text.

Благо, с этим всем PHP умеет работать.

Типы пакетов

При передаче сообщений через брокер к умным устройствам в сообщении используются различные типы пакетов:

  • CONNECT — установка соединения с брокером.

  • CONNACK — подтверждение соединения.

  • PUBLISH — отправка сообщения в определённый топик.

  • PUBACK, PUBREC, PUBREL, PUBCOMP — подтверждения доставки для QoS 1 и QoS 2.

  • SUBSCRIBE — запрос на подписку на топик.

  • SUBACK — подтверждение подписки.

  • UNSUBSCRIBE — отмена подписки.

  • UNSUBACK — подтверждение отмены подписки.

  • PINGREQ и PINGRESP — проверка соединения.

  • DISCONNECT — завершение соединения.

Тип пакета указывается первым байтом в начале сообщения. По нему брокер определяет, с каким сообщением работать. В версиях MQTT 3.3.1, MQTT 5.0 эти типы пакетов могут отличаться. Нужно смотреть конкретный тип пакетов для конкретной спецификации и устройств.

QoS

QoS — важный параметр  гарантии доставки сообщения при обмене сообщениями между умными устройствами и сервером.

Виды QoS:

  1. QoS 0 — не гарантирует доставку сообщения до подписчика. Это означает, что издатель отправляет сообщение только один раз и не дожидается подтверждения, что сообщение принято.

  2. QoS 1 — более надёжный способ доставки. QoS 1 гарантирует, что сообщение будет доставлено подписчику минимум один раз. Но не гарантирует, что сообщение не придёт повторно.

  3. QoS 2 — самый надёжный способ доставки. QoS 2 гарантирует, что издатель, отправляющий сообщение, отправит его только один раз, без дублирования.

Темы (топики)

При обмене сообщениями используются топики — маршруты, которые служат для организации и фильтрации данных при передаче сообщения.

home/temperature

home/humidity

office/temperature

Топики похожи на маршруты из HTTP-протокола, но у них нет query параметров для фильтрации.

Есть некоторые особенности топиков:

  • Состоят из сегментов, разделенных слешами: /rooms/{sensor}/action/{action}

Например, если мы хотим выбрать определённые показания с определённых датчиков в комнатах, то можем применить подстановочные символы.

  • Имеют подстановочные символы:

Одноуровневый (+) - /rooms/+/action/get

Одноуровневый подстановочный символ означает, что мы будем собирать информацию только на одном уровне. Например, нам нужно собрать данные со всех датчиков температур во всех комнатах, не используя информацию о других устройствах.

Многоуровневый (#) - /rooms/#

Многоуровневый символ используется, когда нужно собрать информацию со всех устройств и датчиков во всех комнатах.

Также в брокере есть системные топики, которые обозначаются $SYS. Они предназначены для диагностики самого брокера:

  • $SYS/broker/uptime — время работы брокера.

  • $SYS/broker/clients/connected — количество подключённых клиентов.

  • $SYS/broker/clients/disconnected — количество отключённых клиентов.

  • $SYS/broker/clients/total — общее количество клиентов.

  • $SYS/broker/load — текущая нагрузка на брокер.

  • $SYS/broker/messages/sent — количество отправленных сообщений.

  • $SYS/broker/messages/stored — количество сообщений в брокере.

С их помощью мы можем отслеживать нагрузку, количество подключённых клиентов к брокеру, время его работы и т.д. Эта информация может понадобиться при диагностике проблем и при составлении метрик и графиков.

Важно, что системные топики никак не относятся к умным устройствам, а только к брокерам сообщений. По ним невозможно получить какую-то информацию от самих устройств.

Издатели

Издатель — это публикатор, который отправляет сообщение по определённому топику в брокер.

Подписчики

Подписчики вычитывают эти сообщения по определённому топику.

Например, умный датчик температуры отправляет свои данные в брокер; затем подписчики, которые отслеживают соответствующий топик, вычитывают эти данные. В результате мы или отобразим температуру конечному пользователю, или сохраним данные и сделаем выводы, построим графики.

На изображении ниже представлено как происходит обмен с MQTT брокером.

Как PHP взаимодействует с MQTT

Рассмотрим, как PHP может взаимодействовать с MQTT.

Многие слышали, что PHP не способен общаться с умными устройствами. Но зато это умеет MQTT брокер, который будет принимать и координировать сообщения. А PHP поможет нам общаться с MQTT.

Простейший воркер на PHP выглядит так:

//...
$sock = fsockopen($broker, $port, $errno, $errstr, 10);

//...
while (true) {
    $response = fread($sock, 2048);

    if (ord($response[0]) >> 4 === 3) {
        $remainingLength = ord($response[1]);
        $topicLength = ord($response[2]) << 8 | ord($response[3]);

        $msgTopic = substr($response, 4, $topicLength);
        $message = substr($response, 4 + $topicLength, $remainingLength - $topicLength - 2);
        echo "Получено сообщение в топике '$msgTopic': $message" . PHP_EOL;
    }

usleep (500000);
}

По сути, это долгоживущий цикл, который запускается в процессе и читает информацию из TCP-сокета. Далее эта информация разбирается на пакеты сообщений, само сообщение и топик. А мы можем работать с этой информацией.

Библиотеки для работы с MQTT

Для общения с MQTT есть специальные библиотеки:

https://github.com/php-mqtt/client — легковесная, легко настраивается и поддерживается. Буду показывать примеры дальше именно с ней. Имеет одну особенность — не поддерживает версию MQTT 5.0. Если вы столкнётесь с этим протоколом, учтите это.

https://github.com/simps/mqtt — построена на библиотеке Swoole.

https://github.com/aws/aws-sdk-php/tree/master/src/Iot — MQTT клиента предоставляет библиотека AWS в своем SDK.

Как вычитывать сообщения от умного устройства

Чтобы вычитывать сообщения, создадим клиента и подключимся к нему. На этом клиенте вызовем метод subscribe. Укажем первым параметром, с какого топика хотим принимать сообщение. В callback примем сам топик, сообщение и обработаем его так, как нужно — или сохраним в базу любую логику обработки, или покажем его клиенту. Третьим параметром в подписке укажем QoS. Это нужно делать обязательно, чтобы библиотека при отправке сообщения понимала, какой тип пакета отправить брокеру для правильного взаимодействия. Дальше запускается долгоживущий цикл в виде loop, и подписка будет действовать, пока мы её не прервём.

Если же мы прервали подписку или возникло исключение, то отключимся от этого клиента, чтобы не создавать лишних подключений, и прервём долгоживущий процесс.

try {
    $client = new MqttClient(MQTT_BROCKER_HOST, MQTT_BROCKER_PORT);
    $client->connect();

    $client->subscribe('rooms/+/temp', function (string $topic, string $message) {
        // Логика обработки сообщения
    }, MqttClient: :Q0S_AT_LEAST_ONCE);

    $client->loop();
    $client->disconnect();

} catch (MqttClientException) {
    // Логика обработки сообщения
}

Управлять устройствами можно и в случае, если нужно послать к ним команду. Чтобы послать устройству команду, нужно опубликовать сообщение, которое оно принимает по определённому топику, в топик, на который это устройство подписано. Также создадим клиент и подключимся к нему.

Простой пример по аренде самоката:

$client = new MqttClient(MQTT_BROKER_HOST, MQTT_BROKER_PORT);
$client->connect();

$client->publish(
    topic: 'scooter/1234/rent',
    message: json_encode(['rent_number' => '4321']),
    qualityOfService: MqttClient: :QOS_AT_MOST_ONCE,
);

$client->disconnect();

В топике — сегмент «Самокат», его номер и команда rent. В теле мы передаём номер аренды, чтобы самокат понимал, по какой аренде его включили. Также передаём QOS третьим параметром. Здесь мы не запускаем никаких долгоживущих циклов, а просто публикуем в топик сообщение и отключаемся от клиента.

Иногда может потребоваться опубликовать сообщение в топик. В этот момент нужно получить ответ от устройства. Далее сможем завершить долгоживущий процесс, чтобы не держать постоянные подписки.

Sclient = new NqttClient(MQTT_BROKER HOST, MQTT_BROKER_PORT);

$client->registerLoopEventHandler(function (MqttClient $client, float $elapsedTime) {
    if ($elapsedTime >= 30) {
        $client->interrupt();
    }
});

$client->subscribe('/device/1111/update’, function (string $topic, string $message) use ($client) {
    if (substr($message, 1, 2) === 'QG") {
        // Тут логика обработки сообщения
        $client->interrupt();
    }
});

// Публикация сообщения на которой ожидаем ответ
$client->publish('/device/1111/get', json_encode(['data’ => *test']));

$client->loop();
$client->disconnect();

Создаём клиента, подписываемся на определённый топик. Кстати, эта логика из настоящего устройства, у которого тип сообщения устройства приходил в теле самого сообщения. Мы понимаем тип этого сообщения и на этом основании делаем логику. После обработки прерываем сообщение и выходим из долгоживущего цикла, который запускается чуть ниже.

Далее опубликуем сообщение. Важный момент, что публикация произойдёт после того, как мы подписались. Бывает так, что мы опубликовали сообщение, оно успело прийти в брокер. Устройство это сообщение успевает вычитать, дать ответ, но подключения так и не случилось. В этот момент мы теряем сообщение и не знаем, что произошло в момент публикации в устройство. В коде, регистрируемом обработчиками событий сразу после создания клиента, можно производить всё те же самые действия с подписками и публикацией. В примере выше я показал, как определить тайм-аут ответа сообщения от самого умного устройства.

Например, потерялась связь Wi-Fi или GSM. Если вы реализовали на своём сайте или в мобильном приложении запрос по кнопке, то должны в какой-то момент прервать цикл, чтобы надолго его не подвешивать. У нас на это 30 секунд. Обычно устройства отвечают где-то в районе 2-3 секунд, что не критично. Если же тайм-аут намного больше, то реализуется система очередей по обработке этих сообщений.

Важно:

  • Не допускать подключения нескольких клиентов с одним и тем же ID, потому что MQTT-брокер будет выдавать ошибку, что ID не уникален.

  • Своевременно закрывать соединение, чтобы не создавать нагрузку на брокер впустую.

  • Не забывать про долгоживущие процессы и не допускать от них утечек памяти при создании в воркере подписки на определённый топик.

Реализация логера

Реализация простого логера выглядит так:

$client = new MqttClient(MQTT_BROKER_HOST, MQTT_BROKER_PORT);

$client->subscribe('#', function (string $topic, string $message) {
    $this->logger->info('Сообщение от устройства.', [
        'topic' => $topic,
        'message' => $message,
    ]);
});

$client->loop();
$client->disconnect();

Подписываемся на все топики, присылаемые устройствами. Это означает, что мы хотим посмотреть хронологию с временными метками присылаемых в брокер событий. Например, чтобы диагностировать, почему сообщение было не доставлено. Логируется сам топик и само сообщение. Также запускается долгоживущий процесс в виде loop. Если этот процесс прерывается, мы отключаемся от этого клиента.

Лог с реального IoT-устройства выглядит так:

В Москве, Казани и Питере есть сервис шеринга зарядных устройств PowerApp. Для станций, хранящих пауэрбанки, я с нуля писал программное обеспечение по общению с ними. Это пример с одной из станций.

Здесь update означает, что сообщение пришло от устройства. Get — что сообщение послал сервер. Командой CN включается станция. Мы отвечаем, что приняли сообщение, и дальше устройство присылает свою внутреннюю информацию. В устройстве содержатся слоты, где стоят пауэрбанки, и нам нужно понимать, какой пауэрбанк сейчас вставлен, какой у него уровень заряда, видеть коды ошибок пауэрбанков. На основании этой информации мы можем принять решение о дальнейшей сдаче пауэрбанка в аренду.

Устройство присылает сообщение в виде сердцебиения. Это происходит в зависимости от настроек, раз в минуту или три. Сердцебиение показывает, как заряжаются пауэрбанки, их текущий заряд, а также информацию о том, что устройство находится в сети.

Сообщение в виде Protobuf — тоже с реального устройства по типу станции «Бери заряд», где обмен идёт как раз в этом виде:

syntax = "proto3";

package messages. setUpVoice;

message ServerSend {
    uint32 rl_index = 1;
    uint32 rl_ivl = 2;
    uint32 rl_seq = 3;
}

message CabinetReply {
    uint32 rl_result = 1;
    uint32 rl_code = 2;
    uint32 rl_seq = 3;
}

На примере выше показана установка громкости в самом устройстве: сообщение отправляет в устройство значение уровня громкости. Затем от устройства приходит сообщение с кодом ответа: успешно или нет. Всё прекрасно работает и  обрабатывается.

Проблемы физических устройств

Проблемы бывают всегда. Большинство техники выпускается на китайском рынке. При общении напрямую с китайскими представителями часто возникают сложности перевода. Мы не всегда понимаем друг друга. Приходится подстраиваться.

Ряд критичных проблем устройств, с которыми я столкнулся:

  • Нет гарантии ответа на запрос. Устройство в сети приняло сообщение, но не отправляет в ответ ничего. Нам нужно учитывать это в коде и в таком случае перезапрашивать дополнительную информацию.

  • Обрывается сообщение, которое необходимо обработать. Мы отправляем команду, или устройство присылает сообщение в оборванном виде (полстроки, четверть строки). Возможно, это связано с ограничением физических устройств, но такое бывает. 

  • Потеря сети GSM / WI-FI. Если станция или любое устройство не в сети, нам нужно учитывать это при отправке запросов к ним.

  • Перезагрузка устройств во время запроса. Это проявляется, например, при аренде пауэрбанков в станциях. Мы отправляем запрос на аренду, пользователь подходит, сканирует QR-код, банк выезжает и он его использует, а станция в этот момент присылает запрос о включении, как будто она только что вышла в сеть. Китайцы написали: «Пришлите логи с настоящей станции». Это значило, что нам нужно было ехать в ресторан, подключаться, отловить этот момент и снять со станции все логи, чтоб потом отправить им. Нам это не подходило, поэтому решали с помощью логики в коде.

  • Не все устройства могут принять более одного сообщения за раз. Видимо, связано это с тем, что устройства ограничены в технической реализации. Например, подходят к зарядному устройству два человека, сканируют QR-код, и в устройство летит две команды. Оно путает эти команды или вообще не обрабатывает, или обрабатывает не то. С этим надо уметь работать. Например, учитывать, что не нужно посылать больше одной команды, пока устройство не ответило.

MQTT как способ общения в PHP

PHP может обмениваться сообщениями и без умных устройств, выступая как издателем, так и подписчиком. Это добавляет к выбору ещё один способ обмена сообщениями между микросервисами или устройствами.

Но вам потребуется дополнительный брокер, через который вы и будете обмениваться. Есть ряд нюансов: сообщения ограничены в query параметрах и хедерах, поэтому будет неудобно общаться между сервисами. Но есть и плюс — сообщения сжимаются в бинарный вид и очень быстро передаются.

Масштабирование

Чтобы разобрать все сообщения, приходящие от устройств, можно использовать очереди. Сейчас в России более тысячи станций, выдающих пауэрбанки.

Каждая станция присылает сообщения при сердцебиении. Также к ней отправляются команды, а она присылает сообщения в ответ. Получается до 3-3.5 тысяч сообщений в секунду. Даже если логика обработки этих сообщений очень большая, мы можем спокойно их все разобрать с помощью Rabbit.

Горизонтальное масштабирование может быть каким угодно, с помощью Docker и Kubernetes.

Всё прекрасно масштабируется. Нет никаких практических ограничений — масштабируются PHP-воркеры, PHP-приложения, сами брокеры MQTT.

Преимущества PHP в IoT

Резюмирую преимущества PHP при общении с умными устройствами:

  • Снижение затрат на разработку. Мы не привлекаем другие команды, не тратим деньги на создание дополнительного сервиса, делаем всё сами, на родном языке.

  • Быстрое управление умными устройствами через приложения. Через любое приложение (веб, мобильное) отправляем команду, через бэкэнд посылаем запрос к умному устройству. После ответа устройства можем сразу обновить информацию по действию в приложении.

  • Интеграция с любыми видами сообщений. Благо, язык позволяет обрабатывать любые сообщения, которые присылают умные устройства.

  • Простота масштабирования за счёт стандартных средств.

Комментарии (6)