Как я и говорил в прошлой части, тут мы попробуем сделать самый простой обмен при помощи MQTT.
Что же из себя представляет MQTT? В первую очередь - это протокол обмена сообщениями. Данные сообщения могут группироваться по древовидному признаку. Мы можем как отправлять их, так и получать, подписываясь на определенные группы в дереве или отдельные сообщения.
Добавляем в репозитории контроллера системы управления пакетами сервера:
deb http://ppa.launchpad.net/mosquitto-dev/mosquitto-ppa/ubuntu precise main
deb-src http://ppa.launchpad.net/mosquitto-dev/mosquitto-ppa/ubuntu precise main
Обновляем список пакетов и устанавливаем брокер MQTT Mosquitto на контроллере:
apt-get update
apt-get install mosquitto
На системе для разработки устанавливаем файлы для сборки и исходники libmosquitto-dev
:
sudo apt install libmosquitto-dev
apt source libmosquitto-dev
Переходим в созданную папку с исходниками и собираем библиотеку под компилятор для arm-linux-gnueabihf
:
make WITH_TLS=no WITH_CJSON=no WITH_STATIC_LIBRARIES=yes
В папке ./lib/cpp/
вы найдете все собранные библиотеки чтобы подложить в проект. Нам потребуется файл libmosquitto.a
.
Давайте разберем, что же мы хотим получить не выходе? Разложим все по пунктам:
нам надо передавать адреса переменных;
будем прикреплять их за топиками;
можем подписываться на внешние сообщения, а можем и нет;
периодически переменные должны обновляться на брокере.
Опишем структуру хранения самой переменной для контроля изменения переменной, времени изменения и точности, до которой надо контролировать числа с плавающей запятой:
template<typename T>
struct Value {
Value(string topic,T old, T* current,int precision):
topic{topic},old{old},current{current},precision{precision} { }
~Value() { }
string topic;
T old;
T* current;
uint64_t tFlag=timeSinceEpochMillisec();
int precision=2;
};
Функция timeSinceEpochMillisec()
была описана в предыдущей статье.
Мы будем разрабатывать класс, который необходимо наследовать от mosquittopp. Задаем Id для присоединения к брокеру на контроллере, хост, который у нас будет в нашем случае localhost, а также начальный топик device.
class MqttDevice:public mosquittopp::mosquittopp
{
public:
MqttDevice(string Id,string mqtt_host,string startTag="device");
~MqttDevice();
Очередь сообщений будем хранить списком:
list< tuple<string,string> > messages;
Для добавления переменных в классе организуем необходимые функции. Мы передаем топик и переменную.
void addSubscribed(string topic, volatile bool* value);
void addSubscribed(string topic, volatile float* value);
void addSubscribed(string topic, volatile uint16_t* value);
void add(string topic,volatile bool* value);
void add(string topic,volatile uint16_t* value);
void add(string topic,volatile float* value,int precision=1);
void add(string topic,volatile double* value,int precision=1);
Получим вот такой полный файл MqttDevice.h
:
class MqttDevice:public mosquittopp::mosquittopp {
public:
MqttDevice(string Id,string mqtt_host,string startTag="device");
~MqttDevice();
void setTag(string topic,string value);
void message(string name, volatile double value);
void message(string name, volatile float value);
void message(string name, volatile bool value);
void message(string name, string value);
list< tuple<string,string> > messages;
void on_connect(int rc);
void on_message(const struct mosquitto_message* message);
void on_subscribe(int mid,int qos_count,const int* granted_qos);
void sendMessage(string topic, string& message);
void sendMessage(string& topic,char* message, int msgLength);
void addSubscribed(string topic, volatile bool* value);
void addSubscribed(string topic, volatile float* value);
void add(string topic,volatile bool* value);
void add(string topic,volatile float* value,int precision=1);
void add(string topic,volatile double* value,int precision=1);
protected:
uint64_t tBool=0,tFloat=0,tDouble=0;
bool _exit;
private:
string mqtt_host;
list<Value<volatile bool> > lwBool;
list<Value<volatile float> > lwFloat;
list<Value<volatile bool> > lrBool;
list<Value<volatile float> > lrFloat;
list<Value<volatile double> > lrDouble;
struct mosquitto *mosq;
string startTag;
void mosq_loop();
thread _threadMosquitto;
};
В конструкторе и деструкторе мы организуем основные работы по созданию потока и его остановке.
MqttDevice::MqttDevice(string Id,string mqtt_host,string startTag) : startTag{startTag},mqtt_host{mqtt_host}, mosquittopp((Id+to_string(rand())).c_str()) {
_exit=false;
int rc=connect(mqtt_host.c_str(),mqtt_port,65535);
printf("MqttDevice connect %d\n",rc);
_threadMosquitto = thread(&MqttDevice::mosq_loop,this);
}
MqttDevice::~MqttDevice() {
_exit=true;
_threadMosquitto.join();
printf("MqttDevice delete\n");
}
Полный текст MqttDevice.cpp:
#define mqtt_port 1883
MqttDevice::MqttDevice(string Id,string mqtt_host,string startTag) : startTag{startTag},mqtt_host{mqtt_host}, mosquittopp((Id+to_string(rand())).c_str()) {
_exit=false;
int rc=connect(mqtt_host.c_str(),mqtt_port,65535);
printf("MqttDevice connect %d\n",rc);
_threadMosquitto = thread(&MqttDevice::mosq_loop,this);
}
MqttDevice::~MqttDevice() {
_exit=true;
_threadMosquitto.join();
printf("MqttDevice delete\n");
}
void MqttDevice::addSubscribed(string topic, volatile bool* value) {
lwBool.push_back({topic,false,value,0});
if(subscribe(0,topic.c_str(),0)==MOSQ_ERR_SUCCESS) printf("[Mosquitto] Subscribing to topic %s\n",topic.c_str());
}
void MqttDevice::addSubscribed(string topic, volatile float* value) {
lwFloat.push_back({topic,0.0f,value,1});
if(subscribe(0,topic.c_str(),0)==MOSQ_ERR_SUCCESS) printf("[Mosquitto] Subscribing to topic %s\n",topic.c_str());
}
void MqttDevice::add(string topic,volatile bool* value) {
lrBool.push_back({topic,false,value,0});
}
void MqttDevice::add(string topic,volatile float* value, int precision) {
lrFloat.push_back({topic,0.0f,value,precision});
}
void MqttDevice::add(string topic,volatile double* value, int precision) {
lrDouble.push_back({topic,0.0,value,precision});
}
void MqttDevice::on_connect(int rc) {
int err=0;
printf("[Mosquitto] Connect. rc=%d\n",rc);
if(rc==0) {
for(auto& v:lwBool) {
err=subscribe(0,v.topic.c_str(),0);
if(err==MOSQ_ERR_SUCCESS) printf("[Mosquitto] Subscribing to topic %s\n",v.topic.c_str());
else {reconnect();return;}
nsleep(10);
}
for(auto& v:lwFloat) {
err=subscribe(0,v.topic.c_str(),0);
if(err==MOSQ_ERR_SUCCESS) printf("[Mosquitto] Subscribing to topic %s\n",v.topic.c_str());
else {reconnect();return;}
nsleep(10);
}
}
else
printf("[Mosquitto] Connection failed. Aborting subscribing.\n");
}
void MqttDevice::on_message(const struct mosquitto_message *message) {
string tag=message->topic;
string value=string((const char*)message->payload,message->payloadlen);
setTag(tag,value);
}
void MqttDevice::mosq_loop() {
int rc;
lib_init();
printf("[Mosquitto] Start loop...\n");
while(!_exit) {
uint64_t now = timeSinceEpochMillisec();
rc=loop();
if(rc) {
printf("[Mosquitto] Disconnected. Trying to reconnect...\n");
nsleep(500);
reconnect();
nsleep(500);
continue;
}
while(messages.size()>0) {
rc=publish(NULL,get<0>(messages.front()).c_str(),strlen(get<1>(messages.front()).c_str()),(const uint8_t*)get<1>(messages.front()).c_str(),0,true);
if(rc) {
printf("[Mosquitto] Error send message\n");
break;
}
else
messages.pop_front();
}
if(now-tBool>300) {
tBool=now;
for(auto &v:lrBool) {
if(v.current==NULL) continue;
if(v.old!=*v.current || now-v.tFlag>3000) {
v.tFlag=now;
v.old=*v.current;
message(v.topic,*v.current);
}
}
for(auto &v:lwBool) {
if(v.old!=*v.current || now-v.tFlag>3000) {
v.tFlag=now;
v.old=*v.current;
message(v.topic,*v.current);
}
}
}
if(now-tFloat>500) {
tFloat=now;
for(auto &v:lrFloat) {
float r=round(*v.current*pow(10,v.precision))/pow(10,v.precision);
if(v.old!=r || now-v.tFlag>5000) {
v.tFlag=now;
v.old=r;
message(v.topic,r);
}
}
for(auto &v:lrDouble) {
double r=round(*v.current*pow(10,v.precision))/pow(10,v.precision);
if(v.old!=r || now-v.tFlag>5000) {
v.tFlag=now;
v.old=r;
message(v.topic,r);
}
}
for(auto &v:lwFloat) {
float r=round(*v.current*pow(10,v.precision))/pow(10,v.precision);
if(v.old!=r || now-v.tFlag>5000) {
v.tFlag=now;
v.old=r;
message(v.topic,r);
}
}
}
}
lib_cleanup();
printf("[Mosquitto] Exit from mosquitto loop.\n");
}
void MqttDevice::message(string name, volatile float value) {
messages.push_back(make_tuple(name,to_string(value)));
}
void MqttDevice::message(string name, volatile double value) {
messages.push_back(make_tuple(name,to_string(value)));
}
void MqttDevice::message(string name, volatile bool value) {
messages.push_back(make_tuple(name,(value?"true":"false")));
}
void MqttDevice::message(string name, string value) {
messages.push_back(make_tuple(name,value));
}
void MqttDevice::setTag(string topic,string value) {
try {
for(auto& v:lwBool)
if(v.topic==topic) {
*(v.current)=(value=="true");
v.old=*(v.current);
return;
}
for(auto& v:lwFloat)
if(v.topic==topic) {
*(v.current)=(float)atof(value.c_str());
v.old=*(v.current);
return;
}
}
catch(exception& ex) { }
}
Для передачи более сложных объектов автоматизации, мы можем реализовать функции:
void MqttDevice::add(ValveD* valve) {
add(startTag+"/"+valve->name+"/SQL",valve->b_t1);
add(startTag+"/"+valve->name+"/SQH",valve->b_t2);
add(startTag+"/"+valve->name+"/out",valve->b_out);
addSubscribed(startTag+"/"+valve->name+"/auto",valve->b_mode);
addSubscribed(startTag+"/"+valve->name+"/man",valve->b_control);
}
В данном примере передается указатель на объект отсечного клапана с двумя концевиками крайних положений, выходного сигнала, а также управления клапаном. Из управления есть сигнал в каком состоянии клапан auto. Если ручной режим auto=false, то управление с сигнала man. Если автоматический режим auto=true, то управление с внутреннего состояния, заданного программно по алгоритму.
Из прошлой статьи возьмем пример опроса модуля ввода/вывода и к нему добавим отправку полученных данных на брокер:
volatile bool b1=false,b2=false,b3=false;
ModbusLine line1(modbus_new_rtu("/dev/ttyS2", 115200, 'N', 8, 1));
line1.addMDS_DIO_16BD(1,&b1,&b2,&b3);
MqttDevice mqtt("line","localhost");
mqtt.addSubscribed("device/b1",&b1);
mqtt.add("device/b2",&b2);
mqtt.add("device/b3",&b3);
При подключении к брокеру мы получили 3 переменных. При изменении переменной b1 через брокер в программе получаем изменение в значении переменной.
Мы добились нужного нам результата, а добавление новых переменных не занимает много кода.
В следующей статье мы опробуем обмен между контроллерами и устройствами через RS-485 по Modbus аналогичным образом. Это будет полезно при межконтроллерном обмене и общении с сенсорными панелями.
Комментарии (3)
eao197
28.09.2023 11:39+1catch(exception ex) { }
Это вы std::exception и его наследников собрались ловить по значению?
maledog
А catch(exception ex) { } не приведет к тому, что программа тихо схлопнется при публикации невалидных данных? mqtt без авторизации и acl - это же проходной двор.
Squarc Автор
Программа по тестам как раз перестала валиться. Сеть АСУ ТП изолирована :)